Reactor 모드 예제

81222 단어 서버 개발 기반
프로그램의 기능은 간단한 echo 서비스이다. 클라이언트가 서버에 연결된 후에 서버에 정보를 보내고 서버에 시간 스탬프 등 정보를 추가한 후에 클라이언트에게 되돌려준다.
/**   
 *@desc: reactor         ,main.cpp
 *@author: zhangyl
 *@date:   2016.11.23
  */  
 #include 
 #include 
 #include 
 #include 
 #include 
 #include     //for htonl() and htons()
 #include 
 #include 
 #include 
 #include     //for signal()
 #include 
 #include 
 #include 
 #include 
 #include 
 #include 
 #include      //for std::setw()/setfill()
 #include   

#define WORKER_THREAD_NUM   5  
#define min(a, b) ((a <= b) ? (a) : (b))   
int g_epollfd = 0;
bool g_bStop = false;
int g_listenfd = 0;
pthread_t g_acceptthreadid = 0;
pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 };  
pthread_cond_t g_acceptcond;
pthread_mutex_t g_acceptmutex;  
pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/;  
pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/;  
pthread_mutex_t g_clientmutex;  
std::list<int> g_listClients;  
void prog_exit(int signo)
{  
  ::signal(SIGINT, SIG_IGN);  
  ::signal(SIGKILL, SIG_IGN);  
  ::signal(SIGTERM, SIG_IGN);  

  std::cout << "program recv signal " << signo
            << " to exit." << std::endl;  

  g_bStop = true;  

  ::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL);  

  //TODO:        shutdown()  ?  
  ::shutdown(g_listenfd, SHUT_RDWR);  
  ::close(g_listenfd);  
  ::close(g_epollfd);  

  ::pthread_cond_destroy(&g_acceptcond);  
  ::pthread_mutex_destroy(&g_acceptmutex);  

  ::pthread_cond_destroy(&g_cond);  
  ::pthread_mutex_destroy(&g_mutex);  

  ::pthread_mutex_destroy(&g_clientmutex);
}  
bool create_server_listener(const char* ip, short port)
{  //socket()      socket   
  g_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);  
  if (g_listenfd == -1)  
      return false;  

  int on = 1;  
  ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR,
               (char *)&on, sizeof(on));  
  ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT,
               (char *)&on, sizeof(on));  

    //     
  struct sockaddr_in servaddr;  
  memset(&servaddr, 0, sizeof(servaddr));   
  servaddr.sin_family = AF_INET;  
  servaddr.sin_addr.s_addr = inet_addr(ip);  
  servaddr.sin_port = htons(port);  
  // //bind()                socket 
  if (::bind(g_listenfd, (sockaddr *)&servaddr,sizeof(servaddr)) == -1)
      return false;  

//  listen()     socket,         connect()      ,             
  if (::listen(g_listenfd, 50) == -1)  
      return false;  

  g_epollfd = ::epoll_create(1);  
  if (g_epollfd == -1)  
      return false;  

  struct epoll_event e;  
  memset(&e, 0, sizeof(e)); // e        e       0       e 。 
  e.events = EPOLLIN | EPOLLRDHUP;  
  e.data.fd = g_listenfd;     
  //  g_listenfd    
  if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1)  
      return false;  

  return true;
}  
void release_client(int clientfd)
{  
  if (::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)  
      std::cout << "release client socket failed as call epoll_ctl failed"
                << std::endl;  

  ::close(clientfd);
}  
void* accept_thread_func(void* arg)
{     
  while (!g_bStop)  
  {  
      ::pthread_mutex_lock(&g_acceptmutex);  //         
      ::pthread_cond_wait(&g_acceptcond, &g_acceptmutex);  //                            mutex,                                    。
      //::pthread_mutex_lock(&g_acceptmutex);  

      //std::cout << "run loop in accept_thread_func" << std::endl;  

/*TCP        socket()、bind()、listen()  ,
       socket   。TCP       socket()、
connect()    TCP            。
TCP            ,    accept()       ,
         。*/
      struct sockaddr_in clientaddr;  
      socklen_t addrlen;  
      int newfd = ::accept(g_listenfd,
                           (struct sockaddr *)&clientaddr, &addrlen);  
      ::pthread_mutex_unlock(&g_acceptmutex);  //         
      if (newfd == -1)  
          continue;  

      std::cout << "new client connected: "
                << ::inet_ntoa(clientaddr.sin_addr) << ":" //                   IP      
                << ::ntohs(clientaddr.sin_port) << std::endl;  //   16                  

      //  socket   non-blocking             
      int oldflag = ::fcntl(newfd, F_GETFL, 0);  
      int newflag = oldflag | O_NONBLOCK;  
      if (::fcntl(newfd, F_SETFL, newflag) == -1)  
      {  
          std::cout << "fcntl error, oldflag =" << oldflag
                    << ", newflag = " << newflag << std::endl;  
          continue;  
      }  

      //       fd
      struct epoll_event e;  
      memset(&e, 0, sizeof(e));  
      e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;  
      e.data.fd = newfd;  
      if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)  
      {  
          std::cout << "epoll_ctl error, fd =" << newfd << std::endl;  
      }  
  }  

  return NULL;
}  

