C++메시지 큐 기반 다 중 스 레 드 구현 예제 코드

머리말
메시지 큐 를 실현 하 는 관건 적 인 요 소 는 서로 다른 스 레 드 가 메시지 큐 에 접근 하 는 동기 화 문 제 를 고려 하 는 것 이다.본 실현 은 몇 가지 지식 점 과 관련된다.
std::lock_guard 소개
std::lock_gurad 는 C++11 에서 정 의 된 템 플 릿 클래스 입 니 다.정 의 는 다음 과 같 습 니 다.

template <class Mutex> class lock_guard;
lock_guard 대상 은 보통 특정한 자물쇠(Lock)대상 을 관리 하 는 데 사용 되 기 때문에 Mutex RAII 와 관련 되 어 스 레 드 가 상호 배척 량 에 잠 그 는 것 을 편리 하 게 합 니 다.즉,특정한 lockguard 대상 의 성명 주기 내 에 관리 하 는 잠 금 대상 은 잠 금 상 태 를 유지 합 니 다.이 락guard 의 수명 주기 가 끝나 면 관리 하 는 잠 금 대상 이 잠 금 해 제 됩 니 다.ptr 등 스마트 포인터 가 동적 으로 분 배 된 메모리 자원 을 관리 합 니 다).
템 플 릿 매개 변수 Mutex 는 상호 반 론 량 유형 을 대표 합 니 다.예 를 들 어 std:mutex 유형 은 기본 적 인 Basic Lockable 유형 이 어야 합 니 다.표준 라 이브 러 리 에서 몇 가지 기본 적 인 Basic Lockable 유형 을 정의 합 니 다.각각 std::mutex,std::recursivemutex, std::timed_mutex,std::recursive_timed_mutex 및 std::uniquelock
std::unique_lock 소개
lock_guard 의 가장 큰 단점 도 간단 하고 프로그래머 에 게 충분 한 유연성 을 제공 하지 않 았 기 때문에 C++11 기준 에서 Mutex RAII 와 관련 된 또 다른 유 니 크 를 정의 했다.lock,이 종류 와 lockguard 류 는 비슷 하고 스 레 드 가 상호 배척 량 에 자 물 쇠 를 채 우 는 데 편리 하지만 더 좋 은 잠 금 해제 와 잠 금 제 어 를 제공 합 니 다.
말 그대로 유 니 크lock 대상 은 독점 소유권 방식(unique owership)으로 mutex 대상 의 잠 금 해제 와 잠 금 해제 작업 을 관리 합 니 다.독점 소유권 이란 다른 유 니 크 가 없습니다.lock 대상 은 특정한 mutex 대상 의 소유권 을 동시에 가지 고 있 습 니 다.
새로 생 성 된 유 니 크lock 대상 은 Mutex 대상 m 를 관리 하고 m.lock()을 호출 하여 Mutex 대상 을 잠 그 려 고 합 니 다.이때 다른 유 니 크lock 대상 이 이 Mutex 대상 m 를 관 리 했 으 면 현재 스 레 드 가 막 힐 것 입 니 다.
std::condition 소개
std::conditionvariable 대상 의 어떤 wait 함수 가 호출 되 었 을 때 std::unique 를 사용 합 니 다.lock(std:mutex)을 통 해 현재 스 레 드 를 잠 급 니 다.현재 스 레 드 는 다른 스 레 드 가 같은 std::condition 에 있 을 때 까지 계속 막 힙 니 다.variable 대상 에서 notification 함 수 를 호출 하여 현재 스 레 드 를 깨 웠 습 니 다.
std::condition_variable 는 두 가지 wait()함 수 를 제공 합 니 다.현재 스 레 드 가 wait()를 호출 하면 차 단 됩 니 다.(현재 스 레 드 는 잠 금(mutex)을 가 져 왔 을 것 입 니 다.잠 금 lck 를 가 져 올 때 까지 설정 하 십시오.)다른 스 레 드 가 notify *를 호출 할 때 까지 설정 하 십시오.현재 스 레 드 를 깨 웠 습 니 다.
스 레 드 가 막 혔 을 때 이 함 수 는 자동 으로 lck.unlock()을 호출 하여 잠 금 경쟁 에 막 힌 다른 스 레 드 를 계속 실행 할 수 있 습 니 다.또한,현재 스 레 드 가 알림 을 받 으 면(notified,보통 다른 스 레 드 에서 notify *를 호출 합 니 다.현재 스 레 드 를 깨 웠 습 니 다).wait()함수 도 lck.lock()을 자동 으로 호출 하여 lck 의 상태 와 wait 함수 가 호출 될 때 같 습 니 다.
두 번 째 상황 에서(즉,Predicate 설정)pred 조건 이 false 일 때 wait()를 호출 해 야 현재 스 레 드 를 막 을 수 있 고 다른 스 레 드 의 통 지 를 받 은 후에 pred 가 true 일 때 만 차단 이 해 제 됩 니 다.따라서 두 번 째 상황 은 다음 과 같은 코드 와 유사 하 다.

