소스 코드 분석 참고: Scheduler
6852 단어 파충
이 확장 자 는 scrapy 에서 자체 적 으로 가지 고 있 는 scheduler 를 대체 하 는 것 입 니 다 (settings 의 SCHEDULER 변수 에서 지적). 바로 이 확장 자 를 이용 하여 crawler 의 분포 식 스케줄 링 을 실현 하 는 것 입 니 다.그 가 이용 한 데이터 구 조 는 queue 에서 실 현 된 데이터 구조 에서 나온다.
scrapy - redis 가 실현 하 는 두 가지 분포 식: 파충류 분포 식 과 item 처리 분포 식 은 모듈 scheduler 와 모듈 pipelines 에 의 해 이 루어 집 니 다.상기 기타 모듈 은 양자 보조 기능 모듈 로 한다.
import importlib
import six
from scrapy.utils.misc import load_object
from . import connection
# TODO: add SCRAPY_JOB support.
class Scheduler(object):
"""Redis-based scheduler"""
def __init__(self, server,
persist=False,
flush_on_start=False,
queue_key='%(spider)s:requests',
queue_cls='scrapy_redis.queue.SpiderPriorityQueue',
dupefilter_key='%(spider)s:dupefilter',
dupefilter_cls='scrapy_redis.dupefilter.RFPDupeFilter',
idle_before_close=0,
serializer=None):
"""Initialize scheduler.
Parameters
----------
server : Redis
The redis server instance.
persist : bool
Whether to flush requests when closing. Default is False.
flush_on_start : bool
Whether to flush requests on start. Default is False.
queue_key : str
Requests queue key.
queue_cls : str
Importable path to the queue class.
dupefilter_key : str
Duplicates filter key.
dupefilter_cls : str
Importable path to the dupefilter class.
idle_before_close : int
Timeout before giving up.
"""
if idle_before_close < 0:
raise TypeError("idle_before_close cannot be negative")
self.server = server
self.persist = persist
self.flush_on_start = flush_on_start
self.queue_key = queue_key
self.queue_cls = queue_cls
self.dupefilter_cls = dupefilter_cls
self.dupefilter_key = dupefilter_key
self.idle_before_close = idle_before_close
self.serializer = serializer
self.stats = None
def __len__(self):
return len(self.queue)
@classmethod
def from_settings(cls, settings):
kwargs = {
'persist': settings.getbool('SCHEDULER_PERSIST'),
'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
}
# If these values are missing, it means we want to use the defaults.
optional = {
# TODO: Use custom prefixes for this settings to note that are
# specific to scrapy-redis.
'queue_key': 'SCHEDULER_QUEUE_KEY',
'queue_cls': 'SCHEDULER_QUEUE_CLASS',
'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
# We use the default setting name to keep compatibility.
'dupefilter_cls': 'DUPEFILTER_CLASS',
'serializer': 'SCHEDULER_SERIALIZER',
}
for name, setting_name in optional.items():
val = settings.get(setting_name)
if val:
kwargs[name] = val
# Support serializer as a path to a module.
if isinstance(kwargs.get('serializer'), six.string_types):
kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
server = connection.from_settings(settings)
# Ensure the connection is working.
server.ping()
return cls(server=server, **kwargs)
@classmethod
def from_crawler(cls, crawler):
instance = cls.from_settings(crawler.settings)
# FIXME: for now, stats are only supported from this constructor
instance.stats = crawler.stats
return instance
def open(self, spider):
self.spider = spider
try:
self.queue = load_object(self.queue_cls)(
server=self.server,
spider=spider,
key=self.queue_key % {'spider': spider.name},
serializer=self.serializer,
)
except TypeError as e:
raise ValueError("Failed to instantiate queue class '%s': %s",
self.queue_cls, e)
try:
self.df = load_object(self.dupefilter_cls)(
server=self.server,
key=self.dupefilter_key % {'spider': spider.name},
debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
)
except TypeError as e:
raise ValueError("Failed to instantiate dupefilter class '%s': %s",
self.dupefilter_cls, e)
if self.flush_on_start:
self.flush()
# notice if there are requests already in the queue to resume the crawl
if len(self.queue):
spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
def close(self, reason):
if not self.persist:
self.flush()
def flush(self):
self.df.clear()
self.queue.clear()
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
if self.stats:
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
self.queue.push(request)
return True
def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
def has_pending_requests(self):
return len(self) > 0
이 파일 은 scrapy. core. scheduler 의 기 존 스케줄 러 대신 scheduler 클래스 를 다시 썼 습 니 다.사실 기 존의 스케줄 러 의 논리 에 큰 변화 가 없 었 다. 주로 redis 를 데이터 저장 의 매개체 로 사용 하여 각 파충류 간 의 통일 적 인 스케줄 링 을 이 루 었 다.scheduler 는 각 spider 의 request 요청 을 예약 합 니 다. scheduler 초기 화 시 settings 파일 을 통 해 quue 와 Dupefilter 의 형식 을 읽 습 니 다 (일반적으로 위 에서 기본 값 으로 사용 합 니 다). quue 와 Dupefilter 가 사용 하 는 key 를 설정 합 니 다 (일반적으로 spider name 에 quue 나 Dupefilter 를 추가 하면 같은 spider 의 인 스 턴 스 에 대해 같은 데이터 블록 을 사용 합 니 다).하나의 request 가 스케줄 링 될 때마다 enqueuerequest 가 호출 되 었 습 니 다. scheduler 는 dupefilter 를 사용 하여 이 url 이 중복 되 는 지 여 부 를 판단 합 니 다. 중복 되 지 않 으 면 quue 용기 에 추가 합 니 다.스케줄 완료 시, nextrequest 가 호출 되면 scheduler 는 quue 용기 의 인 터 페 이 스 를 통 해 request 를 꺼 내 해당 spider 에 보 내 spider 를 기어 오 르 게 합 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
(1) 분포 식 파충류 Scrapy 는 어떻게 해 야 하나 요 - 설치Scrapy 의 설치 에 대해 인터넷 을 샅 샅 이 뒤 졌 습 니 다. 하나씩 설치 하 는 것 은 솔직히 좀 번 거 롭 습 니 다. 그럼 원 키 로 설치 한 것 이 있 습 니까?답 은 분명히 있 습 니 다. 다음은 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.