void* worker_thread_func(void* arg)
{     
  while (!g_bStop)  
  {  
      int clientfd;  
      ::pthread_mutex_lock(&g_clientmutex);  //        
      while (g_listClients.empty())  
          ::pthread_cond_wait(&g_cond, &g_clientmutex); //            
      clientfd = g_listClients.front();  
      g_listClients.pop_front();    
      pthread_mutex_unlock(&g_clientmutex);  //         

      //gdb             ,           ,               
      std::cout << std::endl;  

      std::string strclientmsg;  
      char buff[256];  
      bool bError = false;  
      while (true)  
      {  
          memset(buff, 0, sizeof(buff));  
          int nRecv = ::recv(clientfd, buff, 256, 0);  //    
          if (nRecv == -1)  
          {  
              if (errno == EWOULDBLOCK)  
                  break;  
              else  
              {  
                  std::cout << "recv error, client disconnected, fd = "
                            << clientfd << std::endl;  
                  release_client(clientfd);  
                  bError = true;  
                  break;  
              }  

          }  
          //     socket,     。  
          else if (nRecv == 0)  
          {  
              std::cout << "peer closed, client disconnected, fd = "
                        << clientfd << std::endl;  
              release_client(clientfd); //  clientfd epoll_fd   
              bError = true;  
              break;  
          }  

          strclientmsg += buff;  //     strclientmsg
      }  

      //   ,             
      if (bError)  
          continue;  

      std::cout << "client msg: " << strclientmsg;  

      //              
      time_t now = time(NULL);  
      struct tm* nowstr = localtime(&now);  
      std::ostringstream ostimestr;  
      ostimestr << "[" << nowstr->tm_year + 1900 << "-"   
                << std::setw(2) << std::setfill('0')
                << nowstr->tm_mon + 1 << "-"   
                << std::setw(2) << std::setfill('0')
                << nowstr->tm_mday << " "  
                << std::setw(2) << std::setfill('0')
                << nowstr->tm_hour << ":"   
                << std::setw(2) << std::setfill('0')
                << nowstr->tm_min << ":"   
                << std::setw(2) << std::setfill('0')
                << nowstr->tm_sec << "]server reply: ";  

      strclientmsg.insert(0, ostimestr.str());  

      while (true)  
      {  
          int nSent = ::send(clientfd, strclientmsg.c_str(), //    
                             strclientmsg.length(), 0);  
          if (nSent == -1)  
          {  
              if (errno == EWOULDBLOCK)  
              {  
                  ::sleep(10);  
                  continue;  
              }  
              else  
              {  
                  std::cout << "send error, fd = "
                            << clientfd << std::endl;  
                  release_client(clientfd);  
                  break;  
              }  

          }            

          std::cout << "send: " << strclientmsg;  
          strclientmsg.erase(0, nSent);  //   0   nSent     

          if (strclientmsg.empty())  
              break;  
      }  
  }  

  return NULL;
}  
void daemon_run()
{  
  int pid;  
  signal(SIGCHLD, SIG_IGN);  //    ,  SIGCHLD  ,     ,           ,           
  //1)     ,fork           ID;  
  //2)     ,fork  0;  
  //3)      ,fork      ;  
  pid = fork();  
  if (pid < 0)  
  {  
      std:: cout << "fork error" << std::endl;  
      exit(-1);  
  }  
  //     ,         
  else if (pid > 0) {  
      exit(0);  
  }  
  //  parent child      session ,parent   (session)     ,  
  //parent           ,  exit      ,            ,  init  。  
  //  setsid()  ,child           (session)id。  
  //  parent    ,      child 。  
  setsid();  
  int fd;  
  fd = open("/dev/null", O_RDWR, 0);  //  /dev/null           stdin/stdout/stderr    
  if (fd != -1)  
  {                                //dup2()                    
      dup2(fd, STDIN_FILENO);  //STDIN_FILENO              ,    0,           。
      dup2(fd, STDOUT_FILENO);  ////       fd     
      dup2(fd, STDERR_FILENO);  
  }  
  if (fd > 2)  
      close(fd);  
 }  

