Python 은 어떻게 스 레 드 간 통신 을 실현 합 니까?

문제.
프로그램 에 여러 개의 스 레 드 가 있 습 니 다.이 스 레 드 사이 에서 정보 나 데 이 터 를 안전하게 교환 해 야 합 니 다.
해결 방안
한 라인 에서 다른 라인 으로 데 이 터 를 보 내 는 가장 안전 한 방법 은 아마도 queu 라 이브 러 리 의 대기 열 을 사용 하 는 것 일 것 이다.여러 스 레 드 에 공 유 된 Queue 대상 을 만 듭 니 다.이 스 레 드 는 put()와 get()작업 을 통 해 대기 열 에 요 소 를 추가 하거나 삭제 합 니 다.예 를 들 면:

from queue import Queue
from threading import Thread

# A thread that produces data
def producer(out_q):
  while True:
    # Produce some data
    ...
    out_q.put(data)

# A thread that consumes data
def consumer(in_q):
  while True:
# Get some data
    data = in_q.get()
    # Process the data
    ...

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
Queue 대상 은 필요 한 자 물 쇠 를 포함 하고 있 기 때문에 여러 스 레 드 에서 데 이 터 를 안전하게 공유 할 수 있 습 니 다.대기 열 을 사용 할 때 생산자 와 소비자 의 폐쇄 문 제 를 조율 하 는 데 문제 가 있 을 수 있 습 니 다.일반적인 해결 방법 은 대기 열 에 특수 한 값 을 두 고 소비자 가 이 값 을 읽 었 을 때 실행 을 중지 하 는 것 이다.예 를 들 면:

from queue import Queue
from threading import Thread

# Object that signals shutdown
_sentinel = object()

# A thread that produces data
def producer(out_q):
  while running:
    # Produce some data
    ...
    out_q.put(data)

  # Put the sentinel on the queue to indicate completion
  out_q.put(_sentinel)

# A thread that consumes data
def consumer(in_q):
  while True:
    # Get some data
    data = in_q.get()

    # Check for termination
    if data is _sentinel:
      in_q.put(_sentinel)
      break

    # Process the data
    ...
이 예 에서 소비자 들 은 이 특수 치 를 읽 은 후에 바로 그것 을 대열 에 놓 고 전달 하 는 특별한 점 이 있다.이렇게 하면 이 대열 을 감청 하 는 모든 소비자 라인 이 모두 닫 힐 수 있다.대기 열 은 가장 흔히 볼 수 있 는 스 레 드 간 통신 체제 임 에 도 불구 하고 자신의 데이터 구 조 를 만 들 고 필요 한 잠 금 과 동기 화 체 제 를 추가 하여 스 레 드 간 통신 을 실현 할 수 있 습 니 다.가장 흔히 볼 수 있 는 방법 은Condition변 수 를 사용 하여 데이터 구 조 를 포장 하 는 것 이다.다음 예 는 스 레 드 보안 우선 순위 대기 열 을 만 드 는 방법 을 보 여 줍 니 다.

import heapq
import threading

class PriorityQueue:
  def __init__(self):
    self._queue = []
    self._count = 0
    self._cv = threading.Condition()
  def put(self, item, priority):
    with self._cv:
      heapq.heappush(self._queue, (-priority, self._count, item))
      self._count += 1
      self._cv.notify()

  def get(self):
    with self._cv:
      while len(self._queue) == 0:
        self._cv.wait()
      return heapq.heappop(self._queue)[-1]
대기 열 을 사용 하여 스 레 드 간 통신 을 하 는 것 은 단 방향,불확실 한 과정 이다.일반적으로 데 이 터 를 받 는 스 레 드 가 언제 데 이 터 를 받 았 는 지 알 고 일 을 시작 할 방법 이 없다.그러나 대기 열 대상 은 다음 예task_done() join() 등 기본 적 인 완성 특성 을 제공 합 니 다.

from queue import Queue
from threading import Thread

# A thread that produces data
def producer(out_q):
  while running:
    # Produce some data
    ...
    out_q.put(data)

# A thread that consumes data
def consumer(in_q):
  while True:
    # Get some data
    data = in_q.get()

    # Process the data
    ...
    # Indicate completion
    in_q.task_done()

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

# Wait for all produced items to be consumed
q.join()
만약 에 하나의 스 레 드 가 특정한 데이터 항목 을 처리 할 때 바로 통 지 를 받 아야 한다 면 보 낼 데 이 터 를 하나Event와 함께 사용 하면'생산자'는 이Event대상 을 통 해 처리 과정 을 모니터링 할 수 있다.예 는 다음 과 같다.

from queue import Queue
from threading import Thread, Event

# A thread that produces data
def producer(out_q):
  while running:
    # Produce some data
    ...
    # Make an (data, event) pair and hand it to the consumer
    evt = Event()
    out_q.put((data, evt))
    ...
    # Wait for the consumer to process the item
    evt.wait()

# A thread that consumes data
def consumer(in_q):
  while True:
    # Get some data
    data, evt = in_q.get()
    # Process the data
    ...
    # Indicate completion
    evt.set()
