스 레 드 탱크 의 세 가지 사용 방법

import threadpool import time
def sayhello (a): print(“hello: “+a) time.sleep(2)
def main(): global result seed=[“a”,”b”,”c”,”d”,”e”,”f”] start=time.time() task_pool=threadpool.ThreadPool(2) requests=threadpool.makeRequests(sayhello,seed) for req in requests: task_pool.putRequest(req) task_pool.wait() end=time.time() time_m = end-start print(“time: “+str(time_m)) start1=time.time() for each in seed: sayhello(each) end1=time.time() print(“time1: “+str(end1-start1))
if name = = 'main': main () threadpool 은 오래된 모듈 입 니 다. 아직 사용 하 는 사람 이 있 지만 더 이상 주류 가 아 닙 니 다. python 다 중 스 레 드 에 대해 서 는 미래 (future 모듈) 에 들 어 섰 습 니 다 pool = ThreadPool (poolsize) requests = MakeRequests (some callable, list of args, callback) [pool. putRequest (req)for req in requests] pool. wait () 첫 줄 은 하나의 스 레 드 풀 을 정의 합 니 다. poolsize 이렇게 많은 스 레 드 를 만 들 수 있 음 을 표시 합 니 다.
두 번 째 줄 은 MakeRequests 를 호출 하여 다 중 스 레 드 를 열 함수 와 함수 관련 매개 변수 와 리 셋 함 수 를 만 들 었 습 니 다. 그 중에서 리 셋 함 수 는 쓰 지 않 아 도 됩 니 다. default 는 없습니다. 즉, MakeRequests 는 2 개의 매개 변수 만 있 으 면 실행 할 수 있 습 니 다.
세 번 째 줄 의 용법 은 이상 합 니 다. 다 중 스 레 드 를 실행 할 모든 요청 을 스 레 드 풀 에 던 지 는 것 입 니 다. [pool. putRequest (req) for req in requests] 는
  for req in requests:
     pool.putRequest(req)
네 번 째 줄 은 모든 스 레 드 가 완 료 된 후에 종료 하 는 것 입 니 다. 2. 미래:
concurrent. futures 모듈 을 사용 합 니 다. 이 모듈 은 python 3 에서 자체 적 으로 가지 고 있 는 모듈 입 니 다. 그러나 python 2.7 이상 버 전도 설치 하여 사용 할 수 있 습 니 다. 구체 적 인 사용 방식 은 다음 과 같 습 니 다.
! /usr/bin/env python
-- coding: utf-8 --
from concurrent.futures import ThreadPoolExecutor import time
def sayhello(a): print(“hello: “+a) time.sleep(2)
def main(): seed=[“a”,”b”,”c”] start1=time.time() for each in seed: sayhello(each) end1=time.time() print(“time1: “+str(end1-start1)) start2=time.time() with ThreadPoolExecutor(3) as executor: for each in seed: executor.submit(sayhello,each) end2=time.time() print(“time2: “+str(end2-start2)) start3=time.time() with ThreadPoolExecutor(3) as executor1: executor1.map(sayhello,seed) end3=time.time() print(“time3: “+str(end3-start3))
if name = = 'main': main () 실행 결 과 는 다음 과 같 습 니 다.
한 가지 주의:
concurrent. future. ThreadPoolExecutor 는 임 무 를 제출 할 때 두 가지 방식 이 있 습 니 다. 하 나 는 submit () 함수 이 고 다른 하 나 는 map () 함수 입 니 다. 이들 의 주요 차이 점 은 다음 과 같 습 니 다.
2.1 map 는 출력 순 서 를 확보 할 수 있 고 submit 출력 순 서 는 복잡 합 니 다.
2.2, 당신 이 제출 하고 자 하 는 작업 의 함수 가 같다 면 map 로 간략화 할 수 있 습 니 다. 그러나 제출 한 작업 함수 가 다 르 거나 실행 하 는 과정 에 이상 이 생 길 수 있 습 니 다 (map 를 사용 하여 실행 하 는 과정 에서 문 제 를 발견 하면 바로 오 류 를 던 집 니 다). submit () 를 사용 해 야 합 니 다.
2.3 submit 과 map 의 매개 변 수 는 다 릅 니 다. submit 는 매번 목표 함수 와 대응 하 는 매개 변 수 를 제출 해 야 합 니 다. map 는 목표 함 수 를 한 번 만 제출 하고 목표 함수 의 매개 변 수 는 교체 기 (목록, 사전) 에 넣 으 면 됩 니 다.
3. 지금?
여기 서 한 가지 문 제 를 고려 해 야 합 니 다. 상기 두 가지 스 레 드 탱크 의 실현 은 모두 봉 인 된 것 입 니 다. 임 무 는 온라인 스 레 드 탱크 가 초기 화 될 때 한 번 만 추가 할 수 있 습 니 다. 그러면 제 가 지금 이런 수요 가 있다 고 가정 하고 온라인 스 레 드 탱크 가 실 행 될 때 그 안에 새로운 임 무 를 추가 해 야 합 니 다 (주의, 새로운 임무 입 니 다. 새로운 스 레 드 가 아 닙 니 다). 그러면 어떻게 해 야 합 니까?
사실은 두 가지 방법 이 있다.
3.1 threadpool 또는 future 의 함 수 를 다시 씁 니 다.
이 방법 은 소스 모듈 의 소스 코드 를 읽 어야 합 니 다. 초 원 모듈 스 레 드 탱크 의 실현 체 제 를 잘 알 아야 자신의 수요 에 따라 그 중의 방법 을 다시 쓸 수 있 습 니 다.
3.2 스스로 스 레 드 풀 을 구축한다.
이 방법 은 스 레 드 탱크 에 대해 명확 한 이 해 를 가지 고 제 가 구축 한 스 레 드 탱크 를 첨부 해 야 합 니 다.
! /usr/bin/env python
-- coding: utf-8 --
import threading import Queue import hashlib import logging from utils.progress import PrintProgress from utils.save import SaveToSqlite
class ThreadPool(object): def init(self, thread_num, args):
    self.args = args
    self.work_queue = Queue.Queue()
    self.save_queue = Queue.Queue()
    self.threads = []
    self.running = 0
    self.failure = 0
    self.success = 0
    self.tasks = {}
    self.thread_name = threading.current_thread().getName()
    self.__init_thread_pool(thread_num)

