어떻게 작업 대기 열 을 씁 니까?

9400 단어 Python
앞서 셀 러 리 를 사용 하 다가 워 커 가 걸 린 bug 를 만 나 관련 문서 와 코드 를 진지 하 게 보고 셀 러 리 가 실현 하 는 원 리 를 알 아 봤 다.
사실 핵심 원 리 는 매우 간단 하 다.
  • 퀘 스 트 를 대기 열 로 보 냅 니 다
  • 데 몬 워 커 의 순환 이 대기 열 에서 퀘 스 트 를 계속 가 져 오고 수행 합 니 다이 임 무 를 어떻게 포장 하고 전달 하 는 지 에 대해 사고 방향 이 함수 전 삼 과 유사 하고 전 치,전 인용 이 있 습 니 다.
    4.567917.전체 코드 를 전송 하고 pickle 로 직렬 화 합 니 다
    4.567917.함수 이름과 인자 만 전달 합 니 다.worker 에서 import 관련 코드 를 실행 합 니 다
    사고 방향 은 참고 할 수 있다.http://flask.pocoo.org/snippets/73/。 rq 도 이런 사고방식 을 사용한다.
    celery 의 직렬 화 는 pickle,json,Yml 등 을 선택 할 수 있 습 니 다.pickle 을 선택 하면 사용 하 는 사고 1.공식 적 으로 추천 하지 않 고 안전 문제 가 있 습 니 다.
    다음은 내 가 생각 하 는 두 가지 실현 을 보 여 준다.
    프로젝트 구조
    jub
    ├── README.md
    ├── jub
    │   ├── __init__.py
    │   ├── bin
    │   │   ├── __init__.py
    │   │   └── worker.py
    │   ├── broker_conn.py
    │   ├── conf.py
    │   ├── exceptions.py
    │   ├── task.py
    │   ├── utils.py
    │   └── worker.py
    ├── requirements.txt
    └── test
        ├── __init__.py
        └── tasks.py

    jub/bin/worker.py。명령 행 에서 파 라 메 터 를 받 고 import 가 지정 한 procject 를 찾 아 순환 적 으로 작업 을 수행 합 니 다.
    import argparse
    import importlib
    import sys
    
    from jub.task import tasks
    from jub.worker import Worker
    
    
    def autodiscover(project):
        importlib.import_module('%s.tasks' % arg_namespace.project)
        print('tasks', tasks)
    
    
    if __name__ == '__main__':
        parser = argparse.ArgumentParser(description='Jub - Do job!')
        parser.add_argument('subcommand')
        parser.add_argument('-P', action="store", dest="project")
        arg_namespace = parser.parse_args()
        if arg_namespace.subcommand:
            if arg_namespace.subcommand not in ('worker',):
                print('unknown subcommand')
    
            if arg_namespace.subcommand == 'worker':
                if not arg_namespace.project:
                    print('project directory could not be empty')
                    sys.exit()
                autodiscover(arg_namespace.project)
                worker = Worker()
                worker.run()

    jub/broker_conn.py。네트워크 연결,redis 조작.
    import re
    import redis
    from jub import conf
    match = re.match('redis://(\d+\.\d+\.\d+\.\d+):(\d+)/(\d+)', conf.DEFAULT_BROKER)
    host, port, db = match.group(1), match.group(2), match.group(3)
    pool = redis.ConnectionPool(host=host, port=port, db=db)
    conn = redis.Redis(connection_pool=pool)

    jub/task.py。Task 류,퀘 스 트 포장.
    import json
    
    from jub import conf
    from jub.broker_conn import conn
    from jub.exceptions import UnknownTask
    
    tasks = {}
    
    
    class Task(object):
        def __init__(self, func):
            self.task_name = '%s.%s' % (func.__module__, func.__name__)
            self.func = func
            self.task_id = uuid.uuid4().hex
    
        def __call__(self, *args, **kwargs):
            self.execute(*args, **kwargs)
    
        @classmethod
        def from_message(cls, message):
            message_data = json.loads(message)
            task_name = message_data.pop("task_name")
            task_id = message_data.pop("task_id")
            args = message_data.pop("args")
            kwargs = message_data.pop("kwargs")
            if task_name not in tasks:
                raise UnknownTask(task_name)
            t = tasks[task_name]
            t.execute(*args, **kwargs)
    
        def delay(self, *args, **kwargs):
            if conf.ALWAYS_EAGLE:
                self.execute(*args, **kwargs)
            else:  # send_task
                message = json.dumps({'task_name': self.task_name, 'args': args, 'kwargs': kwargs})
                conn.rpush(conf.DEFAULT_QUEUE, message)
    
        def execute(self, *args, **kwargs):
            self.func(*args, **kwargs)
    
    
    def task(func):
        t = Task(func)
        tasks[t.task_name] = t
        return t

    jub/worker.py。 Worker 클래스,순환 으로 작업 을 수행 합 니 다.
    from jub import conf
    from jub.broker_conn import conn
    from jub.task import Task
    
    
    class Worker(object):
        def execute_next_task(self):
            _, message = conn.blpop(conf.DEFAULT_QUEUE)
            print('Receive task: ', message)
            Task.from_message(message.decode('utf8'))
    
        def run(self):
            while True:
                self.execute_next_task()

    ok,위 는 라 이브 러 리 의 핵심 코드 이 고 아래 는 사용자 가 사용 하 는 코드 입 니 다.
    test/tasks.py。 작업 정의.
    from jub.task import task
    
    
    @task
    def hello():
        print('hello world')

    사용 방법 으로 워 커 를 시작 합 니 다.명령 행 입력
    $ python -m jub.bin.worker worker -P test

    비동기 작업 을 호출 하 다.다른 창 에서 ipython 을 열 고 입력 하 십시오.
    >> from test.tasks import hello
    >> hello.delay()

    워 커 가 작업 을 가 져 오고 코드 를 실행 하 는 것 을 볼 수 있 습 니 다.

    좋은 웹페이지 즐겨찾기