Python 은 multiprocessing 을 이용 하여 가장 간단 한 분포 식 작업 스케줄 링 시스템 인 스 턴 스 를 실현 합 니 다.

소개 하 다.
Python 의 multiprocessing 모듈 은 다 중 프로 세 스 를 지원 할 뿐만 아니 라 managers 서브 모듈 은 다 중 프로 세 스 를 여러 대의 기계 에 분포 하 는 것 도 지원 합 니 다.하나의 서비스 프로 세 스 는 스케줄 러 로 서 다른 여러 기계 의 여러 프로 세 스 에 임 무 를 분포 시 켜 네트워크 통신 에 의존 할 수 있다.이 모듈 을 사용 하여 간단 한 작업 스케줄 링 시스템 을 실현 할 수 있 을 까 하 는 생각 이 들 었 다.그 전에 python 의 다 중 프로 세 스 관리 패키지 multiprocessing 에 대해 자세히 알 아 보 겠 습 니 다.
multiprocessing.Process
multiprocessing 패 키 지 는 Python 의 다 중 프로 세 스 관리 패키지 입 니 다.이것 은 threading.Thread 와 유사 하 며,multiprocessing.Process 대상 을 이용 하여 프로 세 스 를 만 들 수 있 습 니 다.이 프로 세 스 는 Python 프로그램 내부 에서 작 성 된 함수 에 넣 을 수 있 습 니 다.이 process 대상 은 Thread 대상 과 같은 용법 으로 is 를 가지 고 있 습 니 다.alive(),join([timeout]),run(),start(),terminate()등 방법.속성 은 authkey,daemon(start()설정 을 통 해),exitcode(프로 세 스 가 실 행 될 때 None,CN 이면 신호 N 이 끝 났 음 을 표시 합 니 다),name,pid 입 니 다.이 밖 에 multiprocessing 패키지 에 도 Lock/Event/Semaphore/Condition 클래스 가 있어 프로 세 스 를 동기 화 하 는데 사용 되 며 threading 패키지 의 동명 클래스 와 같 습 니 다.multiprocessing 의 상당 부분 은 threading 과 같은 API 를 사용 합 니 다.다 중 프로 세 스 의 상황 으로 바 뀌 었 을 뿐 입 니 다.
이 모듈 은 스 레 드 처럼 프로 세 스 를 관리 하 는 것 을 표시 합 니 다.이것 은 multiprocessing 의 핵심 입 니 다.threading 과 비슷 하고 다 핵 CPU 에 대한 이 용 률 이 threading 보다 훨씬 좋 습 니 다.
Process 류 의 구조 방법 을 살 펴 보 자.

