Python 은 Beanstalkd 를 사용 하여 비동기 임 무 를 처리 하 는 방법 입 니 다.

Beanstalkd 를 메시지 큐 서비스 로 사용 한 다음 Python 의 장식 기 문법 과 결합 하여 간단 한 비동기 작업 처리 도 구 를 실현 합 니 다.
최종 효과
정의 작업:

from xxxxx.job_queue import JobQueue

queue = JobQueue()

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
 # do task

제출 퀘 스 트:

task_one.put(arg1="a", arg2="b", arg3="c")
그리고 백 스테이지 워 크 스 레 드 에서 이 임 무 를 수행 할 수 있 습 니 다.
실현 과정
1.Beanstalk 서버 알 아 보기
Beanstalk is a simple, fast work queue. https://github.com/kr/beanstalkd
Beanstalk 는 C 언어 로 이 루어 진 메시지 큐 서비스 입 니 다.이 는 일반적인 인 터 페 이 스 를 제공 합 니 다.최초 로 디자인 된 목적 은 비동기 실행 에 걸 리 는 작업 을 통 해 대량의 웹 프로그램의 페이지 지연 을 줄 이 는 것 입 니 다.서로 다른 언어 에 대해 서로 다른 Beanstalkd Client 가 실 현 됩 니 다.Python 에 beanstalkc 등 이 있 습 니 다.저 는 beanstalkc 를 이용 하여 beanstalkd server 와 통신 하 는 도구 입 니 다.
2.임무 비동기 집행 실현 원리

