celery 원본 분석 - 정시 작업
19277 단어 web
python3.5.2,celery4.0.2,django1.10.x
celery의 타이밍 작업 및 Django 구성
celery도 정해진 시간에 작업을 수행하여 관련 작업을 수행할 수 있다.celery와django의 설정 방법은 다음과 같다.celeryapp.tasks에 다음 작업 추가
@shared_task
def beat_task():
print("beat_task")
2.celerydjango.setting 파일에 다음 설정을 추가합니다.
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'celery_app',
'django_celery_beat'
]
셀러리가 django와 호흡을 맞춘 상태에서 django를 사용했기 때문이죠celery_beat 제3자 라이브러리이기 때문에 이 앱을 등록해야 합니다.
3.django.celery.py 파일에 다음 설정을 추가합니다.
from celery_django import settings
app.autodiscover_tasks(lambda : settings.INSTALLED_APPS)
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'test_beat': {
'task': 'celery_app.tasks.beat_task',
'schedule': timedelta(seconds=3),
},
}
app.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)
이로써 프로필 설정이 완료되면 정시 작업 명령을 시작합니다.
(venv) wuzideMacBook-Air:celery_django wuzi$ celery beat -A celery_django -S django
그리고worker 서비스 구역을 시작하면 소비 시간이 실행해야 할 임무에 도달합니다.
celery -A celery_django worker
워크맨의 터미널에서 출력됩니다. 아래와 같습니다.
-------------- [email protected] v4.0.0 (latentcall)
---- **** -----
--- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2018-07-12 07:24:56
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: celery_django:0x108934240
- ** ---------- .> transport: redis://127.0.0.1:6379/7
- ** ---------- .> results: redis://127.0.0.1:6379/6
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[2018-07-12 07:24:57,512: WARNING/MainProcess] /Users/wuzi/python35/venv/lib/python3.5/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-07-12 07:24:57,546: WARNING/PoolWorker-3] beat_task
[2018-07-12 07:24:57,550: WARNING/PoolWorker-4] beat_task
[2018-07-12 07:24:57,551: WARNING/PoolWorker-2] beat_task
[2018-07-12 07:24:57,560: WARNING/PoolWorker-1] beat_task
celery의 정시 작업 분석
Celerydjango 디렉토리에서 터미널에서 다음 명령을 입력합니다.
celery beat -A celery_django -S django
앞에서 분석한 바에 의하면 이때 들어온 CeleryCommand의commands에 대응하는 beat에 대응하는 클래스, beat가 실행되고 이때도 다음과 같은 코드를 실행한다.
return cls(
app=self.app, on_error=self.on_error,
no_color=self.no_color, quiet=self.quiet,
on_usage_error=partial(self.on_usage_error, command=command),
).run_from_argv(self.prog_name, argv[1:], command=argv[0]) # , run_from_argv
이때cls는 비트 클래스에 대응하지만,bin/beat에 있는 것을 확인합니다.py의 beat 클래스에서 알 수 있듯이, 이 클래스는run 방법과add 만 다시 썼습니다.arguments 방법, 그래서 지금 실행하는runfrom_argv 방법은 비트가 계승하는 Command의runfrom_argv 메서드, Command의handleArgv 방법, 이 방법은 관련 매개 변수 처리를 거친 후call 함수로 호출되고, 이 함수에서run 방법으로 호출됩니다. 이때 호출된run 방법은 beat류에서 다시 쓴run 방법입니다. 이 방법을 보십시오.
def run(self, detach=False, logfile=None, pidfile=None, uid=None,
gid=None, umask=None, workdir=None, **kwargs):
if not detach: #
maybe_drop_privileges(uid=uid, gid=gid)
kwargs.pop('app', None) # app
beat = partial(self.app.Beat,
logfile=logfile, pidfile=pidfile, **kwargs) # ,
if detach: #
with detached(logfile, pidfile, uid, gid, umask, workdir):
return beat().run() #
else:
return beat().run() #
이 때 앱의 비트 속성을 호출,
@cached_property
def Beat(self, **kwargs):
""":program:`celery beat` scheduler application.
See Also:
:class:`~@Beat`.
"""
return self.subclass_with_self('celery.apps.beat:Beat') # celery.apps.beat:Beat
이때celery를 실례화했습니다.apps.비트의 비트 클래스와 이 실례의run 방법을 호출합니다.
def run(self):
print(str(self.colored.cyan(
'celery beat v{0} is starting.'.format(VERSION_BANNER))))
self.init_loader() # Loader
self.set_process_title()
self.start_scheduler() #
여기서 initloader 방법은 계속해서celery/loaders/base에 있습니다.py의 BaseLoader 클래스 initworker 방법,
def init_worker(self):
if not self.worker_initialized: #
self.worker_initialized = True #
self.import_default_modules() # modules
self.on_worker_init()
지금 importdefault_modules의 함수는 다음과 같습니다.
def import_default_modules(self):
signals.import_modules.send(sender=self.app) # apps
return [
self.import_task_module(m) for m in (
tuple(self.builtin_modules) +
tuple(maybe_list(self.app.conf.imports)) +
tuple(maybe_list(self.app.conf.include))
) # modules
]
이 때 계속해서self를 분석합니다.start_scheduler () 함수, 이 함수는 곧 시작할 시간 작업입니다.
def start_scheduler(self):
if self.pidfile: # pidfile
platforms.create_pidlock(self.pidfile) # pid
service = self.Service(
app=self.app,
max_interval=self.max_interval,
scheduler_cls=self.scheduler_cls,
schedule_filename=self.schedule,
) # service
print(self.banner(service)) #
self.setup_logging() #
if self.socket_timeout:
logger.debug('Setting default socket timeout to %r',
self.socket_timeout)
socket.setdefaulttimeout(self.socket_timeout) #
try:
self.install_sync_handler(service) # handler
service.start() #
except Exception as exc:
logger.critical('beat raised exception %s: %r',
exc.__class__, exc,
exc_info=True)
raise
여기서 Service 클래스는 beat입니다.Service 를 실행하고 서비스에 대한 start 메서드를 호출합니다.
def start(self, embedded_process=False):
info('beat: Starting...')
debug('beat: Ticking with max interval->%s',
humanize_seconds(self.scheduler.max_interval)) #
signals.beat_init.send(sender=self) # signal
if embedded_process:
signals.beat_embedded_init.send(sender=self)
platforms.set_process_title('celery beat')
try:
while not self._is_shutdown.is_set(): # Event
interval = self.scheduler.tick() # scheduler.tick()
if interval and interval > 0.0: # 0
debug('beat: Waking up %s.',
humanize_seconds(interval, prefix='in '))
time.sleep(interval) #
if self.scheduler.should_sync(): #
self.scheduler._do_sync() #
except (KeyboardInterrupt, SystemExit):
self._is_shutdown.set()
finally:
self.sync()
self에서.scheduler.max_interval 시 서비스의 scheduler 속성을 호출합니다.
@cached_property
def scheduler(self):
return self.get_scheduler() # scheduler
서비스가 호출된 getscheduler 함수,
def get_scheduler(self, lazy=False,
extension_namespace='celery.beat_schedulers'):
filename = self.schedule_filename # celerybeat-schedule
aliases = dict(
load_extension_class_names(extension_namespace) or {}) # celery.beat_schedulers
return symbol_by_name(self.scheduler_cls, aliases=aliases)(
app=self.app,
schedule_filename=filename,
max_interval=self.max_interval,
lazy=lazy,
) # django_celery_beat.schedulers:DatabaseScheduler
그중loadextension_class_names는 다음과 같습니다.
def load_extension_class_names(namespace):
try:
from pkg_resources import iter_entry_points
except ImportError: # pragma: no cover
return
for ep in iter_entry_points(namespace):
yield ep.name, ':'.join([ep.module_name, ep.attrs[0]])
이 함수의 기능은 대응하는celery를 찾는 것이다.beat_schedulers에 대응하는 입구 설정, 이 파일은site-packages/djangocelery_beat-1.0.1.dist- info 파일의 내용은 다음과 같습니다.
[celery.beat_schedulers]
django = django_celery_beat.schedulers:DatabaseScheduler
이 때 되돌아오고 가져오는 클래스는djangocelery_beat.schedulers: DatabaseScheduler 클래스입니다.
초기화가 완료되면self로 실행됩니다.scheduler.tick() 함수, DatabaseScheduler의 tick 함수를 보고,
def tick(self, event_t=event_t, min=min,
heappop=heapq.heappop, heappush=heapq.heappush,
heapify=heapq.heapify, mktime=time.mktime):
"""Run a tick - one iteration of the scheduler.
Executes one due task per call.
Returns:
float: preferred delay in seconds for next call.
"""
def _when(entry, next_time_to_run):
return (mktime(entry.schedule.now().timetuple()) +
(adjust(next_time_to_run) or 0))
adjust = self.adjust
max_interval = self.max_interval #
H = self._heap #
if H is None: #
H = self._heap = [event_t(_when(e, e.is_due()[1]) or 0, 5, e)
for e in values(self.schedule)] #
heapify(H) #
if not H: #
return max_interval #
event = H[0] #
entry = event[2] # , ModelEntry
is_due, next_time_to_run = self.is_due(entry) # ModelEntry ,
if is_due: #
verify = heappop(H) #
if verify is event: #
next_entry = self.reserve(entry) # entry
self.apply_entry(entry, producer=self.producer) #
heappush(H, event_t(_when(next_entry, next_time_to_run),
event[1], next_entry)) #
return 0 # 0
else:
heappush(H, verify) # entry
return min(verify[0], max_interval) #
return min(adjust(next_time_to_run) or max_interval, max_interval) # ,
이 때 DatabaseScheduler가 초기화될 때 부모 Scheduler의 구조 방법을 다시 호출하기 때문에 setupschedule 메서드, DatabaseScheduler에서
def setup_schedule(self):
self.install_default_entries(self.schedule)
self.update_from_dict(self.app.conf.beat_schedule)
이 때 등록된 시간 작업이 entry로 실례화됩니다. 이 때 대응하는 Entry는 ModelEntry입니다.
def install_default_entries(self, data):
entries = {}
if self.app.conf.result_expires:
entries.setdefault(
'celery.backend_cleanup', {
'task': 'celery.backend_cleanup',
'schedule': schedules.crontab('0', '4', '*'),
'options': {'expires': 12 * 3600},
},
)
self.update_from_dict(entries) #
그중에서self.update_from_dict는 설정된 시간 작업 데이터를 데이터베이스에 저장하는 것입니다.
def update_from_dict(self, mapping):
s = {}
for name, entry in items(mapping):
try:
s[name] = self.Entry.from_entry(name, app=self.app, **entry)
except Exception as exc:
logger.error(ADD_ENTRY_ERROR, name, exc, entry)
self.schedule.update(s) # schedule
이때 대응하는 schedule 속성은 다음과 같습니다.
@property
def schedule(self):
update = False
if not self._initial_read:
debug('DatabaseScheduler: initial read')
update = True
self._initial_read = True
elif self.schedule_changed():
info('DatabaseScheduler: Schedule changed.')
update = True
if update:
self.sync()
self._schedule = self.all_as_schedule()
if logger.isEnabledFor(logging.DEBUG):
debug('Current schedule:
%s', '
'.join(
repr(entry) for entry in values(self._schedule)),
)
return self._schedule
그중에 데이터베이스에 저장된 값이 all 을 통해as_schedule 방법을 읽어내면,
def all_as_schedule(self):
debug('DatabaseScheduler: Fetching database schedule')
s = {}
for model in self.Model.objects.enabled():
try:
s[model.name] = self.Entry(model, app=self.app)
except ValueError:
pass
return s
데이터베이스 필드에서 작업이 실행될 수 있는지 확인한 다음 ModelEntry를 사용하여 인스턴스를 초기화하고 돌아갑니다.이 때 start 함수에서 호출된self입니다.scheduler.tick 함수에서 호출된 isdue 함수는 다음과 같이 호출됩니다.
def is_due(self, entry):
return entry.is_due() #
이 때 entry는 ModelEntry의 실례입니다. 이 종류의 is 를 호출했습니다.due 함수,
def is_due(self):
if not self.model.enabled:
return False, 5.0 # 5 second delay for re-enable. ,
return self.schedule.is_due(self.last_run_at) # schedule is_due
DatabaseScheduler의 상위 Scheduler가 호출된 isdue 방법,
def is_due(self, last_run_at):
"""Return tuple of ``(is_due, next_time_to_check)``.
Notes:
- next time to check is in seconds.
- ``(True, 20)``, means the task should be run now, and the next
time to check is in 20 seconds.
- ``(False, 12.3)``, means the task is not due, but that the
scheduler should check again in 12.3 seconds.
The next time to check is used to save energy/CPU cycles,
it does not need to be accurate but will influence the precision
of your schedule. You must also keep in mind
the value of :setting:`beat_max_loop_interval`,
that decides the maximum number of seconds the scheduler can
sleep between re-checking the periodic task intervals. So if you
have a task that changes schedule at run-time then your next_run_at
check will decide how long it will take before a change to the
schedule takes effect. The max loop interval takes precedence
over the next check at value returned.
.. admonition:: Scheduler max interval variance
The default max loop interval may vary for different schedulers.
For the default scheduler the value is 5 minutes, but for example
the :pypi:`django-celery-beat` database scheduler the value
is 5 seconds.
"""
last_run_at = self.maybe_make_aware(last_run_at)
rem_delta = self.remaining_estimate(last_run_at)
remaining_s = max(rem_delta.total_seconds(), 0)
if remaining_s == 0:
return schedstate(is_due=True, next=self.seconds)
return schedstate(is_due=False, next=remaining_s)
실행 시간이 얼마나 남았는지 계산합니다. 0이 남았으면 현재 실행할 수 있음을 표시하고, 그렇지 않으면 남은 시간을 되돌려줍니다.
실행이 필요할 때 start 함수에서 작업이 필요할 때self를 호출합니다.apply_entry(entry, producer=self.producer),
def apply_entry(self, entry, producer=None):
info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
try:
result = self.apply_async(entry, producer=producer, advance=False) #
except Exception as exc: # pylint: disable=broad-except
error('Message Error: %s
%s',
exc, traceback.format_stack(), exc_info=True)
else:
debug('%s sent. id->%s', entry.task, result.id) # id
이 때self를 호출합니다.apply_async,
def apply_async(self, entry, producer=None, advance=True, **kwargs):
# Update time-stamps and run counts before we actually execute,
# so we have that done if an exception is raised (doesn't schedule
# forever.)
entry = self.reserve(entry) if advance else entry #
task = self.app.tasks.get(entry.task) # app task
try:
if task: #
return task.apply_async(entry.args, entry.kwargs,
producer=producer,
**entry.options) #
else:
return self.send_task(entry.task, entry.args, entry.kwargs,
producer=producer,
**entry.options) # app
except Exception as exc: # pylint: disable=broad-except
reraise(SchedulingError, SchedulingError(
"Couldn't apply scheduled task {0.name}: {exc}".format(
entry, exc=exc)), sys.exc_info()[2])
finally:
self._tasks_since_sync += 1
if self.should_sync():
self._do_sync()
이때 이 임무를 보내고 소비자의 소비를 기다리면 정해진 임무의 대략적인 절차가 완성된다.
본문 총결산
주로 시간제 임무의 대략적인 집행 절차를 설명했다. 기본적으로 데이터베이스에 시간제 임무를 저장한 다음에 시간제 임무가 도착했는지 검사를 호출하고 실행해야 할 임무를 소비단에 보내서worker가 집행하기를 기다린다. 이때 시간제 임무의 집행을 완성했다. 그 중의 많은 세부 사항을 일일이 분석하지 않았기 때문에 여러분이 관심이 있으면 스스로 분석할 수 있다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Portswigger의 연구실 작성: CSRF 토큰 보호를 사용한 기본 클릭재킹이 견습생 수준 실습에서는 일부 CSRF 토큰 보호가 있음에도 불구하고 클릭재킹에 취약한 웹사이트에서 계정 삭제 흐름을 악용합니다. 주어진 자격 증명으로 로그인하면 계정 페이지로 이동한 후 사용자 계정을 삭제하는 데...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.