Python 은 multiprocessing 을 이용 하여 가장 간단 한 분포 식 작업 스케줄 링 시스템 인 스 턴 스 를 실현 합 니 다.
Python 의 multiprocessing 모듈 은 다 중 프로 세 스 를 지원 할 뿐만 아니 라 managers 서브 모듈 은 다 중 프로 세 스 를 여러 대의 기계 에 분포 하 는 것 도 지원 합 니 다.하나의 서비스 프로 세 스 는 스케줄 러 로 서 다른 여러 기계 의 여러 프로 세 스 에 임 무 를 분포 시 켜 네트워크 통신 에 의존 할 수 있다.이 모듈 을 사용 하여 간단 한 작업 스케줄 링 시스템 을 실현 할 수 있 을 까 하 는 생각 이 들 었 다.그 전에 python 의 다 중 프로 세 스 관리 패키지 multiprocessing 에 대해 자세히 알 아 보 겠 습 니 다.
multiprocessing.Process
multiprocessing 패 키 지 는 Python 의 다 중 프로 세 스 관리 패키지 입 니 다.이것 은 threading.Thread 와 유사 하 며,multiprocessing.Process 대상 을 이용 하여 프로 세 스 를 만 들 수 있 습 니 다.이 프로 세 스 는 Python 프로그램 내부 에서 작 성 된 함수 에 넣 을 수 있 습 니 다.이 process 대상 은 Thread 대상 과 같은 용법 으로 is 를 가지 고 있 습 니 다.alive(),join([timeout]),run(),start(),terminate()등 방법.속성 은 authkey,daemon(start()설정 을 통 해),exitcode(프로 세 스 가 실 행 될 때 None,CN 이면 신호 N 이 끝 났 음 을 표시 합 니 다),name,pid 입 니 다.이 밖 에 multiprocessing 패키지 에 도 Lock/Event/Semaphore/Condition 클래스 가 있어 프로 세 스 를 동기 화 하 는데 사용 되 며 threading 패키지 의 동명 클래스 와 같 습 니 다.multiprocessing 의 상당 부분 은 threading 과 같은 API 를 사용 합 니 다.다 중 프로 세 스 의 상황 으로 바 뀌 었 을 뿐 입 니 다.
이 모듈 은 스 레 드 처럼 프로 세 스 를 관리 하 는 것 을 표시 합 니 다.이것 은 multiprocessing 의 핵심 입 니 다.threading 과 비슷 하고 다 핵 CPU 에 대한 이 용 률 이 threading 보다 훨씬 좋 습 니 다.
Process 류 의 구조 방법 을 살 펴 보 자.
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
매개 변수 설명:
#coding=utf-8
import multiprocessing
def do(n) :
#
name = multiprocessing.current_process().name
print name,'starting'
print "worker ", n
return
if __name__ == '__main__' :
numList = []
for i in xrange(5) :
p = multiprocessing.Process(target=do, args=(i,))
numList.append(p)
p.start()
p.join()
print "Process end."
실행 결과:
Process-1 starting
worker 0
Process end.
Process-2 starting
worker 1
Process end.
Process-3 starting
worker 2
Process end.
Process-4 starting
worker 3
Process end.
Process-5 starting
worker 4
Process end.
하위 프로 세 스 를 만 들 때 실행 함수 와 함수 의 인 자 를 입력 하고 process 인 스 턴 스 를 만 들 고 start()방법 으로 시작 합 니 다.join()방법 은 하위 프로 세 스 가 끝 난 후에 계속 실행 하 는 것 을 표시 합 니 다.보통 프로 세 스 간 동기 화 에 사 용 됩 니 다.주의:
Windows 에서 프로 세 스 모듈 을 사용 하려 면 프로 세 스 와 관련 된 코드 를 현재.py 파일
if __name__ == ‘__main__' :
문구 아래 에 써 야 Windows 의 프로 세 스 모듈 을 정상적으로 사용 할 수 있 습 니 다.유 닉 스/리 눅 스 는 필요 없습니다.multiprocess.Pool
피 조작 대상 의 수가 많 지 않 을 때 multiprocessing 중의 Process 동 태 를 직접 이용 하여 여러 프로 세 스 를 만 들 수 있 습 니 다.10 여 개 는 괜 찮 지만 수백 개,수천 개의 목표 라면 수 동 으로 프로 세 스 의 수량 을 제한 하 는 것 이 너무 번 거 롭 습 니 다.이때 프로 세 스 탱크 의 효 과 를 발휘 할 수 있 습 니 다.
Pool 은 사용자 가 호출 할 수 있 도록 지정 한 프로 세 스 를 제공 할 수 있 습 니 다.새로운 요청 이 pool 에 제출 되 었 을 때 풀 이 가득 차지 않 으 면 새 프로 세 스 를 만들어 서 이 요청 을 수행 합 니 다.그러나 풀 의 프로 세 스 수가 규정된 최대 치 에 이 르 렀 다 면 이 요청 은 풀 에 프로 세 스 가 끝 날 때 까지 기 다 려 야 새로운 프로 세 스 를 만 들 수 있 습 니 다.
apply_async 와 apply
함수 원형:
apply_async(func[, args=()[, kwds={}[, callback=None]]])
두 사람 은 모두 프로 세 스 풀 에 새로운 프로 세 스 를 추가 합 니 다.다른 경우 apply 는 새로운 프로 세 스 를 추가 할 때마다 메 인 프로 세 스 와 새로운 프로 세 스 를 병행 하지만 메 인 프로 세 스 는 새 프로 세 스 의 함수 가 실 행 될 때 까지 막 습 니 다.이것 은 매우 비효 율 적 이기 때문에 python 3.x 이후 에는 사용 하지 않 습 니 다.apply_async 와 apply 기능 은 같 지만 메 인 프로 세 스 는 막 히 지 않 습 니 다.
# -*- coding:utf-8 -*-
import multiprocessing
import time
def func(msg):
print "*msg: ", msg
time.sleep(3)
print "*end"
if __name__ == "__main__":
# processes,
pool = multiprocessing.Pool(processes=3)
for i in range(10):
msg = "hello [{}]".format(i)
# pool.apply(func, (msg,))
pool.apply_async(func, (msg,)) # , ,
print "--" * 10
pool.close() # pool,
pool.join() # join close, join pool
print "All process done."
실행 결과:
"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py
--------------------
*msg: hello [0]
*msg: hello [1]
*msg: hello [2]
*end
*msg: hello [3]
*end
*end
*msg: hello [4]
*msg: hello [5]
*end
*msg: hello [6]
*end
*end
*msg: hello [7]
*msg: hello [8]
*end
*msg: hello [9]
*end*end
*end
All process done.
Process finished with exit code 0
프로 세 스 실행 결과 가 져 오기
# -*- coding:utf-8 -*-
import multiprocessing
import time
def func_with_return(msg):
print "*msg: ", msg
time.sleep(3)
print "*end"
return "{} return".format(msg)
if __name__ == "__main__":
# processes,
pool = multiprocessing.Pool(processes=3)
results = []
for i in range(10):
msg = "hello [{}]".format(i)
res = pool.apply_async(func_with_return, (msg,)) # , ,
results.append(res)
print "--" * 10
pool.close() # pool,
pool.join() # join close, join pool
print "All process done."
print "Return results: "
for i in results:
print i.get() #
결과:
"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py
--------------------
*msg: hello [0]
*msg: hello [1]
*msg: hello [2]
*end
*end
*msg: hello [3]
*msg: hello [4]
*end
*msg: hello [5]
*end
*end
*msg: hello [6]
*msg: hello [7]
*end
*msg: hello [8]
*end
*end
*msg: hello [9]
*end
*end
All process done.
Return results:
hello [0] return
hello [1] return
hello [2] return
hello [3] return
hello [4] return
hello [5] return
hello [6] return
hello [7] return
hello [8] return
hello [9] return
Process finished with exit code 0
map함수 원형:
map(func, iterable[, chunksize=None])
Pool 클래스 의 map 방법 은 내 장 된 map 함수 용법 과 거의 일치 합 니 다.결 과 를 되 돌 릴 때 까지 프로 세 스 를 막 을 수 있 습 니 다.두 번 째 매개 변 수 는 교체 기 이지 만 실제 사용 에 서 는 전체 대기 열 이 준 비 된 후에 야 프로그램 이 하위 프로 세 스 를 실행 할 수 있 습 니 다.
# -*- coding:utf-8 -*-
import multiprocessing
import time
def func_with_return(msg):
print "*msg: ", msg
time.sleep(3)
print "*end"
return "{} return".format(msg)
if __name__ == "__main__":
# processes,
pool = multiprocessing.Pool(processes=3)
results = []
msgs = []
for i in range(10):
msg = "hello [{}]".format(i)
msgs.append(msg)
results = pool.map(func_with_return, msgs)
print "--" * 10
pool.close() # pool,
pool.join() # join close, join pool
print "All process done."
print "Return results: "
for i in results:
print i #
실행 결과:
"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v2.py
*msg: hello [0]
*msg: hello [1]
*msg: hello [2]
*end*end
*msg: hello [3]
*msg: hello [4]
*end
*msg: hello [5]
*end*end
*msg: hello [6]
*msg: hello [7]
*end
*msg: hello [8]
*end
*end
*msg: hello [9]
*end
*end
--------------------
All process done.
Return results:
hello [0] return
hello [1] return
hello [2] return
hello [3] return
hello [4] return
hello [5] return
hello [6] return
hello [7] return
hello [8] return
hello [9] return
Process finished with exit code 0
메모:실행 결과 에서"-"의 위 치 를 볼 수 있 습 니 다.map 이후 메 인 프로 세 스 가 막 혀 서 map 의 결과 가 돌아 오 기 를 기다 리 고 있 습 니 다.close()
프로 세 스 풀(pool)을 닫 고 새 작업 을 받 지 않 습 니 다.
terminate()
작업 프로 세 스 를 끝내 고 처리 되 지 않 은 작업 을 처리 하지 않 습 니 다.
join()
주 프로 세 스 가 하위 프로 세 스 의 종 료 를 기다 리 는 것 을 막 습 니 다.join 방법 은 close 나 terminate 이후 에 사용 해 야 합 니 다.
프로 세 스 간 통신
다 중 프로 세 스 의 가장 번 거 로 운 점 은 프로 세 스 간 통신 입 니 다.IPC 는 스 레 드 통신 보다 처리 하기 어 려 운 것 이 많 기 때문에 단독 한 편 으로 기록 합 니 다.
멀 티 프로 세 싱 을 이용 하여 가장 간단 한 분포 식 작업 스케줄 링 시스템 을 실현 하 다
Job
먼저 Job 클래스 를 만 듭 니 다.테스트 가 간단 하기 위해 job id 속성 만 포함 합 니 다.앞으로 작업 상태,작업 명령,사용자 등 속성 을 패키지 할 수 있 습 니 다.
job.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
class Job:
def __init__(self, job_id):
self.job_id = job_id
MasterMaster 는 작업 을 할당 하고 실행 이 완 료 된 작업 정 보 를 표시 합 니 다.
master.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from Queue import Queue
from multiprocessing.managers import BaseManager
from job import Job
class Master:
def __init__(self):
#
self.dispatched_job_queue = Queue()
#
self.finished_job_queue = Queue()
def get_dispatched_job_queue(self):
return self.dispatched_job_queue
def get_finished_job_queue(self):
return self.finished_job_queue
def start(self):
#
BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue)
BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue)
#
manager = BaseManager(address=('0.0.0.0', 8888), authkey='jobs')
manager.start()
#
dispatched_jobs = manager.get_dispatched_job_queue()
finished_jobs = manager.get_finished_job_queue()
# 10 , 10 , 10
job_id = 0
while True:
for i in range(0, 10):
job_id = job_id + 1
job = Job(job_id)
print('Dispatch job: %s' % job.job_id)
dispatched_jobs.put(job)
while not dispatched_jobs.empty():
job = finished_jobs.get(60)
print('Finished Job: %s' % job.job_id)
manager.shutdown()
if __name__ == "__main__":
master = Master()
master.start()
SlaveSlave 는 master 에서 보 낸 작업 을 실행 하고 결 과 를 되 돌려 줍 니 다.
slave.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
from Queue import Queue
from multiprocessing.managers import BaseManager
from job import Job
class Slave:
def __init__(self):
#
self.dispatched_job_queue = Queue()
#
self.finished_job_queue = Queue()
def start(self):
#
BaseManager.register('get_dispatched_job_queue')
BaseManager.register('get_finished_job_queue')
# master
server = '127.0.0.1'
print('Connect to server %s...' % server)
manager = BaseManager(address=(server, 8888), authkey='jobs')
manager.connect()
#
dispatched_jobs = manager.get_dispatched_job_queue()
finished_jobs = manager.get_finished_job_queue()
# , ,
while True:
job = dispatched_jobs.get(timeout=1)
print('Run job: %s ' % job.job_id)
time.sleep(1)
finished_jobs.put(job)
if __name__ == "__main__":
slave = Slave()
slave.start()
테스트각각 세 개의 Liux 단말 기 를 열 고 첫 번 째 단말 기 는 master 를 실행 합 니 다.두 번 째 단말 기 는 slave 를 실행 합 니 다.실행 결 과 는 다음 과 같 습 니 다.
master
$ python master.py
Dispatch job: 1
Dispatch job: 2
Dispatch job: 3
Dispatch job: 4
Dispatch job: 5
Dispatch job: 6
Dispatch job: 7
Dispatch job: 8
Dispatch job: 9
Dispatch job: 10
Finished Job: 1
Finished Job: 2
Finished Job: 3
Finished Job: 4
Finished Job: 5
Finished Job: 6
Finished Job: 7
Finished Job: 8
Finished Job: 9
Dispatch job: 11
Dispatch job: 12
Dispatch job: 13
Dispatch job: 14
Dispatch job: 15
Dispatch job: 16
Dispatch job: 17
Dispatch job: 18
Dispatch job: 19
Dispatch job: 20
Finished Job: 10
Finished Job: 11
Finished Job: 12
Finished Job: 13
Finished Job: 14
Finished Job: 15
Finished Job: 16
Finished Job: 17
Finished Job: 18
Dispatch job: 21
Dispatch job: 22
Dispatch job: 23
Dispatch job: 24
Dispatch job: 25
Dispatch job: 26
Dispatch job: 27
Dispatch job: 28
Dispatch job: 29
Dispatch job: 30
slave1
$ python slave.py
Connect to server 127.0.0.1...
Run job: 1
Run job: 2
Run job: 3
Run job: 5
Run job: 7
Run job: 9
Run job: 11
Run job: 13
Run job: 15
Run job: 17
Run job: 19
Run job: 21
Run job: 23
slave2
$ python slave.py
Connect to server 127.0.0.1...
Run job: 4
Run job: 6
Run job: 8
Run job: 10
Run job: 12
Run job: 14
Run job: 16
Run job: 18
Run job: 20
Run job: 22
Run job: 24
총결산이상 은 이 글 의 전체 내용 입 니 다.본 논문 의 내용 이 여러분 의 학습 이나 업무 에 어느 정도 참고 학습 가치 가 있 기 를 바 랍 니 다.궁금 한 점 이 있 으 시 면 댓 글 을 남 겨 주 셔 서 저희 에 대한 지지 에 감 사 드 립 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
로마 숫자를 정수로 또는 그 반대로 변환그 중 하나는 로마 숫자를 정수로 변환하는 함수를 만드는 것이었고 두 번째는 그 반대를 수행하는 함수를 만드는 것이었습니다. 문자만 포함합니다'I', 'V', 'X', 'L', 'C', 'D', 'M' ; 문자열이 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.