파이톤의 병행 처리를 이해하고 싶습니다 [다중 스레드 편]

21673 단어 Pythontech
파이톤의 병렬 처리를 총괄했다.

병렬 처리란?

並行処理(Concurrent)는 일정 시간 여러 처리를 동시에 하는 용어를 말한다.
예를 들어 인코딩 과정에서 응용 프로그램의 구축을 실행하면서 트위터를 보는 것도 인코딩과 트위터가 병행 처리된다.

병렬 처리와 병렬 처리의 차이


병렬 처리와 유사한 처리로 並列処理(Parallel)가 있다.
간단하게 두 가지 차이를 총결하다.

병렬 처리(Conceurent)


병렬 처리는 잘라내는 순간에 하는 처리이지만 일정 시간 내에 처리를 전환하는 동시에 여러 처리를 하는 것을 말한다.
파이톤에서 지금부터 설명하기 시작한 다선정ThreadPoolExecutor과 이본트로asyncio는 이에 해당한다.
병렬 프로세싱의 고속화는 여러 API에 대한 액세스, 파일 쓰기 등의 입출력 대기 시간에 적용됩니다.

병렬 처리


병렬 처리는 잘라내는 순간에도 여러 처리를 동시에 수행하는 것을 말한다.
파이톤의 다중 핵심 다중 프로세스ProcessPoolExecutor는 이에 해당한다.
병렬 처리의 고속화는 CPU 출력의 암호화 해독을 사용하기에 적합하다.

병렬 처리는 병렬 처리의 개념이다.병행 처리는 평행 처리라고 할 수 있지만 병행 처리라고 해서 반드시 병행 처리는 아니다.
참조: 파이썬 실천 입문-언어 능력을 동원하여 개발 효율을 높이다

Python의 다중 스레드 처리(ThreadPool Executor)


다중 루틴은 이름과 같이 프로세스에서 여러 개의 루틴을 만들고 처리합니다.
파이톤의 다중 루틴은 conceurent입니다.futures 모듈의 ThreadPoolExecutor류를 사용합니다.
컨텍스트 관리자에서 가져올 수 있는 ThreadPool Executor 인스턴스submit()를 호출하여 다중 스레드에서 실행할 함수를 등록합니다.
with ThreadPoolExecutor() as executor
    feature = executor.submit(<マルチスレッドで実行したい関数>)
submit의 반환값result()을 호칭하여 함수 처리 결과를 얻을 수 있습니다.
result = feature.result() # submitした関数の結果を取得
이외에는 집행 중인 상태 확인running()·done()·집행 취소cancelled() 등도 있었다.
import time
from concurrent.futures import ThreadPoolExecutor

def foo():
    time.sleep(3)
    return 'fooo'

with ThreadPoolExecutor() as executor:
    feature = executor.submit(foo)
    print(feature.running()) # True
    print(feature.done()) # False
    feature.cancelled() # 処理のキャンセル
    print(feature.running()) # False
    print(feature.done()) # True

차례차례 처리와 다중 스레드 실행 속도 비교


다중 스레드의 효과를 검증하기 위해 순서대로 처리와 다중 스레드의 집행 속도를 비교해 보자.

미리 준비하다


시간이 필요한 입출력 처리 모듈의 함수를 생성합니다.call_slow_request시간입니다.sleep(3)을 사용했기 때문에 3초의 실행 시간이 필요합니다.
util.py
def call_slow_request():
    time.sleep(3)
    return 'ok'
또 처리 시간을 측정하고 싶어 전용 장식물을 제작한다.
실행 함수에 processing_time의 볼록기를 더해서 함수의 실행 시간을 출력합니다.
util.py
def processing_time(func):
    @wraps(func)
    def wrapper(*args, **keywords):
        st = time.time()  # 開始前の時間を記録
        result = func(*args, **keywords)  # 関数を実行
        print(f'time: {time.time() - st} s')  # 開始後の時間と開始前の時間の差を出力
        return result

    return wrapper

순서대로 집행하다


우선 순서대로 처리하다.run_sequential 중 3회call_slow_request를 수행한다.
run_sequential.py
from util import (call_slow_request, processing_time)


@processing_time
def run_sequential():
    for arg in range(3):
        print(call_slow_request())


if __name__ == '__main__':
    run_sequential()
3회 순서로 3초 대기 처리를 수행하기 때문에run_sequential 9초도 안 걸렸다.
$ python3 run_sequential.py
ok
ok
ok
time: 9.011210680007935 s

다중 스레드 실행