beanstalkd 는 문자열 의 작업 스케줄 링 만 할 수 있 습 니 다.프로그램 이 제출 함수 와 파 라 메 터 를 지원 하도록 한 다음 에 woker 에서 함 수 를 실행 하고 파 라 메 터 를 가 져 옵 니 다.함수 와 전 달 된 매개 변 수 를 중간 층 으로 등록 해 야 합 니 다.
실현 은 주로 세 부분 을 포함한다.
Subscriber:함 수 를 beanstalk 의 한 튜브 에 등록 하여 간단 하고 등록 함수 이름과 함수 자체 의 대응 관 계 를 실현 합 니 다.(같은 그룹(tube)에 같은 함수 이름 이 존재 할 수 없다 는 뜻 이다.데 이 터 는 클래스 변수 에 저 장 됩 니 다.

class Subscriber(object):
 FUN_MAP = defaultdict(dict)

 def __init__(self, func, tube):
  logger.info('register func:{} to tube:{}.'.format(func.__name__, tube))
  Subscriber.FUN_MAP[tube][func.__name__] = func
JobQueue:일반 함 수 를 Putter 능력 을 가 진 장식 기로 쉽게 변환 할 수 있 습 니 다.

class JobQueue(object):
 @classmethod
 def task(cls, tube):
  def wrapper(func):
   Subscriber(func, tube)
   return Putter(func, tube)

  return wrapper
Putter:함수 명,함수 파라미터,지정 한 그룹 을 하나의 대상 으로 조합 한 다음 json 을 문자열 로 정렬 하고 마지막 으로 beanstalkc 를 통 해 beanstalkd 대기 열 로 보 냅 니 다.

class Putter(object):
 def __init__(self, func, tube):
  self.func = func
  self.tube = tube

 #       
 def __call__(self, *args, **kwargs):
  return self.func(*args, **kwargs)

 #       
 def put(self, **kwargs):
  args = {
   'func_name': self.func.__name__,
   'tube': self.tube,
   'kwargs': kwargs
  }
  logger.info('put job:{} to queue'.format(args))
  beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  try:
   beanstalk.use(self.tube)
   job_id = beanstalk.put(json.dumps(args))
   return job_id
  finally:
   beanstalk.close()

Worker:beanstalkd 대기 열 에서 문자열 을 꺼 낸 다음 json.loads 를 통 해 역 직렬 화 를 대상 으로 함수 명,파라미터,튜브 를 얻 습 니 다.마지막 으로 Subscriber 에서 함수 명 에 대응 하 는 함수 코드 를 얻 은 다음 매개 변수 실행 함 수 를 전달 합 니 다.

class Worker(object):
 worker_id = 0

 def __init__(self, tubes):
  self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  self.tubes = tubes
  self.reserve_timeout = 20
  self.timeout_limit = 1000
  self.kick_period = 600
  self.signal_shutdown = False
  self.release_delay = 0
  self.age = 0
  self.signal_shutdown = False
  signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown())
  Worker.worker_id += 1
  import_module_by_str('pear.web.controllers.controller_crawler')

 def subscribe(self):
  if isinstance(self.tubes, list):
   for tube in self.tubes:
    if tube not in Subscriber.FUN_MAP.keys():
     logger.error('tube:{} not register!'.format(tube))
     continue
    self.beanstalk.watch(tube)
  else:
   if self.tubes not in Subscriber.FUN_MAP.keys():
    logger.error('tube:{} not register!'.format(self.tubes))
    return
   self.beanstalk.watch(self.tubes)

 def run(self):
  self.subscribe()
  while True:
   if self.signal_shutdown:
    break
   if self.signal_shutdown:
    logger.info("graceful shutdown")
    break
   job = self.beanstalk.reserve(timeout=self.reserve_timeout) #       ,     timeout
   if not job:
    continue
   try:
    self.on_job(job)
    self.delete_job(job)
   except beanstalkc.CommandFailed as e:
    logger.warning(e, exc_info=1)
   except Exception as e:
    logger.error(e)
    kicks = job.stats()['kicks']
    if kicks < 3:
     self.bury_job(job)
    else:
     message = json.loads(job.body)
     logger.error("Kicks reach max. Delete the job", extra={'body': message})
     self.delete_job(job)

 @classmethod
 def on_job(cls, job):
  start = time.time()
  msg = json.loads(job.body)
  logger.info(msg)
  tube = msg.get('tube')
  func_name = msg.get('func_name')
  try:
   func = Subscriber.FUN_MAP[tube][func_name]
   kwargs = msg.get('kwargs')
   func(**kwargs)
   logger.info(u'{}-{}'.format(func, kwargs))
  except Exception as e:
   logger.error(e.message, exc_info=True)
  cost = time.time() - start
  logger.info('{} cost {}s'.format(func_name, cost))

 @classmethod
 def delete_job(cls, job):
  try:
   job.delete()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 @classmethod
 def bury_job(cls, job):
  try:
   job.bury()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 def graceful_shutdown(self):
  self.signal_shutdown = True

위 코드 를 쓸 때 문제 가 하나 있 습 니 다.
Subscriber 를 통 해 함수 이름과 함수 자체 의 대응 관 계 를 등록 합 니 다.Python 해석 기,즉 한 프로 세 스에 서 실 행 됩 니 다.Worker 는 다른 프로 세 스에 서 비동기 로 실 행 됩 니 다.어떻게 해 야 Worker 도 Putter 와 같은 Subscriber 를 얻 을 수 있 습 니까?마지막 으로 Python 의 장식 기 체 제 를 통 해 이 문 제 를 해결 할 수 있다 는 것 을 발견 했다.
바로 이 구절 입 니 다.Subscriber 의 문 제 를 해 결 했 습 니 다.

import_module_by_str('pear.web.controllers.controller_crawler')

# import_module_by_str    
def import_module_by_str(module_name):
 if isinstance(module_name, unicode):
  module_name = str(module_name)
 __import__(module_name)
import 실행module_by_str 시 호출import__ 동적 로드 클래스 와 함수.JobQueue 를 사용 한 함수 가 있 는 모듈 을 메모리 에 불 러 온 후.Woker 를 실행 할 때 Python 해석 기 는@수 식 된 인 테 리 어 코드 를 먼저 실행 하고 Subscriber 의 대응 관 계 를 메모리 에 불 러 옵 니 다.
실제 사용 은 볼 수 있다.
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

좋은 웹페이지 즐겨찾기