int main(int argc, char* argv[])
{    
  short port = 0;  
  int ch;  
  bool bdaemon = false;  
  /*
    while((ch = getopt(argc,argv,"a:bcde"))!= -1)
  {
  switch(ch)
  {
  case : printf("xxxtest");
  case 'a': printf("option a:’%s’
",optarg); break;   case 'b': printf("option b :b
"); break;   default: printf("other option :%c
",ch);   }   printf("optopt +%c
",optopt);   }   return 0;   } $./getopt –b option b:b $./getopt –c other option:c $./getopt –a other option :? $./getopt –a12345 option a:’12345’ */
while ((ch = getopt(argc, argv, "p:d")) != -1) // 。 argc argv { switch (ch) { case 'd': bdaemon = true; break; case 'p': port = atol(optarg); // break; } } if (bdaemon) daemon_run(); if (port == 0) port = 12345; if (!create_server_listener("0.0.0.0", port)) // { std::cout << "Unable to create listen server: ip=0.0.0.0, port=" << port << "." << std::endl; return -1; } // signal(SIGCHLD, SIG_DFL); signal(SIGPIPE, SIG_IGN); signal(SIGINT, prog_exit); signal(SIGKILL, prog_exit); signal(SIGTERM, prog_exit); // ::pthread_cond_init(&g_acceptcond, NULL); // ::pthread_mutex_init(&g_acceptmutex, NULL); // mutex ::pthread_cond_init(&g_cond, NULL); ::pthread_mutex_init(&g_mutex, NULL); ::pthread_mutex_init(&g_clientmutex, NULL); // accept_thread_func , accept_thread_func , fd ::pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL); // worker_thread_func, for (int i = 0; i < WORKER_THREAD_NUM; ++i) { ::pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL); } while (!g_bStop) { struct epoll_event ev[1024]; // socket int n = ::epoll_wait(g_epollfd, ev, 1024, 10); if (n == 0) continue; else if (n < 0) { std::cout << "epoll_wait error" << std::endl; continue; } int m = min(n, 1024); for (int i = 0; i < m; ++i) { // if (ev[i].data.fd == g_listenfd) pthread_cond_signal(&g_acceptcond); // else { pthread_mutex_lock(&g_clientmutex); g_listClients.push_back(ev[i].data.fd); pthread_mutex_unlock(&g_clientmutex); pthread_cond_signal(&g_cond); //std::cout << "signal" << std::endl; } } } return 0; }

프로그램의 대략적인 프레임워크는 다음과 같습니다.
주 루틴은 socket에 새로운 연결이 있는지 감청하는 것만 담당합니다. 새로운 연결이 오면 accept라는 작업 루틴에 새 연결을 수신하고, 새 연결을 주 루틴에 연결하려면 epollfd를 사용하십시오.
주 스레드가 클라이언트의 socket에서 읽을 수 있는 이벤트를 탐지하면 다른 다섯 개의 작업 스레드에 클라이언트가 보낸 데이터를 수신하고 시간 스탬프를 추가해서 클라이언트에게 돌려보냅니다.
-pport 전달을 통해 프로그램의 감청 포트 번호를 설정할 수 있습니다.프로그램이 데몬 모드로 백엔드에서 실행되도록 - d를 전달할 수 있습니다.이것도 표준 linux 데몬 모드의 쓰기 방법이다.
프로그램의 난점과 주의해야 할 점은 다음과 같다.
1. 조건 변수는 거짓 깨우기를 방지하기 위해 반드시 하나의 순환에서 pthread 를 호출해야 한다.cond_wait () 함수,workerthread_func()에서 while(g listClients.empty(): pthreadcond_wait(&g_cond, &g_clientmutex); cept_thread_func () 함수에 순환을 사용하지 않았습니다. 문제가 있습니까?
2、사용 조건 변수 pthreadcond_wait () 함수는 반드시 이 조건 변수와 관련된 mutex, 즉 다음과 같은 구조를 먼저 얻어야 한다.
1  mutex_lock(...);  
2  while (condition is true)  
3    ::pthread_cond_wait(...);  
4  //         ...  mutex_unlock(...);  
5  //         ...

왜냐하면 pthreadcond_wait () 가 막히면, 잠금 해제와 현재 라인을 막는 두 동작을 합치면 원자입니다.
3. 서버 사이드 프로그램으로 socket 탐지 호출 setsocketopt () SOREUSEADDR 및 SOREUSEPORT 두 개의 로고는 서비스 프로그램이 때때로 재부팅이 필요할 때가 있기 때문에 (예를 들어 디버깅을 할 때 계속 재부팅하기) 이 두 개의 로고를 설정하지 않으면 포트를 연결할 때 호출이 실패합니다.한 포트가 사용되면 더 이상 사용하지 않아도 네 번 손을 흔들면 TIMEWAIT 상태는 약 2min의 MSL(Maximum Segment Lifetime, 최대 생존 기간)이 있습니다.이 2min 내에 이 포트는 중복 사용될 수 없습니다.서버 프로그램이 지난번에 이 포트 번호를 사용하고 다시 시작합니다. 이 때문에 이 포트를 다시 연결하면 실패합니다. (bind 함수 호출 실패)아니면 리셋할 때마다 2min을 기다린 후에 다시 시도해야 한다. (이것은 자주 리셋할 때 디버깅을 받기 어렵다.) 또는 이런 SO 를 설정해야 한다.REUSEADDR 및 SOREUSEPORT는 지금 포트를 재활용합니다.사실, SOREUSEADDR은 윈도우즈와 Unix 플랫폼에서 미세한 차이가 있습니다.libevent 원본에서 다음과 같은 설명을 보았습니다.
 1int evutil_make_listen_socket_reuseable(evutil_socket_t sock)  
 2{  
 3 #ifndef WIN32  
 4    int one = 1;  
 5    /* REUSEADDR on Unix means, "don't hang on to this address after the 
 6     * listener is closed."  On Windows, though, it means "don't keep other 
 7     * processes from binding to this address while we're using it.
 8     */  
 9    return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one,  
10        (ev_socklen_t)sizeof(one));  
11#else  
12    return 0;  
13#endif  
14}  

설명 부분에 주의하십시오. Unix 플랫폼에서 이 옵션을 설정하면 임의의 프로세스가 이 주소를 다시 사용할 수 있음을 의미합니다.윈도우즈에서는 다른 프로세스가 이 주소를 다시 사용하는 것을 막지 마십시오.즉, Unix 플랫폼에서 이 옵션을 설정하지 않으면 임의의 프로세스가 일정 시간 내에 이 주소를 bind할 수 없다.윈도우즈 플랫폼에서 일정 시간 동안 다른 프로세스는 이 주소를 bind할 수 없지만, 이 프로세스는 다시 이 주소를 bind할 수 있다.
4、epoll_wait는 새 연결 socket에 대해 기본 수평 트리거 모드(level trigger)가 아닌 가장자리 트리거 모드인 EPOLLET(edge trigger)를 사용합니다.수평 트리거 모드를 사용하면 주 루틴이 클라이언트의 socket 데이터를 읽을 수 있음을 감지할 때, 작업 루틴에 이 socket의 데이터를 수취하라고 알립니다. 이 때 주 루틴은 계속 순환합니다. 작업 루틴에서 이 socket의 데이터를 모두 수거하지 않거나, 작업 루틴에서 데이터를 수취하는 과정에서 클라이언트가 새로운 데이터를 가져오면 주 루틴은 계속 알림 (pthread cond signal) 함수를 통해작업 라인에 데이터를 받는 것을 다시 통지합니다.이렇게 하면 여러 개의 작업 라인이 동시에 Recv 함수를 호출하여 이 클라이언트 socket의 데이터를 받을 수 있으며, 이로 인해 발생하는 결과는 데이터 착란을 초래할 수 있다.반대로 테두리 트리거 모드를 사용하면 어떤 작업 라인만 그 클라이언트 socket의 데이터를 모두 수거할 수 있습니다. 메인 라인의 epollwait는 작업 라인이 그 클라이언트 socket에서 새로 온 데이터를 계속 받는다는 것을 알리기 위해 다시 터치할 수 있습니다.
5. 코드에 다음과 같은 줄이 있습니다://gdb 디버깅 시 표준 출력을 실시간으로 리셋할 수 없습니다. 이 함수로 표준 출력을 리셋하면 정보가 화면에 std:::cout<프로그램은 내가 배치했다. 너는 linux의nc명령이나 자신이 프로그램을 써서 서버에 연결해서 프로그램 효과를 볼 수 있다. 물론 텔넷 명령도 사용할 수 있다. 방법:
linux:
nc 120.55.94.78 12345
또는
telnet 120.55.94.78 12345
그리고 서버에 데이터를 자유롭게 보낼 수 있습니다. 서버는 당신에게 보낸 정보에 시간 스탬프를 붙여서 당신에게 되돌려줍니다.효과는 다음과 같습니다.
또한 이 코드를 순수한 C++11 버전으로 고쳤습니다. CMake 컴파일을 사용합니다. 컴파일을 지원하기 위해서는 이 -std=c++ 11을 추가해야 합니다.
CMakeLists.txt 코드는 다음과 같습니다.
 1cmake_minimum_required(VERSION 2.8)  
 2  PROJECT(myreactorserver)  
 3  AUX_SOURCE_DIRECTORY(./ SRC_LIST)
 4  SET(EXECUTABLE_OUTPUT_PATH ./)  
 5  ADD_DEFINITIONS(-g -W -Wall -Wno-deprecated
 6                  -DLINUX -D_REENTRANT -D_FILE_OFFSET_BITS=64
 7                  -DAC_HAS_INFO -DAC_HAS_WARNING -DAC_HAS_ERROR 
 8                  -DAC_HAS_CRITICAL -DTIXML_USE_STL
 9                  -DHAVE_CXX_STDHEADERS ${CMAKE_CXX_FLAGS}
10                  -std=c++11)  
11  INCLUDE_DIRECTORIES(  ./  )
12  LINK_DIRECTORIES(  ./  )  
13  set(  main.cpp  myreator.cpp  )  
14  ADD_EXECUTABLE(myreactorserver ${SRC_LIST})  
15  TARGET_LINK_LIBRARIES(myreactorserver pthread)  

myreactor.h 파일 내용:
 1/**
 2 *@desc: myreactor   , myreactor.h
 3 *@author: zhangyl
 4 *@date: 2016.12.03
 5 */
 6  #ifndef __MYREACTOR_H__
 7  #define __MYREACTOR_H__  
 8  #include 
 9  #include 
10  #include 
11  #include 
12  #include   
13  #define WORKER_THREAD_NUM   5  
14  class CMyReactor
15  {
16  public:  
17    CMyReactor();  
18    ~CMyReactor();  
19
20    bool init(const char* ip, short nport);  
21    bool uninit();  
22
23    bool close_client(int clientfd);  
24
25    static void* main_loop(void* p);  
26  private:  
27    //no copyable  
28    CMyReactor(const CMyReactor& rhs);  
29    CMyReactor& operator = (const CMyReactor& rhs);  
30
31    bool create_server_listener(const char* ip, short port);  
32
33    static void accept_thread_proc(CMyReactor* pReatcor);  
34    static void worker_thread_proc(CMyReactor* pReatcor);  
35  private:  
36    //C11            
37    int                          m_listenfd = 0;  
38    int                          m_epollfd  = 0;  
39    bool                         m_bStop    = false;  
40
41    std::shared_ptr<std::thread> m_acceptthread;  
42    std::shared_ptr<std::thread> m_workerthreads[WORKER_THREAD_NUM];  
43
44    std::condition_variable      m_acceptcond;  
45    std::mutex                   m_acceptmutex;  
46
47    std::condition_variable      m_workercond ;  
48    std::mutex                   m_workermutex;  
49
50    std::list<int>                 m_listClients;
51  };  
52  #endif //!__MYREACTOR_H__ 

myreactor.cpp 파일 내용:
  1 /**
  2  *@desc: myreactor    , myreactor.cpp
  3  *@author: zhangyl
  4  *@date: 2016.12.03
  5  */  #include "myreactor.h"
  6  #include 
  7  #include 
  8  #include 
  9  #include 
 10  #include 
 11  #include   //for htonl() and htons()
 12  #include 
 13  #include 
 14  #include 
 15  #include 
 16  #include 
 17  #include 
 18  #include    //for std::setw()/setfill()
 19  #include   
 20  #define min(a, b) ((a <= b) ? (a) : (b))  
 21  CMyReactor::CMyReactor()
 22  {  
 23    //m_listenfd = 0;  
 24    //m_epollfd = 0;  
 25    //m_bStop = false;
 26  }  
 27  CMyReactor::~CMyReactor()
 28  {  
 29  }  
 30  bool CMyReactor::init(const char* ip, short nport)
 31  {  
 32    if (!create_server_listener(ip, nport))  
 33    {  
 34        std::cout << "Unable to bind: " << ip
 35                  << ":" << nport << "." << std::endl;  
 36        return false;  
 37    }  
 38
 39
 40    std::cout << "main thread id = " << std::this_thread::get_id()
 41              << std::endl;  
 42
 43    //            
 44    m_acceptthread.reset(new std::thread(CMyReactor::accept_thread_proc, this));  
 45
 46    //        
 47    for (auto& t : m_workerthreads)  
 48    {  
 49        t.reset(new std::thread(CMyReactor::worker_thread_proc, this));  
 50    }  
 51
 52
 53    return true;
 54  }  
 55  bool CMyReactor::uninit()
 56  {  
 57    m_bStop = true;  
 58    m_acceptcond.notify_one();  
 59    m_workercond.notify_all();  
 60
 61    m_acceptthread->join();  
 62    for (auto& t : m_workerthreads)  
 63    {  
 64        t->join();  
 65    }  
 66
 67    ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, m_listenfd, NULL);  
 68
 69    //TODO:        shutdown()  ?  
 70    ::shutdown(m_listenfd, SHUT_RDWR);  
 71    ::close(m_listenfd);  
 72    ::close(m_epollfd);  
 73
 74    return true;
 75  }  
 76  bool CMyReactor::close_client(int clientfd)
 77  {  
 78    if (::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)  
 79    {  
 80        std::cout << "close client socket failed as call epoll_ctl failed"
 81                  << std::endl;  
 82        //return false;  
 83    }  
 84
 85
 86    ::close(clientfd);  
 87
 88    return true;
 89  }  
 90
 91  void* CMyReactor::main_loop(void* p)
 92  {  
 93    std::cout << "main thread id = "
 94              << std::this_thread::get_id() << std::endl;  
 95
 96    CMyReactor* pReatcor = static_cast(p);  
 97
 98    while (!pReatcor->m_bStop)  
 99    {  
100        struct epoll_event ev[1024];  
101        int n = ::epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);  
102        if (n == 0)  
103            continue;  
104        else if (n < 0)  
105        {  
106            std::cout << "epoll_wait error" << std::endl;  
107            continue;  
108        }  
109
110        int m = min(n, 1024);  
111        for (int i = 0; i < m; ++i)  
112        {  
113            //               
114            if (ev[i].data.fd == pReatcor->m_listenfd)  
115                pReatcor->m_acceptcond.notify_one();  
116            //              
117            else  
118            {  
119                {  
120                    std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);  
121                    pReatcor->m_listClients.push_back(ev[i].data.fd);  
122                }  
123
124                pReatcor->m_workercond.notify_one();  
125                //std::cout << "signal" << std::endl;  
126            }// end if  
127
128        }// end for-loop  
129    }// end while  
130
131    std::cout << "main loop exit ..." << std::endl;  
132
133    return NULL;
134  }  
135  void CMyReactor::accept_thread_proc(CMyReactor* pReatcor)
136  {  
137    std::cout << "accept thread, thread id = "
138              << std::this_thread::get_id() << std::endl;  
139
140    while (true)  
141    {  
142        int newfd;  
143        struct sockaddr_in clientaddr;  
144        socklen_t addrlen;  
145        {  
146            std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);  
147            pReatcor->m_acceptcond.wait(guard);  
148            if (pReatcor->m_bStop)  
149                break;  
150
151            //std::cout << "run loop in accept_thread_proc" << std::endl;  
152
153            newfd = ::accept(pReatcor->m_listenfd,
154                              (struct sockaddr *)&clientaddr, &addrlen);  
155        }  
156        if (newfd == -1)  
157            continue;  
158
159        std::cout << "new client connected: "
160                  << ::inet_ntoa(clientaddr.sin_addr) << ":"      
161                  << ::ntohs(clientaddr.sin_port) << std::endl;  
162
163        //  socket   non-blocking  
164        int oldflag = ::fcntl(newfd, F_GETFL, 0);  
165        int newflag = oldflag | O_NONBLOCK;  
166        if (::fcntl(newfd, F_SETFL, newflag) == -1)  
167        {  
168            std::cout << "fcntl error, oldflag =" << oldflag
169                      << ", newflag = " << newflag << std::endl;  
170            continue;  
171        }  
172
173        struct epoll_event e;  
174        memset(&e, 0, sizeof(e));  
175        e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;  
176        e.data.fd = newfd;  
177        if (::epoll_ctl(pReatcor->m_epollfd, 
178            EPOLL_CTL_ADD, newfd, &e) == -1)  
179        {  
180            std::cout << "epoll_ctl error, fd =" << newfd << std::endl;  
181        }  
182    }  
183
184    std::cout << "accept thread exit ..." << std::endl;
185  }  
186  void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)
187  {  
188    std::cout << "new worker thread, thread id = "
189              << std::this_thread::get_id() << std::endl;  
190
191    while (true)  
192    {  
193        int clientfd;  
194        {  
195            std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);  
196            while (pReatcor->m_listClients.empty())  
197            {  
198                if (pReatcor->m_bStop)  
199                {  
200                    std::cout << "worker thread exit ..." << std::endl;  
201                    return;  
202                }  
203
204                pReatcor->m_workercond.wait(guard);  
205            }  
206
207            clientfd = pReatcor->m_listClients.front();  
208            pReatcor->m_listClients.pop_front();  
209        }  
210
211        //gdb             ,           ,               
212        std::cout << std::endl;  
213
214        std::string strclientmsg;  
215        char buff[256];  
216        bool bError = false;  
217        while (true)  
218        {  
219            memset(buff, 0, sizeof(buff));  
220            int nRecv = ::recv(clientfd, buff, 256, 0);  
221            if (nRecv == -1)  
222            {  
223                if (errno == EWOULDBLOCK)  
224                    break;  
225                else  
226                {  
227                    std::cout << "recv error, client disconnected, fd = "
228                              << clientfd << std::endl;  
229                    pReatcor->close_client(clientfd);  
230                    bError = true;  
231                    break;  
232                }  
233
234            }  
235            //     socket,     。  
236            else if (nRecv == 0)  
237            {  
238                std::cout << "peer closed, client disconnected, fd = "
239                          << clientfd << std::endl;  
240                pReatcor->close_client(clientfd);  
241                bError = true;  
242                break;  
243            }  
244
245            strclientmsg += buff;  
246        }  
247
248        //   ,             
249        if (bError)  
250            continue;  
251
252        std::cout << "client msg: " << strclientmsg;  
253
254        //              
255        time_t now = time(NULL);  
256        struct tm* nowstr = localtime(&now);  
257        std::ostringstream ostimestr;  
258        ostimestr << "[" << nowstr->tm_year + 1900 << "-"  
259            << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"  
260            << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "  
261            << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"  
262            << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"  
263            << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";  
264
265        strclientmsg.insert(0, ostimestr.str());  
266
267        while (true)  
268        {  
269            int nSent = ::send(clientfd, strclientmsg.c_str(), 
270                               strclientmsg.length(), 0);  
271            if (nSent == -1)  
272            {  
273                if (errno == EWOULDBLOCK)  
274                {  
275                    std::this_thread::sleep_for(std::chrono::milliseconds(10));  
276                    continue;  
277                }  
278                else  
279                {  
280                    std::cout << "send error, fd = "
281                              << clientfd << std::endl;  
282                    pReatcor->close_client(clientfd);  
283                    break;  
284                }  
285
286            }  
287
288            std::cout << "send: " << strclientmsg;  
289            strclientmsg.erase(0, nSent);  
290
291            if (strclientmsg.empty())  
292                break;  
293        }  
294    }
295  }  
296  bool CMyReactor::create_server_listener(const char* ip, short port)
297  {  
298    m_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);  
299    if (m_listenfd == -1)  
300        return false;  
301
302    int on = 1;  
303    ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR,
304                (char *)&on, sizeof(on));  
305    ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEPORT,
306                (char *)&on, sizeof(on));  
307
308    struct sockaddr_in servaddr;  
309    memset(&servaddr, 0, sizeof(servaddr));  
310    servaddr.sin_family = AF_INET;  
311    servaddr.sin_addr.s_addr = inet_addr(ip);  
312    servaddr.sin_port = htons(port);  
313    if (::bind(m_listenfd, (sockaddr *)&servaddr, 
314         sizeof(servaddr)) == -1)  
315        return false;  
316
317    if (::listen(m_listenfd, 50) == -1)  
318        return false;  
319
320    m_epollfd = ::epoll_create(1);  
321    if (m_epollfd == -1)  
322        return false;  
323
324    struct epoll_event e;  
325    memset(&e, 0, sizeof(e));  
326    e.events = EPOLLIN | EPOLLRDHUP;  
327    e.data.fd = m_listenfd;  
328    if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &e) == -1)  
329        return false;  
330
331    return true;
332  }  

