celery 원본 분석 - 정시 작업

19277 단어 web
celery 원본 분석
    python3.5.2,celery4.0.2,django1.10.x  

celery의 타이밍 작업 및 Django 구성
celery도 정해진 시간에 작업을 수행하여 관련 작업을 수행할 수 있다.celery와django의 설정 방법은 다음과 같다.celeryapp.tasks에 다음 작업 추가
@shared_task
def beat_task():
    print("beat_task")

2.celerydjango.setting 파일에 다음 설정을 추가합니다.
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'celery_app',
    'django_celery_beat'
]

셀러리가 django와 호흡을 맞춘 상태에서 django를 사용했기 때문이죠celery_beat 제3자 라이브러리이기 때문에 이 앱을 등록해야 합니다.
3.django.celery.py 파일에 다음 설정을 추가합니다.
from celery_django import settings
app.autodiscover_tasks(lambda : settings.INSTALLED_APPS)

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'test_beat': {
        'task': 'celery_app.tasks.beat_task',
        'schedule': timedelta(seconds=3),
    },
}

app.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)

이로써 프로필 설정이 완료되면 정시 작업 명령을 시작합니다.
(venv) wuzideMacBook-Air:celery_django wuzi$ celery beat -A celery_django -S django

그리고worker 서비스 구역을 시작하면 소비 시간이 실행해야 할 임무에 도달합니다.
celery -A celery_django worker 