__init__(self, group=None, target=None, name=None, args=(), kwargs={})
매개 변수 설명:
  • group:프로 세 스 소속 그룹 입 니 다.거의 안 써 요
  • target:호출 대상 을 표시 합 니 다.
  • args:호출 대상 의 위치 매개 변수 모듈 을 표시 합 니 다.
  • name:별명
  • kwargs:호출 대상 을 나타 내 는 사전 입 니 다.
  • 프로 세 스 를 만 드 는 간단 한 인 스 턴 스:
    
    #coding=utf-8
    import multiprocessing
    
    def do(n) :
     #         
     name = multiprocessing.current_process().name
     print name,'starting'
     print "worker ", n
     return 
    
    if __name__ == '__main__' :
     numList = []
     for i in xrange(5) :
     p = multiprocessing.Process(target=do, args=(i,))
     numList.append(p)
     p.start()
     p.join()
     print "Process end."
    실행 결과:
    
    Process-1 starting
    worker 0
    Process end.
    Process-2 starting
    worker 1
    Process end.
    Process-3 starting
    worker 2
    Process end.
    Process-4 starting
    worker 3
    Process end.
    Process-5 starting
    worker 4
    Process end.
    하위 프로 세 스 를 만 들 때 실행 함수 와 함수 의 인 자 를 입력 하고 process 인 스 턴 스 를 만 들 고 start()방법 으로 시작 합 니 다.join()방법 은 하위 프로 세 스 가 끝 난 후에 계속 실행 하 는 것 을 표시 합 니 다.보통 프로 세 스 간 동기 화 에 사 용 됩 니 다.
    주의:
    Windows 에서 프로 세 스 모듈 을 사용 하려 면 프로 세 스 와 관련 된 코드 를 현재.py 파일if __name__ == ‘__main__' :문구 아래 에 써 야 Windows 의 프로 세 스 모듈 을 정상적으로 사용 할 수 있 습 니 다.유 닉 스/리 눅 스 는 필요 없습니다.
    multiprocess.Pool
    피 조작 대상 의 수가 많 지 않 을 때 multiprocessing 중의 Process 동 태 를 직접 이용 하여 여러 프로 세 스 를 만 들 수 있 습 니 다.10 여 개 는 괜 찮 지만 수백 개,수천 개의 목표 라면 수 동 으로 프로 세 스 의 수량 을 제한 하 는 것 이 너무 번 거 롭 습 니 다.이때 프로 세 스 탱크 의 효 과 를 발휘 할 수 있 습 니 다.
    Pool 은 사용자 가 호출 할 수 있 도록 지정 한 프로 세 스 를 제공 할 수 있 습 니 다.새로운 요청 이 pool 에 제출 되 었 을 때 풀 이 가득 차지 않 으 면 새 프로 세 스 를 만들어 서 이 요청 을 수행 합 니 다.그러나 풀 의 프로 세 스 수가 규정된 최대 치 에 이 르 렀 다 면 이 요청 은 풀 에 프로 세 스 가 끝 날 때 까지 기 다 려 야 새로운 프로 세 스 를 만 들 수 있 습 니 다.
    apply_async 와 apply
    함수 원형:
    
    apply_async(func[, args=()[, kwds={}[, callback=None]]])
    두 사람 은 모두 프로 세 스 풀 에 새로운 프로 세 스 를 추가 합 니 다.다른 경우 apply 는 새로운 프로 세 스 를 추가 할 때마다 메 인 프로 세 스 와 새로운 프로 세 스 를 병행 하지만 메 인 프로 세 스 는 새 프로 세 스 의 함수 가 실 행 될 때 까지 막 습 니 다.이것 은 매우 비효 율 적 이기 때문에 python 3.x 이후 에는 사용 하지 않 습 니 다.
    apply_async 와 apply 기능 은 같 지만 메 인 프로 세 스 는 막 히 지 않 습 니 다.
    
    # -*- coding:utf-8 -*-
    
    import multiprocessing
    import time
    
    def func(msg):
     print "*msg: ", msg
     time.sleep(3)
     print "*end"
    
    if __name__ == "__main__":
     #           processes,                   
     pool = multiprocessing.Pool(processes=3)
     for i in range(10):
     msg = "hello [{}]".format(i)
     # pool.apply(func, (msg,))
     pool.apply_async(func, (msg,)) #       ,     ,                           
    
     print "--" * 10
     pool.close() #   pool,             
     pool.join() #    join  close,   join  pool          
     print "All process done."
    실행 결과:
    
    "D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py
    --------------------
    *msg: hello [0]
    *msg: hello [1]
    *msg: hello [2]
    *end
    *msg: hello [3]
    *end
    *end
    *msg: hello [4]
    *msg: hello [5]
    *end
    *msg: hello [6]
    *end
    *end
    *msg: hello [7]
    *msg: hello [8]
    *end
    *msg: hello [9]
    *end*end
    
    *end
    All process done.
    
    Process finished with exit code 0
    프로 세 스 실행 결과 가 져 오기
    
    # -*- coding:utf-8 -*-
    
    import multiprocessing
    import time
    
    def func_with_return(msg):
     print "*msg: ", msg
     time.sleep(3)
     print "*end"
     return "{} return".format(msg)
    
    if __name__ == "__main__":
     #           processes,                   
     pool = multiprocessing.Pool(processes=3)
     results = []
     for i in range(10):
     msg = "hello [{}]".format(i)
     res = pool.apply_async(func_with_return, (msg,)) #       ,     ,                           
     results.append(res)
    
     print "--" * 10
     pool.close() #   pool,             
     pool.join() #    join  close,   join  pool          
     print "All process done."
    
     print "Return results: "
     for i in results:
     print i.get() #          
    결과:
    
    "D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py
    --------------------
    *msg: hello [0]
    *msg: hello [1]
    *msg: hello [2]
    *end
    *end
    *msg: hello [3]
    *msg: hello [4]
    *end
    *msg: hello [5]
    *end
    *end
    *msg: hello [6]
    *msg: hello [7]
    *end
    *msg: hello [8]
    *end
    *end
    *msg: hello [9]
    *end
    *end
    All process done.
    Return results: 
    hello [0] return
    hello [1] return
    hello [2] return
    hello [3] return
    hello [4] return
    hello [5] return
    hello [6] return
    hello [7] return
    hello [8] return
    hello [9] return
    
    Process finished with exit code 0
    map
    함수 원형:
    
    map(func, iterable[, chunksize=None])
    Pool 클래스 의 map 방법 은 내 장 된 map 함수 용법 과 거의 일치 합 니 다.결 과 를 되 돌 릴 때 까지 프로 세 스 를 막 을 수 있 습 니 다.
    두 번 째 매개 변 수 는 교체 기 이지 만 실제 사용 에 서 는 전체 대기 열 이 준 비 된 후에 야 프로그램 이 하위 프로 세 스 를 실행 할 수 있 습 니 다.
    
    # -*- coding:utf-8 -*-
    
    import multiprocessing
    import time
    
    def func_with_return(msg):
     print "*msg: ", msg
     time.sleep(3)
     print "*end"
     return "{} return".format(msg)
    
    if __name__ == "__main__":
     #           processes,                   
     pool = multiprocessing.Pool(processes=3)
     results = []
     msgs = []
     for i in range(10):
     msg = "hello [{}]".format(i)
     msgs.append(msg)
    
     results = pool.map(func_with_return, msgs)
    
     print "--" * 10
     pool.close() #   pool,             
     pool.join() #    join  close,   join  pool          
     print "All process done."
    
     print "Return results: "
     for i in results:
     print i #          
    실행 결과:
    
    "D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v2.py
    *msg: hello [0]
    *msg: hello [1]
    *msg: hello [2]
    *end*end
    
    *msg: hello [3]
    *msg: hello [4]
    *end
    *msg: hello [5]
    *end*end
    
    *msg: hello [6]
    *msg: hello [7]
    *end
    *msg: hello [8]
    *end
    *end
    *msg: hello [9]
    *end
    *end
    --------------------
    All process done.
    Return results: 
    hello [0] return
    hello [1] return
    hello [2] return
    hello [3] return
    hello [4] return
    hello [5] return
    hello [6] return
    hello [7] return
    hello [8] return
    hello [9] return
    
    Process finished with exit code 0
    메모:실행 결과 에서"-"의 위 치 를 볼 수 있 습 니 다.map 이후 메 인 프로 세 스 가 막 혀 서 map 의 결과 가 돌아 오 기 를 기다 리 고 있 습 니 다.
    close()
    프로 세 스 풀(pool)을 닫 고 새 작업 을 받 지 않 습 니 다.
    terminate()
    작업 프로 세 스 를 끝내 고 처리 되 지 않 은 작업 을 처리 하지 않 습 니 다.
    join()
    주 프로 세 스 가 하위 프로 세 스 의 종 료 를 기다 리 는 것 을 막 습 니 다.join 방법 은 close 나 terminate 이후 에 사용 해 야 합 니 다.
    프로 세 스 간 통신
    다 중 프로 세 스 의 가장 번 거 로 운 점 은 프로 세 스 간 통신 입 니 다.IPC 는 스 레 드 통신 보다 처리 하기 어 려 운 것 이 많 기 때문에 단독 한 편 으로 기록 합 니 다.
    멀 티 프로 세 싱 을 이용 하여 가장 간단 한 분포 식 작업 스케줄 링 시스템 을 실현 하 다
    Job
    먼저 Job 클래스 를 만 듭 니 다.테스트 가 간단 하기 위해 job id 속성 만 포함 합 니 다.앞으로 작업 상태,작업 명령,사용자 등 속성 을 패키지 할 수 있 습 니 다.
    job.py
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    class Job:
     def __init__(self, job_id):
     self.job_id = job_id
    Master
    Master 는 작업 을 할당 하고 실행 이 완 료 된 작업 정 보 를 표시 합 니 다.
    master.py
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from Queue import Queue
    from multiprocessing.managers import BaseManager
    from job import Job
    
    
    class Master:
    
     def __init__(self):
     #          
     self.dispatched_job_queue = Queue()
     #        
     self.finished_job_queue = Queue()
    
     def get_dispatched_job_queue(self):
     return self.dispatched_job_queue
    
     def get_finished_job_queue(self):
     return self.finished_job_queue
    
     def start(self):
     #                     
     BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue)
     BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue)
    
     #          
     manager = BaseManager(address=('0.0.0.0', 8888), authkey='jobs')
     manager.start()
    
     #              
     dispatched_jobs = manager.get_dispatched_job_queue()
     finished_jobs = manager.get_finished_job_queue()
    
     #       10   ,  10        ,     10   
     job_id = 0
     while True:
      for i in range(0, 10):
      job_id = job_id + 1
      job = Job(job_id)
      print('Dispatch job: %s' % job.job_id)
      dispatched_jobs.put(job)
    
      while not dispatched_jobs.empty():
      job = finished_jobs.get(60)
      print('Finished Job: %s' % job.job_id)
    
     manager.shutdown()
    
    if __name__ == "__main__":
     master = Master()
     master.start()
    Slave
    Slave 는 master 에서 보 낸 작업 을 실행 하고 결 과 를 되 돌려 줍 니 다.
    slave.py
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import time
    from Queue import Queue
    from multiprocessing.managers import BaseManager
    from job import Job
    
    
    class Slave:
    
     def __init__(self):
     #          
     self.dispatched_job_queue = Queue()
     #        
     self.finished_job_queue = Queue()
    
     def start(self):
     #                     
     BaseManager.register('get_dispatched_job_queue')
     BaseManager.register('get_finished_job_queue')
    
     #   master
     server = '127.0.0.1'
     print('Connect to server %s...' % server)
     manager = BaseManager(address=(server, 8888), authkey='jobs')
     manager.connect()
    
     #              
     dispatched_jobs = manager.get_dispatched_job_queue()
     finished_jobs = manager.get_finished_job_queue()
    
     #          ,          ,            
     while True:
      job = dispatched_jobs.get(timeout=1)
      print('Run job: %s ' % job.job_id)
      time.sleep(1)
      finished_jobs.put(job)
    
    if __name__ == "__main__":
     slave = Slave()
     slave.start()
    테스트
    각각 세 개의 Liux 단말 기 를 열 고 첫 번 째 단말 기 는 master 를 실행 합 니 다.두 번 째 단말 기 는 slave 를 실행 합 니 다.실행 결 과 는 다음 과 같 습 니 다.
    master
    
    $ python master.py 
    Dispatch job: 1
    Dispatch job: 2
    Dispatch job: 3
    Dispatch job: 4
    Dispatch job: 5
    Dispatch job: 6
    Dispatch job: 7
    Dispatch job: 8
    Dispatch job: 9
    Dispatch job: 10
    Finished Job: 1
    Finished Job: 2
    Finished Job: 3
    Finished Job: 4
    Finished Job: 5
    Finished Job: 6
    Finished Job: 7
    Finished Job: 8
    Finished Job: 9
    Dispatch job: 11
    Dispatch job: 12
    Dispatch job: 13
    Dispatch job: 14
    Dispatch job: 15
    Dispatch job: 16
    Dispatch job: 17
    Dispatch job: 18
    Dispatch job: 19
    Dispatch job: 20
    Finished Job: 10
    Finished Job: 11
    Finished Job: 12
    Finished Job: 13
    Finished Job: 14
    Finished Job: 15
    Finished Job: 16
    Finished Job: 17
    Finished Job: 18
    Dispatch job: 21
    Dispatch job: 22
    Dispatch job: 23
    Dispatch job: 24
    Dispatch job: 25
    Dispatch job: 26
    Dispatch job: 27
    Dispatch job: 28
    Dispatch job: 29
    Dispatch job: 30
    slave1
    
    $ python slave.py 
    Connect to server 127.0.0.1...
    Run job: 1 
    Run job: 2 
    Run job: 3 
    Run job: 5 
    Run job: 7 
    Run job: 9 
    Run job: 11 
    Run job: 13 
    Run job: 15 
    Run job: 17 
    Run job: 19 
    Run job: 21 
    Run job: 23 
    slave2
    
    $ python slave.py 
    Connect to server 127.0.0.1...
    Run job: 4 
    Run job: 6 
    Run job: 8 
    Run job: 10 
    Run job: 12 
    Run job: 14 
    Run job: 16 
    Run job: 18 
    Run job: 20 
    Run job: 22 
    Run job: 24 
    총결산
    이상 은 이 글 의 전체 내용 입 니 다.본 논문 의 내용 이 여러분 의 학습 이나 업무 에 어느 정도 참고 학습 가치 가 있 기 를 바 랍 니 다.궁금 한 점 이 있 으 시 면 댓 글 을 남 겨 주 셔 서 저희 에 대한 지지 에 감 사 드 립 니 다.

    좋은 웹페이지 즐겨찾기