Django+Celery 가 정시 작업 을 수행 하 는 예제

머리말
Celery 는 python 을 기반 으로 개발 한 분포 식 작업 대기 열 입 니 다.python WEB 개발 에서 가장 유행 하 는 프레임 워 크 는 Django 입 니 다.그러나 Django 의 요청 처리 과정 은 동기 화 되 어 비동기 작업 을 수행 할 수 없습니다.비동기 작업 처 리 를 하려 면 다른 방식(전단 의 일반 해결 방안 은 ajax 작업)을 통 해 이 루어 져 야 합 니 다.배경 Celery 는 좋 은 선택 입 니 다.만약 한 사용자 가 어떤 조작 을 수행 하 는 데 오래 기 다 려 야 돌아 오 면 사이트 의 스루풋 을 크게 낮 출 수 있다.
다른 한편,우리 가 정시 임 무 를 처리 해 야 할 때 Celery 의 강력 한 생태 환경 도 그의 장점 이다.
Celery 를 어떻게 사용 하 는 지 배 웠 을 때 얻 기 어 려 울 수도 있 습 니 다.저 는 업무 의 여가 시간 을 이용 하여 이 문서 들 을 연구 한 후에 도 거의 일주일 이 걸 렸 습 니 다.지금 은 제 수 요 를 만족 시 킬 수 있 기 때문에 마음 을 가 라 앉 히 고 테스트 를 많이 하고 화 이 팅 을 많이 할 수 있 습 니 다.

2.설정 사용
celery 는 Django 프레임 워 크 에 쉽게 통합 되 며,물론 정시 작업 을 수행 하려 면 django-celery-beta 플러그 인 을 설치 해 야 합 니 다.나중에 설명 할 것 입 니 다.주의해 야 할 것 은 Celery 4.0 은 Django 버 전 만 지원 합 니 다>=1.8 의 경우 1.8 보다 작은 버 전이 라면 Celery 3.1 을 사용 해 야 합 니 다.
이 예제 에 서 는 주로 가방 에 의존 합 니 다.다음 과 같 습 니 다.

celery==4.2.1
Django==1.11.7
django-celery-beat==1.4.0
django-celery-results==1.0.4
PyMySQL==0.9.2
redis==2.10.6
배치 하 다.
새 프로젝트celery_demo,디 렉 터 리 구조(각 app 에 tasks 파일 이 하나 더 있어 서 작업 을 정의 합 니 다):

celery_demo
├── app01
│   ├── __init__.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   └── views.py
├── manage.py
├── celery_demo
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates
프로젝트 디 렉 터 리celery_demo/celery_demo/디 렉 터 리 에 새로 만 들 기celery.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

#   django  
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_demo.settings')  
app = Celery('celery_demo')
#    CELERY_     , settings    
app.config_from_object('django.conf:settings', namespace='CELERY') 
#         app  task.py
app.autodiscover_tasks() 
celery_demo/celery_demo/__init__.py에 쓰기:

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
celery_demo/celery_demo/settings.py에 쓰기:

CELERY_BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker  ,  Redis       

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND  ,    redis

CELERY_RESULT_SERIALIZER = 'json' #        
프로젝트 에 들 어 가 는celery_demo디 렉 터 리 시작worker:

celery worker -A taskproj -l debug
정의 및 트리거 작업
모든 tasks 파일 에 작업 정의app01/tasks.py:

from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
    return x + y
@shared_task
def mul(x, y):
    return x * y
보기 에서 퀘 스 트 실행

from django.http import JsonResponse
from app01 import tasks

# Create your views here.

def index(request,*args,**kwargs):
    res=tasks.add.delay(1,3)
    #    
    return JsonResponse({'status':'successful','task_id':res.task_id})
방문 하 다.http://127.0.0.1:8000/index

퀘 스 트 결 과 를 얻 으 려 면 taskid 는 AsyncResult 를 사용 하여 결 과 를 얻 을 수 있 고 백 엔 드 를 통 해 직접 얻 을 수 있 습 니 다.

넓히다
redis,rabbitmq 가 결과 저장 을 할 수 있 는 것 외 에 Django 의 orm 을 결과 저장 으로 사용 할 수 있 습 니 다.물론 의존 플러그 인 을 설치 해 야 합 니 다.이러한 장점 은 django 의 데 이 터 를 통 해 작업 상 태 를 직접 볼 수 있 고 더 많은 조작 을 할 수 있 습 니 다.다음 에 orm 을 결과 저장 으로 사용 하 는 방법 을 소개 합 니 다.
설치 하 다.

pip install django-celery-results
settings.py 설정,앱 등록

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)
backend 설정 을 수정 하고 Redis 를 django-db 로 변경 합 니 다.

#CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND  ,    redis

CELERY_RESULT_BACKEND = 'django-db'  #  django orm       
데이터베이스 수정

python3 manage.py migrate django_celery_results
이 때 데이터 베 이 스 를 더 만 드 는 것 을 볼 수 있 습 니 다:
 
