C++11 기반 threadpool 스 레 드 풀(간결 하고 임 의 매개 변 수 를 가 져 올 수 있 습 니 다)

C++11 은 스 레 드 라 이브 러 리 에 가입 하여 표준 라 이브 러 리 가 병발 을 지원 하지 않 는 역 사 를 떠 났 다.그러나 c++다 중 스 레 드 에 대한 지원 은 비교적 저급 하고 약간 고 급 스 러 운 용법 은 모두 스스로 실현 해 야 한다.예 를 들 어 스 레 드 탱크,신 호 량 등 이다.스 레 드 풀(thread pool)이라는 것 은 면접 에서 여러 번 질문 을 받 았 습 니 다.일반적인 대답 은"하나의 작업 대기 열,하나의 스 레 드 대기 열 을 관리 한 다음 에 매번 하나의 임 무 를 하나의 스 레 드 에 배정 하여 반복 합 니 다."괜 찮 을 것 같은 데.근 데 프로그램 을 쓸 때 문제 가 생 겼 어 요.
쓸데없는 말 은 그만 하고 먼저 실현 한 다음 에 다시 떠 들 어 라.dont talk, show me ur code !)
코드 구현

#pragma once
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

namespace std
{
#define MAX_THREAD_NUM 256

//   ,                      ,         
//        ,               ,Opteron()   
class threadpool
{
  using Task = std::function<void()>;
  //    
  std::vector<std::thread> pool;
  //     
  std::queue<Task> tasks;
  //   
  std::mutex m_lock;
  //     
  std::condition_variable cv_task;
  //       
  std::atomic<bool> stoped;
  //      
  std::atomic<int> idlThrNum;

public:
  inline threadpool(unsigned short size = 4) :stoped{ false }
  {
    idlThrNum = size < 1 ? 1 : size;
    for (size = 0; size < idlThrNum; ++size)
    {  //       
      pool.emplace_back(
        [this]
        { //       
          while(!this->stoped)
          {
            std::function<void()> task;
            {  //          task
              std::unique_lock<std::mutex> lock{ this->m_lock };// unique_lock    lock_guard     :     unlock()   lock()
              this->cv_task.wait(lock,
                [this] {
                  return this->stoped.load() || !this->tasks.empty();
                }
              ); // wait     task
              if (this->stoped && this->tasks.empty())
                return;
              task = std::move(this->tasks.front()); //     task
              this->tasks.pop();
            }
            idlThrNum--;
            task();
            idlThrNum++;
          }
        }
      );
    }
  }
  inline ~threadpool()
  {
    stoped.store(true);
    cv_task.notify_all(); //         
    for (std::thread& thread : pool) {
      //thread.detach(); //    “    ”
      if(thread.joinable())
        thread.join(); //       ,   :        
    }
  }

public:
  //       
  //   .get()             ,     
  //               ,
  //        bind: .commit(std::bind(&Dog::sayHello, &dog));
  //      mem_fn: .commit(std::mem_fn(&Dog::sayHello), &dog)
  template<class F, class... Args>
  auto commit(F&& f, Args&&... args) ->std::future<decltype(f(args...))>
  {
    if (stoped.load())  // stop == true ??
      throw std::runtime_error("commit on ThreadPool is stopped.");

    using RetType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type,    f       
    auto task = std::make_shared<std::packaged_task<RetType()> >(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...)
      );  // wtf !
    std::future<RetType> future = task->get_future();
    {  //        
      std::lock_guard<std::mutex> lock{ m_lock };//          lock_guard   mutex   stack    ,      lock(),      unlock()
      tasks.emplace(
        [task]()
        { // push(Task{...})
          (*task)();
        }
      );
    }
    cv_task.notify_one(); //         

    return future;
  }

  //      
  int idlCount() { return idlThrNum; }

};

}

#endif
코드 가 많 지 않 죠?수백 줄 에 달 하 는 코드 가 스 레 드 풀 을 완 성 했 습 니 다.그리고 commt 를 보 세 요.하,고정 매개 변수 가 아 닙 니 다.매개 변수 수량 제한 이 없습니다!이것 은 가 변 매개 변수 템 플 릿 덕분이다.
어떻게 사용 합 니까?
아래 코드 보기(펼 쳐 보기)

#include "threadpool.h"
#include <iostream>

