Python의 ProcessPool Executor로 처리를 즉시 끝내는 방법

개시하다


Python 다중 프로세스에서 병렬 작업을 수행하는 몇 가지 방법이 있습니다.
그중 하나는 concurrent.futures.ProcessPoolExecutor이다.
Python3.2에 추가된 구축 클래스입니다.
내부에서 MultipProcessing을 사용하면서 IF를 정리(제한)하는 등 사용자에게 부드럽다.
다중 프로그램을 간단하게 사용하고 싶다면 추천하는 라이브러리입니다.
한편, 복잡한 처리를 하려면 IF가 부족해 간지럼을 타지 못하는 경우가 많다.
그중에 ProcessPool Executor가 진행하는 과정 관리에 어려움이 있기 때문에 적어야 한다.

딱한 사정


ProcessPool Executor에서 생성된 Future 완료concurrent.futures.as_completed(fs, timeout=None)를 기다리는 함수가 있습니다.
보시다시피 두 번째 매개 변수에서 시간 초과, 시간 초과 시 발송concurrent.futures.TimeoutError을 지정할 수 있습니다.
시간이 초과된 경우 실행된 퓨처와 실행을 기다리는 퓨처취소가 있다고 생각합니다.
그러나 취소는 실행 중인 Future에는 적용되지 않습니다.
(문서에 기재되어 있지만 일본어 문서에서만 취소된 설명문이 영어라는 것은 수수께끼다.)
집행을 무한히 기다리다 당장 끝내려면 곤란하다.
(수행된 처리 내에서 IO 잠금 등 ← 방법의 경우__del__ 쓰기 해방 처리가 이상적)
무한 기다림이 잘 안 만들어진다는 말도 있지만 메인 프로세스의 시간 초과를 통제하려는 사람도 있을 것 같다.
실제 인코딩은 여기 있습니다.
import time
import concurrent
from concurrent.futures.process import ProcessPoolExecutor


def test(value: int) -> int:
    """
    Sleepして指定された値を返す
    Args:
        value: 指定値
    """
    time.sleep(100)
    return value

def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Future作成
        futures = []
        for index in range(5):
            future = executor.submit(test, index)
            futures.append(future)

        # 実行
        try:
            timeout = 5
            for future in concurrent.futures.as_completed(futures, timeout):
                result = future.result()
                print(result)

        except concurrent.futures.TimeoutError as _:
            # 現在のfutureの状態を表示
            print("Timeout -----")
            for future in futures:
                print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")

            # Futureをキャンセル
            for future in futures:
                if not future.running():
                    future.cancel()

    # 実行後のfutureの状態を確認
    print("Executor Shutdown -----")
    for future in futures:
        print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")
2개의 Worker를 준비하여 지정된 값을 100초 동안 반환하는 5개의 작업을 수행하는 함수입니다.
이것을 집행하면 다음과 같다.
Timeout -----
2579991022224 running: True cancelled: False
2579991067424 running: True cancelled: False
2579991117104 running: True cancelled: False
2579991116480 running: False cancelled: False
2579991117536 running: False cancelled: False
Executor Shutdown -----
2579991022224 running: True cancelled: False
2579991067424 running: True cancelled: False
2579991117104 running: True cancelled: False
2579991116480 running: False cancelled: True
2579991117536 running: False cancelled: True
첫 번째 Future 3개의 실행 중 마스터 프로세스가 종료되지 않았습니다(다음 두 개는 취소됨).
실행 중인 Future 처리를 기다리고 있기 때문입니다.
현재 처리는 약 200초 후에 메인 프로세스가 끝납니다.

해결책


내부 관리 프로세스에 액세스할 수 있으므로 직접 Kill을 제공합니다.
실제 인코딩은 여기 있습니다.
import time
import concurrent
from concurrent.futures.process import ProcessPoolExecutor


def test(value: int) -> int:
    """
    Sleepして指定された値を返す
    Args:
        value: 指定値
    """
    time.sleep(100)
    return value

def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Future作成
        futures = []
        for index in range(5):
            future = executor.submit(test, index)
            futures.append(future)

        # 実行
        try:
            timeout = 5
            for future in concurrent.futures.as_completed(futures, timeout):
                result = future.result()
                print(result)

        except concurrent.futures.TimeoutError as _:
            # 現在のfutureの状態を表示
            print("Timeout -----")
            for future in futures:
                print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")

            # Futureをキャンセル
            for future in futures:
                if not future.running():
                    future.cancel()

            # プロセスをKill
            # !! ここを追加 !!
            for process in executor._processes.values():
                process.kill()

    # 実行後のfutureの状態を確認
    print("Executor Shutdown -----")
    for future in futures:
        print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")
Future를 취소한 후 Kill Process가 진행 중입니다.
ProcessPool Executor는 Proteted 속성이기 때문에 방문할 계획이 없었습니다...
MultipProcessing을 사용하는 경우 매우 일반적입니다.
※ 파이프 데이터가 손상될 수 있으니 주의하세요.
이것을 집행하면 다음과 같다.
Timeout -----
1348685908624 running: True cancelled: False
1348685953824 running: True cancelled: False
1348686003552 running: True cancelled: False
1348686003888 running: False cancelled: False
1348686004128 running: False cancelled: False
Executor Shutdown -----
1348685908624 running: False cancelled: False
1348685953824 running: False cancelled: False
1348686003552 running: False cancelled: False
1348686003888 running: False cancelled: True
1348686004128 running: False cancelled: True
처음 세 개의 퓨처의 running은 False 상태가 됩니다.
이렇게 하면 Future의 실행이 멈추고 메인 프로세스가 곧 끝납니다.

총결산


묘수이지만 ProcessPool Executor를 통해 프로그램을 바로 끝내는 방법을 소개했다.
별로 써본 적은 없지만 그래도 곤란한 점이 있는 것 같아서 참고해주세요.

잡담


ProcessPool Executor의 내부 프로세싱은 ~\lib\concert\buturess\process입니다.예.
처음에 설명문과 그림이 함께 기재되어 있는데 이것은 매우 이해하기 쉬우니 추천합니다.

좋은 웹페이지 즐겨찾기