물론 task 표를 조작 해 야 할 때 가 있 습 니 다.다음 소스 코드 의 표 구조 정의:

class TaskResult(models.Model):
    """Task result/status."""

    task_id = models.CharField(_('task id'), max_length=255, unique=True)
    task_name = models.CharField(_('task name'), null=True, max_length=255)
    task_args = models.TextField(_('task arguments'), null=True)
    task_kwargs = models.TextField(_('task kwargs'), null=True)
    status = models.CharField(_('state'), max_length=50,
                              default=states.PENDING,
                              choices=TASK_STATE_CHOICES
                              )
    content_type = models.CharField(_('content type'), max_length=128)
    content_encoding = models.CharField(_('content encoding'), max_length=64)
    result = models.TextField(null=True, default=None, editable=False)
    date_done = models.DateTimeField(_('done at'), auto_now=True)
    traceback = models.TextField(_('traceback'), blank=True, null=True)
    hidden = models.BooleanField(editable=False, default=False, db_index=True)
    meta = models.TextField(null=True, default=None, editable=False)

    objects = managers.TaskResultManager()

    class Meta:
        """Table information."""

        ordering = ['-date_done']

        verbose_name = _('task result')
        verbose_name_plural = _('task results')

    def as_dict(self):
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
        }

    def __str__(self):
        return '<Task: {0.task_id} ({0.status})>'.format(self)
3.Django 에서 정시 퀘 스 트 사용
django 에서 정시 작업 기능 을 사용 하려 면 beat 로 작업 전송 기능 을 수행 해 야 합 니 다.Django 에서 정시 작업 을 사용 하려 면 django-celery-beat 플러그 인 을 설치 해 야 합 니 다.다음은 사용 과정 을 소개 한다.
설치 설정
1.beat 플러그 인 설치

pip3 install django-celery-beat
2.등록 앱

INSTALLED_APPS = [
    ....   
    'django_celery_beat',
]
3.데이터베이스 변경

python3 manage.py migrate django_celery_beat
4.각각 워 커 와 베타 시작

celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler  #  beta         

celery worker -A taskproj -l info #  woker
5.admin 설정urls.py에 쓰기:

# urls.py
from django.conf.urls import url
from django.contrib import admin
 
urlpatterns = [
    url(r'^admin/', admin.site.urls),
]
6.사용자 만 들 기

python3 manage.py createsuperuser 
7.admin 에 로그 인하 여 관리(주소http://127.0.0.1:8000/admin)그리고 우리 가 지난번 에 orm 을 결과 로 저장 한 시계 도 볼 수 있다.
http://127.0.0.1:8000/admin/login/?next=/admin/

사용 예시:


결과 보기:

이차 개발
django-celery-beat 플러그 인 은 본질 적 으로 데이터베이스 시트 변화 검사 입 니 다.데이터베이스 시트 가 바 뀌 면 스케줄 러 가 작업 을 다시 읽 고 스케줄 링 을 하기 때문에 맞 춤 형 작업 페이지 를 만 들 려 면 beat 플러그 인의 네 장의 표 만 조작 하면 됩 니 다.물론 스케줄 러 를 스스로 정의 할 수 있 습 니 다.django-clery-beat 플러그 인 은 model 이 내장 되 어 있 습 니 다.가 져 오기 만 하면 orm 작업 을 할 수 있 습 니 다.다음은 django reset api 로 예 를 들 겠 습 니 다.
settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',
    'django_celery_results',
    'django_celery_beat',
    'rest_framework',
]
urls.py

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^index$', views.index),
    url(r'^res$', views.get_res),
    url(r'^tasks$', views.TaskView.as_view({'get':'list'})),
]
views.py

from django_celery_beat.models import PeriodicTask  #    model
from rest_framework import serializers
from rest_framework import pagination
from rest_framework.viewsets import ModelViewSet
class Userserializer(serializers.ModelSerializer):
    class Meta:
        model = PeriodicTask
        fields = '__all__'

class Mypagination(pagination.PageNumberPagination):
    """     """
    page_size=2
    page_query_param = 'p'
    page_size_query_param='size'
    max_page_size=4

class TaskView(ModelViewSet):
    queryset = PeriodicTask.objects.all()
    serializer_class = Userserializer
    permission_classes = []
    pagination_class = Mypagination
방문 하 다.http://127.0.0.1:8000/tasks다음 과 같다.

참고 자료:
W-D: https://www.cnblogs.com/wdliu/p/9530219.html
아직도 생각 중이 다.https://blog.csdn.net/mbl114/article/details/78047175
Celery 문서:http://docs.celeryproject.org/en/latest/
이상 은 Django+Celery 가 정시 임 무 를 수행 하 는 예제 의 상세 한 내용 입 니 다.Django Celery 의 정시 임무 에 관 한 자 료 는 저희 의 다른 관련 글 을 주목 하 세 요!

좋은 웹페이지 즐겨찾기