Python 은 Beanstalkd 를 사용 하여 비동기 임 무 를 처리 하 는 방법 입 니 다.
6998 단어 PythonBeanstalkd비동기
최종 효과
정의 작업:
from xxxxx.job_queue import JobQueue
queue = JobQueue()
@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
# do task
제출 퀘 스 트:
task_one.put(arg1="a", arg2="b", arg3="c")
그리고 백 스테이지 워 크 스 레 드 에서 이 임 무 를 수행 할 수 있 습 니 다.실현 과정
1.Beanstalk 서버 알 아 보기
Beanstalk is a simple, fast work queue. https://github.com/kr/beanstalkd
Beanstalk 는 C 언어 로 이 루어 진 메시지 큐 서비스 입 니 다.이 는 일반적인 인 터 페 이 스 를 제공 합 니 다.최초 로 디자인 된 목적 은 비동기 실행 에 걸 리 는 작업 을 통 해 대량의 웹 프로그램의 페이지 지연 을 줄 이 는 것 입 니 다.서로 다른 언어 에 대해 서로 다른 Beanstalkd Client 가 실 현 됩 니 다.Python 에 beanstalkc 등 이 있 습 니 다.저 는 beanstalkc 를 이용 하여 beanstalkd server 와 통신 하 는 도구 입 니 다.
2.임무 비동기 집행 실현 원리
beanstalkd 는 문자열 의 작업 스케줄 링 만 할 수 있 습 니 다.프로그램 이 제출 함수 와 파 라 메 터 를 지원 하도록 한 다음 에 woker 에서 함 수 를 실행 하고 파 라 메 터 를 가 져 옵 니 다.함수 와 전 달 된 매개 변 수 를 중간 층 으로 등록 해 야 합 니 다.
실현 은 주로 세 부분 을 포함한다.
Subscriber:함 수 를 beanstalk 의 한 튜브 에 등록 하여 간단 하고 등록 함수 이름과 함수 자체 의 대응 관 계 를 실현 합 니 다.(같은 그룹(tube)에 같은 함수 이름 이 존재 할 수 없다 는 뜻 이다.데 이 터 는 클래스 변수 에 저 장 됩 니 다.
class Subscriber(object):
FUN_MAP = defaultdict(dict)
def __init__(self, func, tube):
logger.info('register func:{} to tube:{}.'.format(func.__name__, tube))
Subscriber.FUN_MAP[tube][func.__name__] = func
JobQueue:일반 함 수 를 Putter 능력 을 가 진 장식 기로 쉽게 변환 할 수 있 습 니 다.
class JobQueue(object):
@classmethod
def task(cls, tube):
def wrapper(func):
Subscriber(func, tube)
return Putter(func, tube)
return wrapper
Putter:함수 명,함수 파라미터,지정 한 그룹 을 하나의 대상 으로 조합 한 다음 json 을 문자열 로 정렬 하고 마지막 으로 beanstalkc 를 통 해 beanstalkd 대기 열 로 보 냅 니 다.
class Putter(object):
def __init__(self, func, tube):
self.func = func
self.tube = tube
#
def __call__(self, *args, **kwargs):
return self.func(*args, **kwargs)
#
def put(self, **kwargs):
args = {
'func_name': self.func.__name__,
'tube': self.tube,
'kwargs': kwargs
}
logger.info('put job:{} to queue'.format(args))
beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
try:
beanstalk.use(self.tube)
job_id = beanstalk.put(json.dumps(args))
return job_id
finally:
beanstalk.close()
Worker:beanstalkd 대기 열 에서 문자열 을 꺼 낸 다음 json.loads 를 통 해 역 직렬 화 를 대상 으로 함수 명,파라미터,튜브 를 얻 습 니 다.마지막 으로 Subscriber 에서 함수 명 에 대응 하 는 함수 코드 를 얻 은 다음 매개 변수 실행 함 수 를 전달 합 니 다.
class Worker(object):
worker_id = 0
def __init__(self, tubes):
self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
self.tubes = tubes
self.reserve_timeout = 20
self.timeout_limit = 1000
self.kick_period = 600
self.signal_shutdown = False
self.release_delay = 0
self.age = 0
self.signal_shutdown = False
signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown())
Worker.worker_id += 1
import_module_by_str('pear.web.controllers.controller_crawler')
def subscribe(self):
if isinstance(self.tubes, list):
for tube in self.tubes:
if tube not in Subscriber.FUN_MAP.keys():
logger.error('tube:{} not register!'.format(tube))
continue
self.beanstalk.watch(tube)
else:
if self.tubes not in Subscriber.FUN_MAP.keys():
logger.error('tube:{} not register!'.format(self.tubes))
return
self.beanstalk.watch(self.tubes)
def run(self):
self.subscribe()
while True:
if self.signal_shutdown:
break
if self.signal_shutdown:
logger.info("graceful shutdown")
break
job = self.beanstalk.reserve(timeout=self.reserve_timeout) # , timeout
if not job:
continue
try:
self.on_job(job)
self.delete_job(job)
except beanstalkc.CommandFailed as e:
logger.warning(e, exc_info=1)
except Exception as e:
logger.error(e)
kicks = job.stats()['kicks']
if kicks < 3:
self.bury_job(job)
else:
message = json.loads(job.body)
logger.error("Kicks reach max. Delete the job", extra={'body': message})
self.delete_job(job)
@classmethod
def on_job(cls, job):
start = time.time()
msg = json.loads(job.body)
logger.info(msg)
tube = msg.get('tube')
func_name = msg.get('func_name')
try:
func = Subscriber.FUN_MAP[tube][func_name]
kwargs = msg.get('kwargs')
func(**kwargs)
logger.info(u'{}-{}'.format(func, kwargs))
except Exception as e:
logger.error(e.message, exc_info=True)
cost = time.time() - start
logger.info('{} cost {}s'.format(func_name, cost))
@classmethod
def delete_job(cls, job):
try:
job.delete()
except beanstalkc.CommandFailed as e:
logger.warning(e, exc_info=1)
@classmethod
def bury_job(cls, job):
try:
job.bury()
except beanstalkc.CommandFailed as e:
logger.warning(e, exc_info=1)
def graceful_shutdown(self):
self.signal_shutdown = True
위 코드 를 쓸 때 문제 가 하나 있 습 니 다.Subscriber 를 통 해 함수 이름과 함수 자체 의 대응 관 계 를 등록 합 니 다.Python 해석 기,즉 한 프로 세 스에 서 실 행 됩 니 다.Worker 는 다른 프로 세 스에 서 비동기 로 실 행 됩 니 다.어떻게 해 야 Worker 도 Putter 와 같은 Subscriber 를 얻 을 수 있 습 니까?마지막 으로 Python 의 장식 기 체 제 를 통 해 이 문 제 를 해결 할 수 있다 는 것 을 발견 했다.
바로 이 구절 입 니 다.Subscriber 의 문 제 를 해 결 했 습 니 다.
import_module_by_str('pear.web.controllers.controller_crawler')
# import_module_by_str
def import_module_by_str(module_name):
if isinstance(module_name, unicode):
module_name = str(module_name)
__import__(module_name)
import 실행module_by_str 시 호출import__ 동적 로드 클래스 와 함수.JobQueue 를 사용 한 함수 가 있 는 모듈 을 메모리 에 불 러 온 후.Woker 를 실행 할 때 Python 해석 기 는@수 식 된 인 테 리 어 코드 를 먼저 실행 하고 Subscriber 의 대응 관 계 를 메모리 에 불 러 옵 니 다.실제 사용 은 볼 수 있다.
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Python의 None과 NULL의 차이점 상세 정보그래서 대상 = 속성 + 방법 (사실 방법도 하나의 속성, 데이터 속성과 구별되는 호출 가능한 속성 같은 속성과 방법을 가진 대상을 클래스, 즉 Classl로 분류할 수 있다.클래스는 하나의 청사진과 같아서 하나의 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.