Python Process

19866 단어 pythonprocessprocess

이전 포스팅은 Thread에 대해 다루었는데요! 2개의 스레드를 만들었지만 시간이 단축되지 않고 1억까지 증가되지 않았습니다. 하지만 병렬로 두개의 프로세스를 만들어 병렬의 프로그래밍 방식으로 구현하면 시간을 단축할 수 있는데요!

프로세스를 만들면 각 프로세스별로 별도의 메모리 영역을 가지며 queuepipe, shared memory를 이용한 IPC(Inter-process-communication_프로세스간 통신)과 같은 방법으로 객체들의 교환을 구현할 수 있습니다. '멀티 프로세스 프로그램'은 각각 별도의 메모리 영역을 가지고, 여러 작업을 동시에 나눠서 처리할 수 있습니다. 그리고 병렬로 작업을 처리하니 하나의 프로세스에서 작업하던 것을 두배 이상 빠르게 할 수 있습니다.

다음의 코드는 thread를 생성했던 것처럼 multiprocessing패키지에서 process 클래스로부터 객체를 만들어 각각의 프로세스에 50000000씩 증가킨 후 q에 삽입해서 최종으로 1억까지 증가시키는 코드입니다.

from multiprocessing import Process, Queue
import time

def worker(id, number, q):
    increased_number = 0

    for i in range(number):
        increased_number += 1
    
    q.put(increased_number)

    return


if __name__ == "__main__":

    start_time = time.time()
    q = Queue()

    th1 = Process(target=worker, args=(1, 50000000, q))
    th2 = Process(target=worker, args=(2, 50000000, q))

    th1.start()
    th2.start()
    th1.join()
    th2.join()


    print("--- %s seconds ---" % (time.time() - start_time))
    q.put('exit')

    total = 0
    while True:
        tmp = q.get()
        if tmp == 'exit':
            break
        else:
            total += tmp

    print("total_number=",end=""), print(total)
    print("end of main")

두개의 쓰레드를 사용해서 구현했을 때보다 두개의 프로세스로 병렬 프로그래밍을 구현하면 훨씬 더 빠르게 진행됨을 알 수 있습니다.

IPC

프로세스의 IPC 방법은 앞에서 설명했듯, queue, pipe, shared memory 등이 있는데요. 파이썬의 multiprocessing은 queuepipe 두 가지 유형의 IPC를 지원하며 각각 queue()나 pipe()로 호출하면 됩니다.

Queue

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue() # queue
    p = Process(target=f, args=(q,))
    p.start()
    p.join()
    print(q.get())
	
# "[42, None, 'hello']"

pipe

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    p.join()
    print(parent_conn.recv())   
   
# "[42, None, 'hello']"
    

shared_memory

스레드는 프로세스를 나눈 단위이며 공유 데이터를 사용할 때 제약이 없지만, 프로세스는 독집적인 메모리 공간을 가지기 때문에 shared_memory를 사용합니다. 공유 메모리를 사용하면 메모리의 일부 공간을 각각의 독립적인 프로세스에서 공유하고, 해당 메모리를 통해 데이터를 주고받을 수 있습니다.

semaphore

semaphore는 지정된 변수만큼의 프로세스 혹은 쓰레드가 자원에 접근할 수 있도록 하는 방법입니다. 공유 메모리에 여러 프로세스가 동시에 사용하기 시작하면 데이터가 손상될 수도 있습니다. 그래서 여러 프로세스 사이에 동작의 순서를 지정해주어야 하는데 이 때 semaphore가 사용됩니다.

Mutex(상호배제)는 오직 하나의 프로세스 혹은 쓰레드만 자원에 접근하도록 하지만 Semaphore는 여러 대상을 처리할 수 있고 공유 메모리라는 임계 영역을 사용할 수있도록 합니다. 그리고 Lock을 사용하지 않기에 현재 수행하지 않는 다른 프로세스가 semaphore를 해제할 수 있습니다.

다음의 코드는 queue를 통한 IPC를 구현한 코드를 shared_memory방식과 semaphore를 사용한 프로그램입니다.

from multiprocessing import Process, shared_memory, Semaphore
import numpy as np
import time
def worker(id, number, shm, arr, sem):
	#매개 변수 : 프로세스명, 숫자, 공유메모리, 공유메모리를 저장할 배열, 세마포어 객체
	increased_number = 0

    for i in range(number):
        increased_number += 1

    sem.acquire() 
    
    # 기존 공유 메모리 블록에 연결
    existing_shm = shared_memory.SharedMemory(name=shm)
    # numpy 배열 형태에 맞게 변환
    tmp_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=existing_shm.buf)
    # 각각의 프로세스에서 연산한 값을 합해서 numpy 배열에 저장
    tmp_arr[0] += increased_number
    
    sem.release() # mutex와 동일



if __name__ == "__main__":

    start_time = time.time()
    sem = Semaphore()
    # 숫자를 저장할 numpy 배열 생성
    arr = np.array([0])
    # 공유 메모리 생성
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    # 공유 메모리의 버퍼를 numpy 배열로 변환
    np_shm = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    
    th1 = Process(target=worker, args=(1, 50000000, shm.name, np_shm, sem))
    th2 = Process(target=worker, args=(2, 50000000, shm.name, np_shm, sem))


    th1.start()
    th2.start()
    th1.join()
    th2.join()

    print("--- %s seconds ---" % (time.time() - start_time))
    #프로세스에서 계산된 값을 저장한 np_shm 배열 출력
    print("total_number=",end=""), print(np_shm[0]) 
    print("end of main")

    # 공유 메모리 사용 종료
    shm.close()
    # 공유 메모리 블록 삭제
    shm.unlink()

좋은 웹페이지 즐겨찾기