while (!pred()) wait(lck);
std::function 소개
std::function 을 사용 하면 일반 함수,lambda 표현 식 과 함수 대상 클래스 를 통일 할 수 있 습 니 다.같은 유형 은 아니 지만 function 템 플 릿 류 를 통 해 같은 유형의 대상(function 대상)으로 전환 하여 하나의 vector 나 다른 용기 에 넣 어 리 셋 하기 편리 합 니 다.
코드 구현:

#pragma once

#ifndef MESSAGE_QUEUE_H
#define MESSAGE_QUEUE_H

#include <queue>
#include <mutex>
#include <condition_variable>

template<class Type>
class CMessageQueue
{
public:
 CMessageQueue& operator = (const CMessageQueue&) = delete;
 CMessageQueue(const CMessageQueue& mq) = delete;


 CMessageQueue() :_queue(), _mutex(), _condition(){}
 virtual ~CMessageQueue(){}

 void Push(Type msg){
  std::lock_guard <std::mutex> lock(_mutex);
  _queue.push(msg);
   //                  , condition             
  _condition.notify_one();
 }
  //blocked                  
 bool Pop(Type& msg, bool isBlocked = true){
  if (isBlocked)
  {
   std::unique_lock <std::mutex> lock(_mutex);
   while (_queue.empty())
   {
    _condition.wait(lock);
    
   }
   //         if   ,  lock       if    
   msg = std::move(_queue.front());
   _queue.pop();
   return true;
   
  }
  else
  {
   std::lock_guard<std::mutex> lock(_mutex);
   if (_queue.empty())
    return false;


   msg = std::move(_queue.front());
   _queue.pop();
   return true;
  }

 }

 int32_t Size(){
  std::lock_guard<std::mutex> lock(_mutex);
  return _queue.size();
 }

 bool Empty(){
  std::lock_guard<std::mutex> lock(_mutex);
  return _queue.empty();
 }
private:
 std::queue<Type> _queue;//       
 mutable std::mutex _mutex;//   
 std::condition_variable _condition;//         
};