main.cpp 파일 내용:
  1/**  
  2 *@desc:    reactor          
  3 *@author: zhangyl 
  4 *@date:   2016.12.03 
  5 */  
  6
  7#include   
  8#include      //for signal()  
  9#include  
 10#include        //for exit()  
 11#include   
 12#include   
 13#include   
 14#include "myreactor.h"  
 15
 16CMyReactor g_reator;  
 17
 18void prog_exit(int signo)  
 19{  
 20    std::cout << "program recv signal " << signo
 21              << " to exit." << std::endl;   
 22
 23    g_reator.uninit();  
 24}  
 25
 26void daemon_run()  
 27{  
 28    int pid;  
 29    signal(SIGCHLD, SIG_IGN);  
 30    //1)     ,fork           ID;  
 31    //2)     ,fork  0;  
 32    //3)      ,fork      ;  
 33    pid = fork();  
 34    if (pid < 0)  
 35    {  
 36        std:: cout << "fork error" << std::endl;  
 37        exit(-1);  
 38    }  
 39    //     ,         
 40    else if (pid > 0)
 41   {  
 42        exit(0);  
 43    }  
 44    //  parent child      session ,parent   (session)     ,  
 45    //parent           ,  exit      ,            ,  init  。  
 46    //  setsid()  ,child           (session)id。  
 47    //  parent    ,      child 。  
 48    setsid();  
 49    int fd;  
 50    fd = open("/dev/null", O_RDWR, 0);  
 51    if (fd != -1)  
 52    {  
 53        dup2(fd, STDIN_FILENO);  
 54        dup2(fd, STDOUT_FILENO);  
 55        dup2(fd, STDERR_FILENO);  
 56    }  
 57    if (fd > 2)  
 58        close(fd);  
 59}  
 60
 61
 62int main(int argc, char* argv[])  
 63{    
 64    //        
 65    signal(SIGCHLD, SIG_DFL);  
 66    signal(SIGPIPE, SIG_IGN);  
 67    signal(SIGINT, prog_exit);  
 68    signal(SIGKILL, prog_exit);  
 69    signal(SIGTERM, prog_exit);  
 70
 71    short port = 0;  
 72    int ch;  
 73    bool bdaemon = false;  
 74    while ((ch = getopt(argc, argv, "p:d")) != -1)  
 75    {  
 76        switch (ch)  
 77        {  
 78        case 'd':  
 79            bdaemon = true;  
 80            break;  
 81        case 'p':  
 82            port = atol(optarg);  
 83            break;  
 84        }  
 85    }  
 86
 87    if (bdaemon)  
 88        daemon_run();  
 89
 90
 91    if (port == 0)  
 92        port = 12345;  
 93
 94
 95    if (!g_reator.init("0.0.0.0", 12345))  
 96        return -1;  
 97
 98    g_reator.main_loop(&g_reator);  
 99
100    return 0;  
101}  

좋은 웹페이지 즐겨찾기