어떻게 작업 대기 열 을 씁 니까?
9400 단어 Python
사실 핵심 원 리 는 매우 간단 하 다.
4.567917.전체 코드 를 전송 하고 pickle 로 직렬 화 합 니 다
4.567917.함수 이름과 인자 만 전달 합 니 다.worker 에서 import 관련 코드 를 실행 합 니 다
사고 방향 은 참고 할 수 있다.http://flask.pocoo.org/snippets/73/。 rq 도 이런 사고방식 을 사용한다.
celery 의 직렬 화 는 pickle,json,Yml 등 을 선택 할 수 있 습 니 다.pickle 을 선택 하면 사용 하 는 사고 1.공식 적 으로 추천 하지 않 고 안전 문제 가 있 습 니 다.
다음은 내 가 생각 하 는 두 가지 실현 을 보 여 준다.
프로젝트 구조
jub
├── README.md
├── jub
│ ├── __init__.py
│ ├── bin
│ │ ├── __init__.py
│ │ └── worker.py
│ ├── broker_conn.py
│ ├── conf.py
│ ├── exceptions.py
│ ├── task.py
│ ├── utils.py
│ └── worker.py
├── requirements.txt
└── test
├── __init__.py
└── tasks.py
jub/bin/worker.py。명령 행 에서 파 라 메 터 를 받 고 import 가 지정 한 procject 를 찾 아 순환 적 으로 작업 을 수행 합 니 다.
import argparse
import importlib
import sys
from jub.task import tasks
from jub.worker import Worker
def autodiscover(project):
importlib.import_module('%s.tasks' % arg_namespace.project)
print('tasks', tasks)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Jub - Do job!')
parser.add_argument('subcommand')
parser.add_argument('-P', action="store", dest="project")
arg_namespace = parser.parse_args()
if arg_namespace.subcommand:
if arg_namespace.subcommand not in ('worker',):
print('unknown subcommand')
if arg_namespace.subcommand == 'worker':
if not arg_namespace.project:
print('project directory could not be empty')
sys.exit()
autodiscover(arg_namespace.project)
worker = Worker()
worker.run()
jub/broker_conn.py。네트워크 연결,redis 조작.
import re
import redis
from jub import conf
match = re.match('redis://(\d+\.\d+\.\d+\.\d+):(\d+)/(\d+)', conf.DEFAULT_BROKER)
host, port, db = match.group(1), match.group(2), match.group(3)
pool = redis.ConnectionPool(host=host, port=port, db=db)
conn = redis.Redis(connection_pool=pool)
jub/task.py。Task 류,퀘 스 트 포장.
import json
from jub import conf
from jub.broker_conn import conn
from jub.exceptions import UnknownTask
tasks = {}
class Task(object):
def __init__(self, func):
self.task_name = '%s.%s' % (func.__module__, func.__name__)
self.func = func
self.task_id = uuid.uuid4().hex
def __call__(self, *args, **kwargs):
self.execute(*args, **kwargs)
@classmethod
def from_message(cls, message):
message_data = json.loads(message)
task_name = message_data.pop("task_name")
task_id = message_data.pop("task_id")
args = message_data.pop("args")
kwargs = message_data.pop("kwargs")
if task_name not in tasks:
raise UnknownTask(task_name)
t = tasks[task_name]
t.execute(*args, **kwargs)
def delay(self, *args, **kwargs):
if conf.ALWAYS_EAGLE:
self.execute(*args, **kwargs)
else: # send_task
message = json.dumps({'task_name': self.task_name, 'args': args, 'kwargs': kwargs})
conn.rpush(conf.DEFAULT_QUEUE, message)
def execute(self, *args, **kwargs):
self.func(*args, **kwargs)
def task(func):
t = Task(func)
tasks[t.task_name] = t
return t
jub/worker.py。 Worker 클래스,순환 으로 작업 을 수행 합 니 다.
from jub import conf
from jub.broker_conn import conn
from jub.task import Task
class Worker(object):
def execute_next_task(self):
_, message = conn.blpop(conf.DEFAULT_QUEUE)
print('Receive task: ', message)
Task.from_message(message.decode('utf8'))
def run(self):
while True:
self.execute_next_task()
ok,위 는 라 이브 러 리 의 핵심 코드 이 고 아래 는 사용자 가 사용 하 는 코드 입 니 다.
test/tasks.py。 작업 정의.
from jub.task import task
@task
def hello():
print('hello world')
사용 방법 으로 워 커 를 시작 합 니 다.명령 행 입력
$ python -m jub.bin.worker worker -P test
비동기 작업 을 호출 하 다.다른 창 에서 ipython 을 열 고 입력 하 십시오.
>> from test.tasks import hello
>> hello.delay()
워 커 가 작업 을 가 져 오고 코드 를 실행 하 는 것 을 볼 수 있 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Python의 None과 NULL의 차이점 상세 정보그래서 대상 = 속성 + 방법 (사실 방법도 하나의 속성, 데이터 속성과 구별되는 호출 가능한 속성 같은 속성과 방법을 가진 대상을 클래스, 즉 Classl로 분류할 수 있다.클래스는 하나의 청사진과 같아서 하나의 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.