void fun1(int slp)
{
  printf(" hello, fun1 ! %d
" ,std::this_thread::get_id()); if (slp>0) { printf(" ======= fun1 sleep %d ========= %d
",slp, std::this_thread::get_id()); std::this_thread::sleep_for(std::chrono::milliseconds(slp)); } } struct gfun { int operator()(int n) { printf("%d hello, gfun ! %d
" ,n, std::this_thread::get_id() ); return 42; } }; class A { public: static int Afun(int n = 0) { // static std::cout << n << " hello, Afun ! " << std::this_thread::get_id() << std::endl; return n; } static std::string Bfun(int n, std::string str, char c) { std::cout << n << " hello, Bfun ! "<< str.c_str() <<" " << (int)c <<" " << std::this_thread::get_id() << std::endl; return str; } }; int main() try { std::threadpool executor{ 50 }; A a; std::future<void> ff = executor.commit(fun1,0); std::future<int> fg = executor.commit(gfun{},0); std::future<int> gg = executor.commit(a.Afun, 9999); //IDE , std::future<std::string> gh = executor.commit(A::Bfun, 9998,"mult args", 123); std::future<std::string> fh = executor.commit([]()->std::string { std::cout << "hello, fh ! " << std::this_thread::get_id() << std::endl; return "hello,fh ret !"; }); std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::microseconds(900)); for (int i = 0; i < 50; i++) { executor.commit(fun1,i*100 ); } std::cout << " ======= commit all ========= " << std::this_thread::get_id()<< " idlsize="<<executor.idlCount() << std::endl; std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(3)); ff.get(); // .get() , std::cout << fg.get() << " " << fh.get().c_str()<< " " << std::this_thread::get_id() << std::endl; std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(3)); std::cout << " ======= fun1,55 ========= " << std::this_thread::get_id() << std::endl; executor.commit(fun1,55).get(); // .get() std::cout << "end... " << std::this_thread::get_id() << std::endl; std::threadpool pool(4); std::vector< std::future<int> > results; for (int i = 0; i < 8; ++i) { results.emplace_back( pool.commit([i] { std::cout << "hello " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "world " << i << std::endl; return i*i; }) ); } std::cout << " ======= commit all2 ========= " << std::this_thread::get_id() << std::endl; for (auto && result : results) std::cout << result.get() << ' '; std::cout << std::endl; return 0; } catch (std::exception& e) { std::cout << "some unhappy happened... " << std::this_thread::get_id() << e.what() << std::endl; }
혐의 를 피하 기 위해 먼저 저작권 설명 을 하 겠 습 니 다.코드 는 me'쓰기'이지 만 생각 은 인터넷 에서 나 왔 습 니 다.특히 이 스 레 드 탱크 구현(기본적으로 copy 는 이 실현 을 실현 하고 이 학우 의 실현 과 해석 을 더 하면 좋 은 것 은 copy 할 가치 가 있 습 니 다!그리고 종합 적 으로 변경 하여 더욱 간결 하 다.
실현 원리
앞 말 을 이 어 가 며 말 하 라"고 말 했다."작업 대기 열,스 레 드 대기 열 을 관리 한 다음 에 매번 하나의 작업 을 하나의 스 레 드 에 배정 하여 반복 합 니 다."이 사고방식 에 신마 문제 가 있 습 니까?스 레 드 탱크 는 일반적으로 스 레 드 를 재 활용 해 야 하기 때문에 task 를 가 져 와 서 특정한 thread 에 배정 하고 실행 한 후에 다시 분배 합 니 다.언어 차원 에서 기본적으로 지원 되 지 않 습 니 다.일반 언어의 thread 는 모두 고정된 task 함 수 를 실행 하고 실행 이 끝 난 스 레 드 도 끝 납 니 다(적어도 c++는 이 렇 습 니 다).so 는 task 와 thread 의 분 배 를 어떻게 실현 해 야 합 니까?
모든 thread 가 스케줄 링 함 수 를 실행 하도록 합 니 다.하나의 task 를 순환 해서 가 져 온 다음 에 실행 합 니 다.
아이디어 짱 이 죠?thread 함수 의 유일 성 을 확보 하고 스 레 드 를 재 활용 하여 task 를 실행 합 니 다.
아 이 디 어 를 이해 하 더 라 도 코드 는 상세 하 게 설명해 야 한다.
1.하나의 스 레 드 pool,하나의 작업 대기 열 quue,의견 이 없 을 것 입 니 다.
2.작업 대기 열 은 전형 적 인 생산자-소비자 모델 로 본 모델 은 적어도 두 개의 도구 가 필요 합 니 다.하나의 mutex+하나의 조건 변수 또는 하나의 mutex+하나의 신 호 량 이 필요 합 니 다.mutex 는 실제 적 으로 잠 금 입 니 다.작업 의 추가 와 제거(가 져 오기)의 상호 배척 성 을 확보 합 니 다.하나의 조건 변 수 는 task 의 동기 성 을 확보 하 는 것 입 니 다.empty 대기 열,스 레 드 는 기 다 려 야 합 니 다(차단).
3.atomic자 체 는 원자 유형 으로 이름 에서 알 수 있 습 니 다.그들의 조작 load()/store()는 원자 조작 이기 때문에 mutex 를 추가 할 필요 가 없습니다.
c++언어 디 테 일
원 리 를 안다 고 해서 프로그램 을 쓸 수 있 는 것 은 아니다.그 위 에 많은 c+11 의'기예 음교'를 사 용 했 는데 아래 에 간단하게 설명 한다.
  • using Task=function는 유형 별명 으로 type:def 의 용법 을 간소화 했다.function는 함수 형식 이 라 고 볼 수 있 습 니 다.임의의 원형 을 받 아들 이 는 것 은 void()의 함수 나 함수 대상 또는 익명 함수 입 니 다.void()는 인자 가 없 으 며 반환 값 이 없다 는 뜻 입 니 다.
  • pool.emplace_back([this]{...})과 pool.pushback([this]{...})기능 은 같 지만 전자 성능 이 더 좋 습 니 다.
  • pool.emplace_back([this]{...}) 스 레 드 대상 을 만 들 었 습 니 다.실행 함 수 는 람 다 익명 함수 입 니 다.
  • 모든 대상 의 초기 화 방식 은{}을 사용 하고()방식 을 사용 하지 않 습 니 다.스타일 이 일치 하지 않 고 오류 가 발생 하기 쉽 기 때 문 입 니 다.
  • 익명 함수:[this]{...} 긴 말 하지 않다.캡 처 입 니 다.this 는 역외 변 수 를 참조 하 는 this 지침 입 니 다.내부 에 서 는 순환 을 사용 합 니 다.cvtask.wait(lock,[this]{...})스 레 드 를 막 습 니 다.
  • delctype(expr)은 expr 의 유형 을 추정 하 는데 auto 와 유사 하 며 유형 자리 표시 자 에 해당 하 며 하나의 유형의 위 치 를 차지한다.auto f(A a,B b)->decltype(a+b)은 일종 의 용법 입 니 다.decltype(a+b)f(A,B b)를 쓸 수 없습니다.왜 요?!c++는 이렇게 정 해 져 있 습 니 다!
  • commt 방법 이 약간 기발 하지 않 습 니까?임의의 매개 변 수 를 가 져 갈 수 있 습 니 다.첫 번 째 매개 변 수 는 f 이 고 그 다음은 함수 f 의 매개 변수 입 니 다!(메모:파 라 메 터 를 struct/class 에 전달 하려 면 pointer 를 사용 하 는 것 을 권장 합 니 다.변 수 를 조심 하 는 역할 영역)가 변 매개 변수 템 플 릿 은 c+11 의 큰 하 이 라이트 입 니 다.밝 습 니 다!왜 Arg...과 arg...규정 이 이렇게 쓰 이기 때 문 입 니 다!
  • commt 직접 사용 하면 stdcall 함수 만 호출 할 수 있 지만 두 가지 방법 으로 클래스 구성원 을 호출 할 수 있 습 니 다.하 나 는 사용 할 수 있 습 니 다.  bind: .commit(std::bind(&Dog::sayHello, &dog)); 하 나 는 memfn: .commit(std::mem_fn(&Dog::sayHello), &dog);
  • make_shared 는 sharedptr 스마트 포인터.용법 은 대체로 sharedptr p = make_shared(4)그리고*p==4.스마트 포인터 의 장점 은 자동 delete!
  • bid 함수,함수 f 와 일부 인 자 를 받 아들 여 currying 후의 익명 함 수 를 되 돌려 줍 니 다.예 를 들 어 bid(add,4)는 add 4 와 유사 한 함 수 를 실현 할 수 있 습 니 다!
  • forward()함수,move()함수 와 유사 합 니 다.후 자 는 매개 변 수 를 오른쪽 값 으로 만 듭 니 다.전 자 는...붓 습 니까?아마도 최초 로 들 어 온 형식의 인용 유형(왼쪽 값 인지 왼쪽 값 인지,오른쪽 값 인지 오른쪽 값 인지)을 바 꾸 지 않 는 다 는 것 이다.
  • packaged_task 는 퀘 스 트 함수 의 패키지 클래스 입 니 다.get 을 통 해future 에서 future 를 가 져 온 다음 future 를 통 해 함수 의 반환 값(future.get()을 가 져 올 수 있 습 니 다.packaged_task 자체 가 함수 처럼 호출()할 수 있 습 니 다.
  • queue 는 대기 열 클래스 입 니 다.front()는 머리 요 소 를 가 져 오고 pop()은 머리 요 소 를 제거 합 니 다.back()꼬리 요 소 를 가 져 오고 push()꼬리 에 요 소 를 추가 합 니 다.
  • lock_guard 는 mutex 의 stack 패 키 징 류 로 구 조 를 구성 할 때 lock(),구 조 를 분석 할 때 unlock(),c+RAII 의 아이디어 입 니 다.
  • condition_variable cv; 조건 변수,유 니 크 협조 가 필요 합 니 다.lock 사용;unique_lock 비교 lockguard 의 장점 은 언제든지 unlock()과 lock()을 사용 할 수 있다 는 것 이다.cv.wait()이전에 mutex 를 소지 해 야 합 니 다.wait 자 체 는 unlock()mutex 를 가지 고 있 으 며,조건 이 만족 하면 mutex 를 다시 가지 고 있 습 니 다.
  • 마지막 스 레 드 풀 을 분석 할 때 join()은 임 무 를 모두 수행 하고 끝 날 때 까지 기다 릴 수 있 습 니 다.안전 합 니 다!
  • Git
    코드 를 git 에 저장 하면 최신 코드 를 얻 을 수 있 습 니 다:https://github.com/lzpong/threadpool
    [copy right from url: http://blog.csdn.net/zdarks/article/details/46994607, https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h]

    좋은 웹페이지 즐겨찾기