다음은 다선정이다.ThreadPoolExecutor를 사용하여 3회call_slow_request를 집행한다.
run_concurrent.py
from concurrent.futures.thread import ThreadPoolExecutor
from util import (call_slow_request, processing_time)


@processing_time
def run_concurrent():
    with ThreadPoolExecutor() as executor:
        features = [executor.submit(call_slow_request) for _ in range(3)]
        for feature in features:
            print(feature.result())


if __name__ == "__main__":
    run_concurrent()

차례로 처리하는 것보다 대폭 고속화되고 있다는 것을 우리는 알아야 한다고 생각한다.
$ python3 concurrent-demo.py
ok
ok
ok
time: 3.0046510696411133 s
이것은 처리가 라인으로 나뉘어 병행 실행되기 때문이다.
처리가 call_slow_requesttime.sleep(3)단계에 들어간 후 처리는 다음 라인으로 옮겨지기 때문에 기본적으로sleep에서 지정한 초수와 시간축에서 처리를 완성할 수 있다.

스레드 보안 및 잠금


병렬 처리에 대한 단서가 안전합니다.
다중 루틴에서 병행 처리를 할 때 각자의 루틴이 공통된 값을 읽고 쓰는 등 루틴이 아닌 안전한 처리를 끼워 넣으면 오류가 발생할 수 있습니다.
총괄은 라인 안전 처리의 예와 처리(잠금)가 아니다.

비스레드 보안 처리


다음은 클래스 실례 변수Counter를 계수 처리한다.스레드가 안전하지 않기 때문에 다중 스레드 처리는 예상치가 되지 않고 계산 결과는 실행에 따라 달라집니다.
thread_safe.py
from concurrent.futures import (ThreadPoolExecutor, wait)


class Counter:
    def __init__(self):
        self.count = 0

    def count_up_to_1000000(self):
        for _ in range(1_000_000):
            self.count += 1


def worker(counter):
    counter.count_up_to_1000000()


def main():
    counter = Counter()
    with ThreadPoolExecutor() as executor:
        features = [executor.submit(worker, counter) for _ in range(3)]
    print(counter.count)


if __name__ == '__main__':
    main()
# 300000    0にならない!!!
$ python3 thread_safe.py
1848519

$ python3 thread_safe.py
1802127

$ python3 thread_safe.py
1702320
이것은 self.count가 증가할 때의 값을 읽는 과정에서 스레드가 전환되고 여러 스레드에서 같은 값을 동시에 방문하여 일치하지 않기 때문이다.

개선(잠금 도입)


이 문제를 개선하기 위해서는 점증 과정에서 라인에 대한 배타 제어(잠금)가 필요하다.
개선 코드는 여기 있습니다.수출도 3만 원.
thread_safe.py
import threading
from concurrent.futures import (ThreadPoolExecutor, wait)


class Counter:
    lock = threading.Lock()

    def __init__(self):
        self.count = 0

    def count_up_to_1000000(self):
        for _ in range(1_000_000):
            with self.lock:
                self.count += 1


def worker(counter):
    counter.count_up_to_1000000()


def main():
    counter = Counter()
    with ThreadPoolExecutor() as executor:
        features = [executor.submit(worker, counter) for _ in range(3)]
    print(counter.count)


if __name__ == '__main__':
    main()
$ python3 thread_sage.py
3000000
Counter 클래스에서 Lock 실례를 만들고 Lock 실례의 상하문 관리자에서 계수를 실행합니다.with self.lock: 블록에서 진행되는 처리는 잠금이 필요하기 때문에 계수할 때 여러 라인에서 값을 공유하는 문제를 없앴다.Lock 상하문 관리자에서 실례를 실행하는 것은 잠금의 누락을 방지하기 위해서이다.상하문 관리자를 사용하지 않는 경우lock.acquire()로 잠그고lock.release() 해방자물쇠 같은 형태로도 가능하지만,release를 실행하면 잊어버리면 사라진 자물쇠가 되기 때문에 주의해야 한다.

끝맺다


이상은'파이톤이 병행 처리하는 다중 스레드 편을 이해하고 싶다'는 것이다.
다중 루틴 외에도 다중 프로세스, 비동기 비동기 등 병행 처리가 있어 속편을 쓰고 싶습니다.

참고 자료


기본적으로 이 책에서 배운 내용이다.좋은 책 감사합니다.🙏
  • Pythhon 실천 입문 - 언어 능력을 자극하고 개발 효율을 높이다: 서적 소개 | 기술 평론사
  • 병렬 처리에서 병렬 처리로 – IBM Developer
  • 좋은 웹페이지 즐겨찾기