워크맨의 터미널에서 출력됩니다. 아래와 같습니다.
     -------------- [email protected] v4.0.0 (latentcall)
    ---- **** ----- 
    --- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2018-07-12 07:24:56
    -- * - **** --- 
    - ** ---------- [config]
    - ** ---------- .> app:         celery_django:0x108934240
    - ** ---------- .> transport:   redis://127.0.0.1:6379/7
    - ** ---------- .> results:     redis://127.0.0.1:6379/6
    - *** --- * --- .> concurrency: 4 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery


    [2018-07-12 07:24:57,512: WARNING/MainProcess] /Users/wuzi/python35/venv/lib/python3.5/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
      warnings.warn('Using settings.DEBUG leads to a memory leak, never '
    [2018-07-12 07:24:57,546: WARNING/PoolWorker-3] beat_task
    [2018-07-12 07:24:57,550: WARNING/PoolWorker-4] beat_task
    [2018-07-12 07:24:57,551: WARNING/PoolWorker-2] beat_task
    [2018-07-12 07:24:57,560: WARNING/PoolWorker-1] beat_task

celery의 정시 작업 분석
Celerydjango 디렉토리에서 터미널에서 다음 명령을 입력합니다.
celery beat -A celery_django -S django

앞에서 분석한 바에 의하면 이때 들어온 CeleryCommand의commands에 대응하는 beat에 대응하는 클래스, beat가 실행되고 이때도 다음과 같은 코드를 실행한다.
        return cls(
            app=self.app, on_error=self.on_error,
            no_color=self.no_color, quiet=self.quiet,
            on_usage_error=partial(self.on_usage_error, command=command),
        ).run_from_argv(self.prog_name, argv[1:], command=argv[0])      #      ,       run_from_argv

이때cls는 비트 클래스에 대응하지만,bin/beat에 있는 것을 확인합니다.py의 beat 클래스에서 알 수 있듯이, 이 클래스는run 방법과add 만 다시 썼습니다.arguments 방법, 그래서 지금 실행하는runfrom_argv 방법은 비트가 계승하는 Command의runfrom_argv 메서드, Command의handleArgv 방법, 이 방법은 관련 매개 변수 처리를 거친 후call 함수로 호출되고, 이 함수에서run 방법으로 호출됩니다. 이때 호출된run 방법은 beat류에서 다시 쓴run 방법입니다. 이 방법을 보십시오.
def run(self, detach=False, logfile=None, pidfile=None, uid=None,
        gid=None, umask=None, workdir=None, **kwargs):
    if not detach:                                                  #          
        maybe_drop_privileges(uid=uid, gid=gid)         
    kwargs.pop('app', None)                                         #   app    
    beat = partial(self.app.Beat,
                   logfile=logfile, pidfile=pidfile, **kwargs)      #      ,           

    if detach:                                                      #       
        with detached(logfile, pidfile, uid, gid, umask, workdir):
            return beat().run()                                     #     
    else:
        return beat().run()                                         #     

이 때 앱의 비트 속성을 호출,
@cached_property
def Beat(self, **kwargs):
    """:program:`celery beat` scheduler application.

    See Also:
        :class:`~@Beat`.
    """
    return self.subclass_with_self('celery.apps.beat:Beat')     #   celery.apps.beat:Beat 

이때celery를 실례화했습니다.apps.비트의 비트 클래스와 이 실례의run 방법을 호출합니다.
def run(self):
    print(str(self.colored.cyan(
        'celery beat v{0} is starting.'.format(VERSION_BANNER))))
    self.init_loader()                                      #   Loader  
    self.set_process_title()                    
    self.start_scheduler()                                  #       

여기서 initloader 방법은 계속해서celery/loaders/base에 있습니다.py의 BaseLoader 클래스 initworker 방법,
def init_worker(self):
    if not self.worker_initialized:             #           
        self.worker_initialized = True          #       
        self.import_default_modules()           #      modules
        self.on_worker_init()     

지금 importdefault_modules의 함수는 다음과 같습니다.
def import_default_modules(self):
    signals.import_modules.send(sender=self.app)        #            apps
    return [
        self.import_task_module(m) for m in (
            tuple(self.builtin_modules) +
            tuple(maybe_list(self.app.conf.imports)) +
            tuple(maybe_list(self.app.conf.include))
        )                                       #           modules
    ]

이 때 계속해서self를 분석합니다.start_scheduler () 함수, 이 함수는 곧 시작할 시간 작업입니다.
def start_scheduler(self):
    if self.pidfile:                                        #        pidfile
        platforms.create_pidlock(self.pidfile)              #   pid  
    service = self.Service(
        app=self.app,
        max_interval=self.max_interval,
        scheduler_cls=self.scheduler_cls,
        schedule_filename=self.schedule,
    )                                                       #    service 

    print(self.banner(service))                             #         

    self.setup_logging()                                    #     
    if self.socket_timeout:
        logger.debug('Setting default socket timeout to %r',
                     self.socket_timeout)
        socket.setdefaulttimeout(self.socket_timeout)       #         
    try:
        self.install_sync_handler(service)                  #   handler
        service.start()                                     #   
    except Exception as exc:
        logger.critical('beat raised exception %s: %r',
                        exc.__class__, exc,
                        exc_info=True)
        raise

여기서 Service 클래스는 beat입니다.Service 를 실행하고 서비스에 대한 start 메서드를 호출합니다.
def start(self, embedded_process=False):
    info('beat: Starting...')
    debug('beat: Ticking with max interval->%s',
          humanize_seconds(self.scheduler.max_interval))            #         

    signals.beat_init.send(sender=self)                             #      signal   
    if embedded_process: 
        signals.beat_embedded_init.send(sender=self)
        platforms.set_process_title('celery beat')

    try:
        while not self._is_shutdown.is_set():                       #   Event     
            interval = self.scheduler.tick()                        #   scheduler.tick()             
            if interval and interval > 0.0:                         #     0
                debug('beat: Waking up %s.',
                      humanize_seconds(interval, prefix='in '))
                time.sleep(interval)                                #   
                if self.scheduler.should_sync():                    #       
                    self.scheduler._do_sync()                       #     
    except (KeyboardInterrupt, SystemExit):
        self._is_shutdown.set()
    finally:
        self.sync()

self에서.scheduler.max_interval 시 서비스의 scheduler 속성을 호출합니다.
@cached_property
def scheduler(self):
    return self.get_scheduler()                                     #   scheduler

서비스가 호출된 getscheduler 함수,
def get_scheduler(self, lazy=False,
                  extension_namespace='celery.beat_schedulers'):
    filename = self.schedule_filename                               #         celerybeat-schedule
    aliases = dict(
        load_extension_class_names(extension_namespace) or {})      #   celery.beat_schedulers    
    return symbol_by_name(self.scheduler_cls, aliases=aliases)(
        app=self.app,
        schedule_filename=filename,
        max_interval=self.max_interval,
        lazy=lazy,
    )                                                               #           django_celery_beat.schedulers:DatabaseScheduler 

그중loadextension_class_names는 다음과 같습니다.
def load_extension_class_names(namespace):
    try:
        from pkg_resources import iter_entry_points
    except ImportError:  # pragma: no cover
        return

    for ep in iter_entry_points(namespace):
        yield ep.name, ':'.join([ep.module_name, ep.attrs[0]])

이 함수의 기능은 대응하는celery를 찾는 것이다.beat_schedulers에 대응하는 입구 설정, 이 파일은site-packages/djangocelery_beat-1.0.1.dist- info 파일의 내용은 다음과 같습니다.
[celery.beat_schedulers]
django = django_celery_beat.schedulers:DatabaseScheduler

이 때 되돌아오고 가져오는 클래스는djangocelery_beat.schedulers: DatabaseScheduler 클래스입니다.
초기화가 완료되면self로 실행됩니다.scheduler.tick() 함수, DatabaseScheduler의 tick 함수를 보고,
def tick(self, event_t=event_t, min=min,
         heappop=heapq.heappop, heappush=heapq.heappush,
         heapify=heapq.heapify, mktime=time.mktime):
    """Run a tick - one iteration of the scheduler.

    Executes one due task per call.

    Returns:
        float: preferred delay in seconds for next call.
    """
    def _when(entry, next_time_to_run):
        return (mktime(entry.schedule.now().timetuple()) +
                (adjust(next_time_to_run) or 0))

    adjust = self.adjust
    max_interval = self.max_interval                                    #        
    H = self._heap                                                      #    
    if H is None:                                                       #      
        H = self._heap = [event_t(_when(e, e.is_due()[1]) or 0, 5, e) 
                          for e in values(self.schedule)]               #             
        heapify(H)                                                      #           
    if not H:                                                           #     
        return max_interval                                             #         

    event = H[0]                                                        #          
    entry = event[2]                                                    #       ,   ModelEntry
    is_due, next_time_to_run = self.is_due(entry)                       #    ModelEntry      ,          
    if is_due:                                                          #       
        verify = heappop(H)                                             #      
        if verify is event:                                             #         
            next_entry = self.reserve(entry)                            #            entry
            self.apply_entry(entry, producer=self.producer)             #         
            heappush(H, event_t(_when(next_entry, next_time_to_run),
                                event[1], next_entry))                  #                 
            return 0                                                    #   0
        else:
            heappush(H, verify)                                         #     entry         
            return min(verify[0], max_interval)                         #       
    return min(adjust(next_time_to_run) or max_interval, max_interval)  #                    ,                         

이 때 DatabaseScheduler가 초기화될 때 부모 Scheduler의 구조 방법을 다시 호출하기 때문에 setupschedule 메서드, DatabaseScheduler에서
def setup_schedule(self):
    self.install_default_entries(self.schedule)
    self.update_from_dict(self.app.conf.beat_schedule)

이 때 등록된 시간 작업이 entry로 실례화됩니다. 이 때 대응하는 Entry는 ModelEntry입니다.
def install_default_entries(self, data):
    entries = {}
    if self.app.conf.result_expires:
        entries.setdefault(
            'celery.backend_cleanup', {
                'task': 'celery.backend_cleanup',
                'schedule': schedules.crontab('0', '4', '*'),
                'options': {'expires': 12 * 3600},
            },
        )
    self.update_from_dict(entries) #        

그중에서self.update_from_dict는 설정된 시간 작업 데이터를 데이터베이스에 저장하는 것입니다.
def update_from_dict(self, mapping):
    s = {}
    for name, entry in items(mapping):
        try:
            s[name] = self.Entry.from_entry(name, app=self.app, **entry)
        except Exception as exc:
            logger.error(ADD_ENTRY_ERROR, name, exc, entry)
    self.schedule.update(s)         #   schedule   

이때 대응하는 schedule 속성은 다음과 같습니다.
@property
def schedule(self):
    update = False
    if not self._initial_read:
        debug('DatabaseScheduler: initial read')
        update = True
        self._initial_read = True
    elif self.schedule_changed():
        info('DatabaseScheduler: Schedule changed.')
        update = True

    if update:
        self.sync()
        self._schedule = self.all_as_schedule()
        if logger.isEnabledFor(logging.DEBUG):
            debug('Current schedule:
%s', '
'.join( repr(entry) for entry in values(self._schedule)), ) return self._schedule

그중에 데이터베이스에 저장된 값이 all 을 통해as_schedule 방법을 읽어내면,
def all_as_schedule(self):
    debug('DatabaseScheduler: Fetching database schedule')
    s = {}
    for model in self.Model.objects.enabled():
        try:
            s[model.name] = self.Entry(model, app=self.app)
        except ValueError:
            pass
    return s

데이터베이스 필드에서 작업이 실행될 수 있는지 확인한 다음 ModelEntry를 사용하여 인스턴스를 초기화하고 돌아갑니다.이 때 start 함수에서 호출된self입니다.scheduler.tick 함수에서 호출된 isdue 함수는 다음과 같이 호출됩니다.
def is_due(self, entry):
    return entry.is_due()  #      

이 때 entry는 ModelEntry의 실례입니다. 이 종류의 is 를 호출했습니다.due 함수,
def is_due(self):
    if not self.model.enabled:
        return False, 5.0   # 5 second delay for re-enable.               ,              
    return self.schedule.is_due(self.last_run_at)  #   schedule is_due  

DatabaseScheduler의 상위 Scheduler가 호출된 isdue 방법,
def is_due(self, last_run_at):
    """Return tuple of ``(is_due, next_time_to_check)``.

    Notes:
        - next time to check is in seconds.

        - ``(True, 20)``, means the task should be run now, and the next
            time to check is in 20 seconds.

        - ``(False, 12.3)``, means the task is not due, but that the
          scheduler should check again in 12.3 seconds.

    The next time to check is used to save energy/CPU cycles,
    it does not need to be accurate but will influence the precision
    of your schedule.  You must also keep in mind
    the value of :setting:`beat_max_loop_interval`,
    that decides the maximum number of seconds the scheduler can
    sleep between re-checking the periodic task intervals.  So if you
    have a task that changes schedule at run-time then your next_run_at
    check will decide how long it will take before a change to the
    schedule takes effect.  The max loop interval takes precedence
    over the next check at value returned.

    .. admonition:: Scheduler max interval variance

        The default max loop interval may vary for different schedulers.
        For the default scheduler the value is 5 minutes, but for example
        the :pypi:`django-celery-beat` database scheduler the value
        is 5 seconds.
    """
    last_run_at = self.maybe_make_aware(last_run_at)
    rem_delta = self.remaining_estimate(last_run_at)
    remaining_s = max(rem_delta.total_seconds(), 0)
    if remaining_s == 0:
        return schedstate(is_due=True, next=self.seconds)
    return schedstate(is_due=False, next=remaining_s)

실행 시간이 얼마나 남았는지 계산합니다. 0이 남았으면 현재 실행할 수 있음을 표시하고, 그렇지 않으면 남은 시간을 되돌려줍니다.
실행이 필요할 때 start 함수에서 작업이 필요할 때self를 호출합니다.apply_entry(entry, producer=self.producer),
def apply_entry(self, entry, producer=None):
    info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
    try:
        result = self.apply_async(entry, producer=producer, advance=False)      #       
    except Exception as exc:  # pylint: disable=broad-except
        error('Message Error: %s
%s', exc, traceback.format_stack(), exc_info=True) else: debug('%s sent. id->%s', entry.task, result.id) # id

이 때self를 호출합니다.apply_async,
def apply_async(self, entry, producer=None, advance=True, **kwargs):
    # Update time-stamps and run counts before we actually execute,
    # so we have that done if an exception is raised (doesn't schedule
    # forever.)
    entry = self.reserve(entry) if advance else entry       #     
    task = self.app.tasks.get(entry.task)                   #  app      task

    try:
        if task:                                                        #        
            return task.apply_async(entry.args, entry.kwargs,
                                    producer=producer,
                                    **entry.options)                    #        
        else:
            return self.send_task(entry.task, entry.args, entry.kwargs,
                                  producer=producer,
                                  **entry.options)                      #   app    
    except Exception as exc:  # pylint: disable=broad-except
        reraise(SchedulingError, SchedulingError(
            "Couldn't apply scheduled task {0.name}: {exc}".format(
                entry, exc=exc)), sys.exc_info()[2])
    finally:
        self._tasks_since_sync += 1
        if self.should_sync():
            self._do_sync()

이때 이 임무를 보내고 소비자의 소비를 기다리면 정해진 임무의 대략적인 절차가 완성된다.
본문 총결산
주로 시간제 임무의 대략적인 집행 절차를 설명했다. 기본적으로 데이터베이스에 시간제 임무를 저장한 다음에 시간제 임무가 도착했는지 검사를 호출하고 실행해야 할 임무를 소비단에 보내서worker가 집행하기를 기다린다. 이때 시간제 임무의 집행을 완성했다. 그 중의 많은 세부 사항을 일일이 분석하지 않았기 때문에 여러분이 관심이 있으면 스스로 분석할 수 있다.

좋은 웹페이지 즐겨찾기