토론 하 다.
간단 한 대기 열 을 기반 으로 다 중 스 레 드 프로그램 을 만 드 는 것 은 대부분의 경우 현명 한 선택 입 니 다.스 레 드 보안 대기 열의 밑바닥 실현 을 보면 코드 에 자물쇠 와 다른 밑바닥 의 동기 화 메커니즘 을 사용 할 필요 가 없다.이것 은 당신 의 프로그램 을 엉망 으로 만 들 뿐이다.그 밖 에 대기 열 이라는 메 시 지 를 기반 으로 하 는 통신 체 제 를 사용 하면 더 큰 응용 범주 로 확장 할 수 있 습 니 다.예 를 들 어 프로그램 을 여러 프로 세 스,심지어 분포 식 시스템 에 넣 고 바 텀 대기 열 구 조 를 바 꾸 지 않 아 도 됩 니 다.스 레 드 대기 열 을 사용 할 때 주의해 야 할 문 제 는 대기 열 에 데이터 항목 을 추가 할 때 이 데이터 항목 을 복사 하지 않 는 다 는 것 입 니 다.스 레 드 간 통신 은 실제 온라인 스 레 드 간 전달 대상 참조 입 니 다.대상 의 공유 상태 가 걱정 된다 면 수정 할 수 없 는 데이터 구조(예 를 들 어 정형,문자열 또는 원본)나 대상 의 깊 은 복사 만 전달 하 는 것 이 좋 습 니 다.예 를 들 면:

from queue import Queue
from threading import Thread
import copy

# A thread that produces data
def producer(out_q):
  while True:
    # Produce some data
    ...
    out_q.put(copy.deepcopy(data))

# A thread that consumes data
def consumer(in_q):
  while True:
    # Get some data
    data = in_q.get()
    # Process the data
    ...
Queue대상 은 현재 상하 문 에서 매우 유용 한 추가 특성 을 제공한다.예 를 들 어 Queue 대상 을 만 들 때 선택 할 수 있 는size인 자 를 제공 하여 대기 열 에 추가 할 수 있 는 요소 의 수 를 제한 합 니 다.'생산자'와'소비자'의 속도 가 차이 가 있 는 경우 대열 의 요소 수량 에 상한 선 을 추가 하 는 것 은 의미 가 있다.예 를 들 어'생산자'가 프로젝트 를 만 드 는 속도 가'소비자','소비'보다 빠 르 면 고정된 크기 의 대기 열 을 사용 하면 대기 열 이 가득 찼 을 때 대기 열 을 막 아 예상 치 못 한 연쇄 효과 가 확산 되 지 않도록 프로그램 전체 가 잠 겨 있 거나 프로그램 이 정상적으로 작 동 하지 않 습 니 다.통신 의 스 레 드 사이 에서'데이터 제어'를 하 는 것 은 쉽게 실현 되 기 어 려 운 문제 이다.만약 당신 이 대열 의 크기 를 가지 고 문 제 를 해결 하려 고 시도 한 적 이 있다 는 것 을 발견 한다 면,이것 은 아마도 당신 의 프로그램 에 취약 한 디자인 이나 고유 한 신축 문제 가 존재 할 수 있 음 을 상징 할 것 입 니 다.get() put() 방법 은 모두 비 차단 방식 과 시간 초과 설정 을 지원 합 니 다.예 를 들 어:

import queue
q = queue.Queue()

try:
  data = q.get(block=False)
except queue.Empty:
  ...

try:
  q.put(item, block=False)
except queue.Full:
  ...

try:
  data = q.get(timeout=5.0)
except queue.Empty:
  ...
이 동작 들 은 특정한 대기 열 작업 을 수행 할 때 무한 차단 이 발생 하 는 상황 을 피 할 수 있 습 니 다.예 를 들 어 차단 되 지 않 은put() 방법 은 고정된 크기 의 대기 열 과 함께 사용 하면 대기 열 이 가득 찼 을 때 서로 다른 코드 를 실행 할 수 있 습 니 다.예 를 들 어 로그 정 보 를 출력 하고 버 리 는 것 입 니 다.

def producer(q):
  ...
  try:
    q.put(item, block=False)
  except queue.Full:
    log.warning('queued item %r discarded!', item)
만약 에 소비자 스 레 드 가q.get() 와 같은 조작 을 수행 할 때 종료 표 지 를 검사 하기 위해 시간 을 초과 하면 자동 으로 종 료 됩 니 다.q.get() 의 선택 가능 한 매개 변수timeout를 사용 해 야 합 니 다.다음 과 같 습 니 다.

_running = True

def consumer(q):
  while _running:
    try:
      item = q.get(timeout=5.0)
      # Process item
      ...
    except queue.Empty:
      pass
마지막 으로q.qsize() ,q.full() ,q.empty() 등 실 용적 인 방법 으로 한 대열 의 현재 크기 와 상 태 를 얻 을 수 있 습 니 다.그러나 이 방법 들 은 모두 라인 이 안전 한 것 이 아니 라 는 것 을 주의해 야 한다.한 대기 열 에 사용 할 수 있 습 니 다 empty() .이 대기 열 이 비어 있다 고 판단 할 수 있 지만,동시에 다른 스 레 드 는 이 대기 열 에 데이터 항목 을 삽입 할 수 있 습 니 다.그 러 니 코드 에서 이런 방법 을 사용 하지 않 는 것 이 좋 습 니 다.
이상 은 Python 이 스 레 드 간 통신 을 어떻게 실현 하 는 지 에 대한 상세 한 내용 입 니 다.Python 스 레 드 간 통신 에 관 한 자 료 는 우리 의 다른 관련 글 에 관심 을 가 져 주 십시오!

좋은 웹페이지 즐겨찾기