간단한 스레드 탱크의 실현[1]

스레드 탱크가 뭔지 소개 안 할게요.
이 예는 간단한 스레드 탱크를 실현했다.이 스레드 탱크는 아직도 개선할 수 있는 부분이 많다. 예를 들어 스레드 탱크의 크기는 고정된 것이고 스레드 탱크의 크기는 스레드 작업 상황에 따라 조정할 수 있다.
본 예는 주로 두 가지 종류를 포함하는데 하나는 임무의 기류이고 모든 임무는 여기서 계승된다.하나는 스레드 탱크 관리 클래스로 스레드 탱크를 관리하는 데 쓰인다.이제 코드를 붙이고 마지막에 코드에 대해 설명을 하겠습니다.
thread_pool.h 파일
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_


#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <vector>
#include <set>
#include <iostream>

namespace newbee {

class CWorker {
  public:
    CWorker();
    ~CWorker();
  private:
    CWorker(CWorker const&);
    CWorker& operator = (CWorker const&);
  public:
    void SetArgData(void* arg);
    virtual int Run() = 0;
  protected:
    void* m_pArg;

};

class CThreadPool {
  public:
    CThreadPool();
    ~CThreadPool();
  private:
    CThreadPool(CThreadPool const&);
    CThreadPool& operator = (CThreadPool const&);
  public:
    int Init();
    int AddJob(CWorker* job);
    int Destory();
  private:
    static void* Callback(void* arg);
    void* Run();
  private:
    int m_iThreadNum;
    pthread_mutex_t m_taskMutex;
    pthread_mutex_t m_threadMutex;
    pthread_cond_t m_threadCond;    
    std::vector<CWorker*> m_vecTask;
    std::set<pthread_t> m_setBusyThread; 
    std::set<pthread_t> m_setIdleThread; 
};

class CWorkerApp : public CWorker {
  public:
    int Run() {
      std::cout<<(char*)(this->m_pArg)<<std::endl;
      return 0;
    }
};

}
#endif
thread_pool.cc 파일
#include "thread_pool.h"
#include <iostream>

namespace newbee {

const unsigned int MAX_THREAD_NUM = 100;

CWorker::CWorker() {
  m_pArg = NULL;
}

CWorker::~CWorker() {
  m_pArg = NULL;
}


void CWorker::SetArgData(void* arg) {
  m_pArg = arg;
}

CThreadPool::CThreadPool() {
  m_iThreadNum = MAX_THREAD_NUM;
  m_setIdleThread.clear();
  m_vecTask.clear();
  m_setBusyThread.clear();
  pthread_mutex_init(&m_taskMutex, NULL);
  pthread_mutex_init(&m_threadMutex, NULL);
  pthread_cond_init(&m_threadCond, NULL);
}

CThreadPool::~CThreadPool() {
  m_setIdleThread.clear();
  m_vecTask.clear();
  m_setBusyThread.clear();
  pthread_mutex_destroy(&m_threadMutex);
  pthread_mutex_destroy(&m_taskMutex);
  pthread_cond_destroy(&m_threadCond);
}

int CThreadPool::Init() {
  pthread_t thread_id = 0;
  for(int i = 0; i < m_iThreadNum; i++) {
    pthread_create(&thread_id, NULL, Callback, this);
    m_setIdleThread.insert(thread_id);
  }
  return 0;
}

int CThreadPool::AddJob(CWorker* job) {
  pthread_mutex_lock(&m_taskMutex);  
  m_vecTask.push_back(job);
  pthread_cond_signal(&m_threadCond);
  pthread_mutex_unlock(&m_taskMutex);
}

void* CThreadPool::Callback(void* arg) {
  ((CThreadPool*)arg)->Run();
  return (void*)0;
}

void* CThreadPool::Run() {
  pthread_t pthread_id = pthread_self();
  while(1) {
    pthread_mutex_lock(&m_threadMutex);
    pthread_cond_wait(&m_threadCond, &m_threadMutex);
    pthread_mutex_lock(&m_taskMutex);
    std::vector<CWorker*>::iterator iter = m_vecTask.begin();
    if(iter != m_vecTask.end()) {
      m_setBusyThread.insert(pthread_id);
      m_setIdleThread.erase(pthread_id);
      CWorker* job = *iter;
      m_vecTask.erase(iter);
      pthread_mutex_unlock(&m_taskMutex);
      pthread_mutex_unlock(&m_threadMutex);
      job->Run();
    } else {
      pthread_mutex_unlock(&m_taskMutex);
      pthread_mutex_unlock(&m_threadMutex);
    }
  }
  return (void*)0;
}

int CThreadPool::Destory() {
  std::set<pthread_t>::iterator iter = m_setIdleThread.begin();
  while(iter != m_setIdleThread.end()) {
    pthread_cancel(*iter);
    pthread_join(*iter, NULL);
    iter++;
  }
  
  iter = m_setBusyThread.begin();
  while(iter != m_setBusyThread.end()) {
    pthread_cancel(*iter);
    pthread_join(*iter, NULL);
    iter++;
  }

  return 0;
}

}
main.cc
#include "thread_pool.h"
#include <iostream>
#include <unistd.h>

using namespace std;
using namespace newbee;

int main(int argc, char* argv[]) {
  CWorker* job = new CWorkerApp;
  CThreadPool* pool = new CThreadPool;
  pool->Init();
  char str[] = "hello world";  
  job->SetArgData((void*)str);
  for(int i = 0; i < 5; i++) {
    pool->AddJob(job); 
  }
  sleep(10);
  pool->Destory();

  return 0;
  
}

스레드 탱크가 왜 단독으로static 유형의 함수를 만들었는지 물어볼 수 있습니다. 주요 원인은 pthreadcreate 이 함수의 리셋 함수 바늘 파라미터는 클래스의 구성원 함수가 될 수 없습니다. 클래스에는this 바늘이 기본적으로 포함되어 있습니다
또 다른 pthreadcond_wait가 막힌 후, 매번addjob일 때마다 깨웁니다. 만약addJob일 때 모든 라인이 작동하고 있다면, 어떻게 해야 합니까? 깨움이 효과가 없으면 막힙니다.

좋은 웹페이지 즐겨찾기