#       
def __init_thread_pool(self, thread_num):
    #     
    for i in range(thread_num):
        self.threads.append(WorkThread(self))
    #         
    self.threads.append(PrintProgress(self))
    #     
    self.threads.append(SaveToSqlite(self, self.args.dbfile))

#       
def add_task(self, func, url, deep):
    #     ,         
    url_hash = hashlib.new('md5', url.encode("utf8")).hexdigest()
    if not url_hash in self.tasks:
        self.tasks[url_hash] = url
        self.work_queue.put((func, url, deep))
        logging.info("{0} add task {1}".format(self.thread_name, url.encode("utf8")))

#       
def get_task(self):
    #        ,  block=True,             。
    task = self.work_queue.get(block=False)

    return task

def task_done(self):
    #                 。
    self.work_queue.task_done()

#     
def start_task(self):
    for item in self.threads:
        item.start()

    logging.debug("Work start")

def increase_success(self):
    self.success += 1

def increase_failure(self):
    self.failure += 1

def increase_running(self):
    self.running += 1

def decrease_running(self):
    self.running -= 1

def get_running(self):
    return self.running

#       
def get_progress_info(self):
    progress_info = {}
    progress_info['work_queue_number'] = self.work_queue.qsize()
    progress_info['tasks_number'] = len(self.tasks)
    progress_info['save_queue_number'] = self.save_queue.qsize()
    progress_info['success'] = self.success
    progress_info['failure'] = self.failure

    return progress_info

def add_save_task(self, url, html):
    self.save_queue.put((url, html))

def get_save_task(self):
    save_task = self.save_queue.get(block=False)

    return save_task

def wait_all_complete(self):
    for item in self.threads:
        if item.isAlive():
            # join     ,      join       ,          
            item.join()

WorkThread 는 threading. Thread 에서 계 승 됩 니 다.
class WorkThread (threading. Thread): \ # 여기 thread pool 은 위의 ThreadPool 클래스 def init (self, thread pool): threading. Thread. init (self) self. thread pool = thread pool
#        , , thread_1,...,thread_n,  start()  ,     。
def run(self):
    print (threading.current_thread().getName())
    while True:
        try:
            # get_task()                   ,   func,url,deep
            do, url, deep = self.thread_pool.get_task()
            self.thread_pool.increase_running()

            #   deep,        
            flag_get_new_link = True
            if deep >= self.thread_pool.args.deep:
                flag_get_new_link = False

            #   do         func,                      
            html, new_link = do(url, self.thread_pool.args, flag_get_new_link)

            if html == '':
                self.thread_pool.increase_failure()
            else:
                self.thread_pool.increase_success()
                # html        
                self.thread_pool.add_save_task(url, html)

            #      , ,                  。
            if new_link:
                for url in new_link:
                    self.thread_pool.add_task(do, url, deep + 1)

            self.thread_pool.decrease_running()
            # self.thread_pool.task_done()
        except Queue.Empty:
            if self.thread_pool.get_running() <= 0:
                break
        except Exception, e:
            self.thread_pool.decrease_running()
            # print str(e)
            break

좋은 웹페이지 즐겨찾기