소스 코드 분석 참고: Scheduler

6852 단어 파충
scheduler.py
이 확장 자 는 scrapy 에서 자체 적 으로 가지 고 있 는 scheduler 를 대체 하 는 것 입 니 다 (settings 의 SCHEDULER 변수 에서 지적). 바로 이 확장 자 를 이용 하여 crawler 의 분포 식 스케줄 링 을 실현 하 는 것 입 니 다.그 가 이용 한 데이터 구 조 는 queue 에서 실 현 된 데이터 구조 에서 나온다.
scrapy - redis 가 실현 하 는 두 가지 분포 식: 파충류 분포 식 과 item 처리 분포 식 은 모듈 scheduler 와 모듈 pipelines 에 의 해 이 루어 집 니 다.상기 기타 모듈 은 양자 보조 기능 모듈 로 한다.
import importlib
import six

from scrapy.utils.misc import load_object

from . import connection


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler"""

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key='%(spider)s:requests',
                 queue_cls='scrapy_redis.queue.SpiderPriorityQueue',
                 dupefilter_key='%(spider)s:dupefilter',
                 dupefilter_cls='scrapy_redis.dupefilter.RFPDupeFilter',
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.
        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.
        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

    def has_pending_requests(self):
        return len(self) > 0

이 파일 은 scrapy. core. scheduler 의 기 존 스케줄 러 대신 scheduler 클래스 를 다시 썼 습 니 다.사실 기 존의 스케줄 러 의 논리 에 큰 변화 가 없 었 다. 주로 redis 를 데이터 저장 의 매개체 로 사용 하여 각 파충류 간 의 통일 적 인 스케줄 링 을 이 루 었 다.scheduler 는 각 spider 의 request 요청 을 예약 합 니 다. scheduler 초기 화 시 settings 파일 을 통 해 quue 와 Dupefilter 의 형식 을 읽 습 니 다 (일반적으로 위 에서 기본 값 으로 사용 합 니 다). quue 와 Dupefilter 가 사용 하 는 key 를 설정 합 니 다 (일반적으로 spider name 에 quue 나 Dupefilter 를 추가 하면 같은 spider 의 인 스 턴 스 에 대해 같은 데이터 블록 을 사용 합 니 다).하나의 request 가 스케줄 링 될 때마다 enqueuerequest 가 호출 되 었 습 니 다. scheduler 는 dupefilter 를 사용 하여 이 url 이 중복 되 는 지 여 부 를 판단 합 니 다. 중복 되 지 않 으 면 quue 용기 에 추가 합 니 다.스케줄 완료 시, nextrequest 가 호출 되면 scheduler 는 quue 용기 의 인 터 페 이 스 를 통 해 request 를 꺼 내 해당 spider 에 보 내 spider 를 기어 오 르 게 합 니 다.

좋은 웹페이지 즐겨찾기