#endif//MESSAGE_QUEUE_H
스 레 드 탱크 는 구조 함수 에서 스 레 드 를 직접 구성 하고 리 턴 함수 에 들 어 갈 수 있 으 며 Run 함수 디 스 플레이 호출 도 쓸 수 있 습 니 다.여기 서 우 리 는 두 번 째,대 비 를 선택 했다.
4.567917.handler 함수 외부 에서 순환 적 으로 메 시 지 를 받 고 메시지 가 도착 하면 hanlder 로 처리 합 니 다.이 는 상부 에서 패 키 징 을 실현 하지만 스 레 드 에서 호출 함 수 를 자주 전환 합 니 다.이러한 디자인 은 handler 에서 데이터 베 이 스 를 조작 할 때 빈번 한 연결 과 연결 이 필요 하 며,두 개의 가상 함수 인 Prehandler 와 After Handler 를 정의 하여 실현 할 수 있 습 니 다.
!!!구조 함수 에서 가상 함 수 를 호출 하 는 것 은 하위 클래스 의 실현 을 진정 으로 호출 할 수 없습니다!!
가상 함 수 를 실제 호출 할 수 있 지만 프로그래머 가 가상 함 수 를 만 드 는 것 은 동적 연결 을 실현 하 는 것 이 어야 합 니 다.구조 함수 에서 가상 함 수 를 호출 합 니 다.함수 의 입구 주 소 는 컴 파일 할 때 정적 으로 확정 되 었 고 가상 호출 을 실현 하지 않 았 습 니 다
  • Run 함 수 를 써 서 이 부분 을 run 함수 에 넣 고 호출 을 표시 합 니 다.
    조항 9:구조 함수 나 석조 함수 에서 허 함 수 를 영원히 호출 하지 마 십시오
  • 
    #ifndef THREAD_POOL_H
    #define THREAD_POOL_H
    
    
    #include <functional>
    #include <vector>
    #include <thread>
    
    #include "MessageQueue.h"
    
    #define MIN_THREADS 1
    
    template<class Type>
    class CThreadPool
    {
     CThreadPool& operator = (const CThreadPool&) = delete;
     CThreadPool(const CThreadPool& other) = delete;
    
    public:
     CThreadPool(int32_t threads, 
      std::function<void(Type& record, CThreadPool<Type>* pSub)> handler);
     virtual ~CThreadPool();
    
     void Run();
     virtual void PreHandler(){}
     virtual void AfterHandler(){}
     void Submit(Type record);
    
    
    private:
     bool _shutdown;
     int32_t _threads;
     std::function<void(Type& record, CThreadPool<Type>* pSub)> _handler;
     std::vector<std::thread> _workers;
     CMessageQueue<Type> _tasks;
    
    };
    
    
    
    template<class Type>
    CThreadPool<Type>::CThreadPool(int32_t threads, 
     std::function<void(Type& record, CThreadPool<Type>* pSub)> handler)
     :_shutdown(false),
     _threads(threads),
     _handler(handler),
     _workers(),
     _tasks()
    {
    
     //       ,             
     /*if (_threads < MIN_THREADS)
      _threads = MIN_THREADS;
     for (int32_t i = 0; i < _threads; i++)
     {
      
      _workers.emplace_back(
       [this]{
       PreHandler();
       while (!_shutdown){
        Type record;
        _tasks.Pop(record, true);
        _handler(record, this);
       }
       AfterHandler();
      }
      );
     }*/
    
    }
    
    //       
    template<class Type>
    void CThreadPool<Type>::Run()
    {
     if (_threads < MIN_THREADS)
      _threads = MIN_THREADS;
     for (int32_t i = 0; i < _threads; i++)
     {
      _workers.emplace_back(
       [this]{
       PreHandler();
       while (!_shutdown){
        Type record;
        _tasks.Pop(record, true);
        _handler(record, this);
       }
       AfterHandler();
      }
      );
     }
    }
    
    
    
    
    template<class Type>
    CThreadPool<Type>::~CThreadPool()
    {
     for (std::thread& worker : _workers)
      worker.join();
    }
    
    
    template<class Type>
    void CThreadPool<Type>::Submit(Type record)
    {
     _tasks.Push(record);
    }
    #endif // !THREAD_POOL_H
    총결산
    이상 은 이 글 의 모든 내용 입 니 다.본 고의 내용 이 여러분 의 학습 이나 업무 에 어느 정도 참고 학습 가 치 를 가지 기 를 바 랍 니 다.여러분 의 저희 에 대한 지지 에 감 사 드 립 니 다.

    좋은 웹페이지 즐겨찾기