Reactor 모드 예제
81222 단어 서버 개발 기반
/**
*@desc: reactor ,main.cpp
*@author: zhangyl
*@date: 2016.11.23
*/
#include
#include
#include
#include
#include
#include //for htonl() and htons()
#include
#include
#include
#include //for signal()
#include
#include
#include
#include
#include
#include
#include //for std::setw()/setfill()
#include
#define WORKER_THREAD_NUM 5
#define min(a, b) ((a <= b) ? (a) : (b))
int g_epollfd = 0;
bool g_bStop = false;
int g_listenfd = 0;
pthread_t g_acceptthreadid = 0;
pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 };
pthread_cond_t g_acceptcond;
pthread_mutex_t g_acceptmutex;
pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/;
pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/;
pthread_mutex_t g_clientmutex;
std::list<int> g_listClients;
void prog_exit(int signo)
{
::signal(SIGINT, SIG_IGN);
::signal(SIGKILL, SIG_IGN);
::signal(SIGTERM, SIG_IGN);
std::cout << "program recv signal " << signo
<< " to exit." << std::endl;
g_bStop = true;
::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL);
//TODO: shutdown() ?
::shutdown(g_listenfd, SHUT_RDWR);
::close(g_listenfd);
::close(g_epollfd);
::pthread_cond_destroy(&g_acceptcond);
::pthread_mutex_destroy(&g_acceptmutex);
::pthread_cond_destroy(&g_cond);
::pthread_mutex_destroy(&g_mutex);
::pthread_mutex_destroy(&g_clientmutex);
}
bool create_server_listener(const char* ip, short port)
{ //socket() socket
g_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (g_listenfd == -1)
return false;
int on = 1;
::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR,
(char *)&on, sizeof(on));
::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT,
(char *)&on, sizeof(on));
//
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = inet_addr(ip);
servaddr.sin_port = htons(port);
// //bind() socket
if (::bind(g_listenfd, (sockaddr *)&servaddr,sizeof(servaddr)) == -1)
return false;
// listen() socket, connect() ,
if (::listen(g_listenfd, 50) == -1)
return false;
g_epollfd = ::epoll_create(1);
if (g_epollfd == -1)
return false;
struct epoll_event e;
memset(&e, 0, sizeof(e)); // e e 0 e 。
e.events = EPOLLIN | EPOLLRDHUP;
e.data.fd = g_listenfd;
// g_listenfd
if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1)
return false;
return true;
}
void release_client(int clientfd)
{
if (::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)
std::cout << "release client socket failed as call epoll_ctl failed"
<< std::endl;
::close(clientfd);
}
void* accept_thread_func(void* arg)
{
while (!g_bStop)
{
::pthread_mutex_lock(&g_acceptmutex); //
::pthread_cond_wait(&g_acceptcond, &g_acceptmutex); // mutex, 。
//::pthread_mutex_lock(&g_acceptmutex);
//std::cout << "run loop in accept_thread_func" << std::endl;
/*TCP socket()、bind()、listen() ,
socket 。TCP socket()、
connect() TCP 。
TCP , accept() ,
。*/
struct sockaddr_in clientaddr;
socklen_t addrlen;
int newfd = ::accept(g_listenfd,
(struct sockaddr *)&clientaddr, &addrlen);
::pthread_mutex_unlock(&g_acceptmutex); //
if (newfd == -1)
continue;
std::cout << "new client connected: "
<< ::inet_ntoa(clientaddr.sin_addr) << ":" // IP
<< ::ntohs(clientaddr.sin_port) << std::endl; // 16
// socket non-blocking
int oldflag = ::fcntl(newfd, F_GETFL, 0);
int newflag = oldflag | O_NONBLOCK;
if (::fcntl(newfd, F_SETFL, newflag) == -1)
{
std::cout << "fcntl error, oldflag =" << oldflag
<< ", newflag = " << newflag << std::endl;
continue;
}
// fd
struct epoll_event e;
memset(&e, 0, sizeof(e));
e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
e.data.fd = newfd;
if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)
{
std::cout << "epoll_ctl error, fd =" << newfd << std::endl;
}
}
return NULL;
}
void* worker_thread_func(void* arg)
{
while (!g_bStop)
{
int clientfd;
::pthread_mutex_lock(&g_clientmutex); //
while (g_listClients.empty())
::pthread_cond_wait(&g_cond, &g_clientmutex); //
clientfd = g_listClients.front();
g_listClients.pop_front();
pthread_mutex_unlock(&g_clientmutex); //
//gdb , ,
std::cout << std::endl;
std::string strclientmsg;
char buff[256];
bool bError = false;
while (true)
{
memset(buff, 0, sizeof(buff));
int nRecv = ::recv(clientfd, buff, 256, 0); //
if (nRecv == -1)
{
if (errno == EWOULDBLOCK)
break;
else
{
std::cout << "recv error, client disconnected, fd = "
<< clientfd << std::endl;
release_client(clientfd);
bError = true;
break;
}
}
// socket, 。
else if (nRecv == 0)
{
std::cout << "peer closed, client disconnected, fd = "
<< clientfd << std::endl;
release_client(clientfd); // clientfd epoll_fd
bError = true;
break;
}
strclientmsg += buff; // strclientmsg
}
// ,
if (bError)
continue;
std::cout << "client msg: " << strclientmsg;
//
time_t now = time(NULL);
struct tm* nowstr = localtime(&now);
std::ostringstream ostimestr;
ostimestr << "[" << nowstr->tm_year + 1900 << "-"
<< std::setw(2) << std::setfill('0')
<< nowstr->tm_mon + 1 << "-"
<< std::setw(2) << std::setfill('0')
<< nowstr->tm_mday << " "
<< std::setw(2) << std::setfill('0')
<< nowstr->tm_hour << ":"
<< std::setw(2) << std::setfill('0')
<< nowstr->tm_min << ":"
<< std::setw(2) << std::setfill('0')
<< nowstr->tm_sec << "]server reply: ";
strclientmsg.insert(0, ostimestr.str());
while (true)
{
int nSent = ::send(clientfd, strclientmsg.c_str(), //
strclientmsg.length(), 0);
if (nSent == -1)
{
if (errno == EWOULDBLOCK)
{
::sleep(10);
continue;
}
else
{
std::cout << "send error, fd = "
<< clientfd << std::endl;
release_client(clientfd);
break;
}
}
std::cout << "send: " << strclientmsg;
strclientmsg.erase(0, nSent); // 0 nSent
if (strclientmsg.empty())
break;
}
}
return NULL;
}
void daemon_run()
{
int pid;
signal(SIGCHLD, SIG_IGN); // , SIGCHLD , , ,
//1) ,fork ID;
//2) ,fork 0;
//3) ,fork ;
pid = fork();
if (pid < 0)
{
std:: cout << "fork error" << std::endl;
exit(-1);
}
// ,
else if (pid > 0) {
exit(0);
}
// parent child session ,parent (session) ,
//parent , exit , , init 。
// setsid() ,child (session)id。
// parent , child 。
setsid();
int fd;
fd = open("/dev/null", O_RDWR, 0); // /dev/null stdin/stdout/stderr
if (fd != -1)
{ //dup2()
dup2(fd, STDIN_FILENO); //STDIN_FILENO , 0, 。
dup2(fd, STDOUT_FILENO); //// fd
dup2(fd, STDERR_FILENO);
}
if (fd > 2)
close(fd);
}
int main(int argc, char* argv[])
{
short port = 0;
int ch;
bool bdaemon = false;
/*
while((ch = getopt(argc,argv,"a:bcde"))!= -1)
{
switch(ch)
{
case : printf("xxxtest");
case 'a': printf("option a:’%s’
",optarg); break;
case 'b': printf("option b :b
"); break;
default: printf("other option :%c
",ch);
}
printf("optopt +%c
",optopt);
}
return 0;
}
$./getopt –b
option b:b
$./getopt –c
other option:c
$./getopt –a
other option :?
$./getopt –a12345
option a:’12345’
*/
while ((ch = getopt(argc, argv, "p:d")) != -1) // 。 argc argv
{
switch (ch)
{
case 'd':
bdaemon = true;
break;
case 'p':
port = atol(optarg); //
break;
}
}
if (bdaemon)
daemon_run();
if (port == 0)
port = 12345;
if (!create_server_listener("0.0.0.0", port)) //
{
std::cout << "Unable to create listen server: ip=0.0.0.0, port="
<< port << "." << std::endl;
return -1;
}
//
signal(SIGCHLD, SIG_DFL);
signal(SIGPIPE, SIG_IGN);
signal(SIGINT, prog_exit);
signal(SIGKILL, prog_exit);
signal(SIGTERM, prog_exit);
//
::pthread_cond_init(&g_acceptcond, NULL); //
::pthread_mutex_init(&g_acceptmutex, NULL); // mutex
::pthread_cond_init(&g_cond, NULL);
::pthread_mutex_init(&g_mutex, NULL);
::pthread_mutex_init(&g_clientmutex, NULL);
// accept_thread_func , accept_thread_func , fd
::pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL);
// worker_thread_func,
for (int i = 0; i < WORKER_THREAD_NUM; ++i)
{
::pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL);
}
while (!g_bStop)
{
struct epoll_event ev[1024]; // socket
int n = ::epoll_wait(g_epollfd, ev, 1024, 10);
if (n == 0)
continue;
else if (n < 0)
{
std::cout << "epoll_wait error" << std::endl;
continue;
}
int m = min(n, 1024);
for (int i = 0; i < m; ++i)
{
//
if (ev[i].data.fd == g_listenfd)
pthread_cond_signal(&g_acceptcond);
//
else
{
pthread_mutex_lock(&g_clientmutex);
g_listClients.push_back(ev[i].data.fd);
pthread_mutex_unlock(&g_clientmutex);
pthread_cond_signal(&g_cond);
//std::cout << "signal" << std::endl;
}
}
}
return 0;
}
프로그램의 대략적인 프레임워크는 다음과 같습니다.
주 루틴은 socket에 새로운 연결이 있는지 감청하는 것만 담당합니다. 새로운 연결이 오면 accept라는 작업 루틴에 새 연결을 수신하고, 새 연결을 주 루틴에 연결하려면 epollfd를 사용하십시오.
주 스레드가 클라이언트의 socket에서 읽을 수 있는 이벤트를 탐지하면 다른 다섯 개의 작업 스레드에 클라이언트가 보낸 데이터를 수신하고 시간 스탬프를 추가해서 클라이언트에게 돌려보냅니다.
-pport 전달을 통해 프로그램의 감청 포트 번호를 설정할 수 있습니다.프로그램이 데몬 모드로 백엔드에서 실행되도록 - d를 전달할 수 있습니다.이것도 표준 linux 데몬 모드의 쓰기 방법이다.
프로그램의 난점과 주의해야 할 점은 다음과 같다.
1. 조건 변수는 거짓 깨우기를 방지하기 위해 반드시 하나의 순환에서 pthread 를 호출해야 한다.cond_wait () 함수,workerthread_func()에서 while(g listClients.empty(): pthreadcond_wait(&g_cond, &g_clientmutex); cept_thread_func () 함수에 순환을 사용하지 않았습니다. 문제가 있습니까?
2、사용 조건 변수 pthreadcond_wait () 함수는 반드시 이 조건 변수와 관련된 mutex, 즉 다음과 같은 구조를 먼저 얻어야 한다.
1 mutex_lock(...);
2 while (condition is true)
3 ::pthread_cond_wait(...);
4 // ... mutex_unlock(...);
5 // ...
왜냐하면 pthreadcond_wait () 가 막히면, 잠금 해제와 현재 라인을 막는 두 동작을 합치면 원자입니다.
3. 서버 사이드 프로그램으로 socket 탐지 호출 setsocketopt () SOREUSEADDR 및 SOREUSEPORT 두 개의 로고는 서비스 프로그램이 때때로 재부팅이 필요할 때가 있기 때문에 (예를 들어 디버깅을 할 때 계속 재부팅하기) 이 두 개의 로고를 설정하지 않으면 포트를 연결할 때 호출이 실패합니다.한 포트가 사용되면 더 이상 사용하지 않아도 네 번 손을 흔들면 TIMEWAIT 상태는 약 2min의 MSL(Maximum Segment Lifetime, 최대 생존 기간)이 있습니다.이 2min 내에 이 포트는 중복 사용될 수 없습니다.서버 프로그램이 지난번에 이 포트 번호를 사용하고 다시 시작합니다. 이 때문에 이 포트를 다시 연결하면 실패합니다. (bind 함수 호출 실패)아니면 리셋할 때마다 2min을 기다린 후에 다시 시도해야 한다. (이것은 자주 리셋할 때 디버깅을 받기 어렵다.) 또는 이런 SO 를 설정해야 한다.REUSEADDR 및 SOREUSEPORT는 지금 포트를 재활용합니다.사실, SOREUSEADDR은 윈도우즈와 Unix 플랫폼에서 미세한 차이가 있습니다.libevent 원본에서 다음과 같은 설명을 보았습니다.
1int evutil_make_listen_socket_reuseable(evutil_socket_t sock)
2{
3 #ifndef WIN32
4 int one = 1;
5 /* REUSEADDR on Unix means, "don't hang on to this address after the
6 * listener is closed." On Windows, though, it means "don't keep other
7 * processes from binding to this address while we're using it.
8 */
9 return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one,
10 (ev_socklen_t)sizeof(one));
11#else
12 return 0;
13#endif
14}
설명 부분에 주의하십시오. Unix 플랫폼에서 이 옵션을 설정하면 임의의 프로세스가 이 주소를 다시 사용할 수 있음을 의미합니다.윈도우즈에서는 다른 프로세스가 이 주소를 다시 사용하는 것을 막지 마십시오.즉, Unix 플랫폼에서 이 옵션을 설정하지 않으면 임의의 프로세스가 일정 시간 내에 이 주소를 bind할 수 없다.윈도우즈 플랫폼에서 일정 시간 동안 다른 프로세스는 이 주소를 bind할 수 없지만, 이 프로세스는 다시 이 주소를 bind할 수 있다.
4、epoll_wait는 새 연결 socket에 대해 기본 수평 트리거 모드(level trigger)가 아닌 가장자리 트리거 모드인 EPOLLET(edge trigger)를 사용합니다.수평 트리거 모드를 사용하면 주 루틴이 클라이언트의 socket 데이터를 읽을 수 있음을 감지할 때, 작업 루틴에 이 socket의 데이터를 수취하라고 알립니다. 이 때 주 루틴은 계속 순환합니다. 작업 루틴에서 이 socket의 데이터를 모두 수거하지 않거나, 작업 루틴에서 데이터를 수취하는 과정에서 클라이언트가 새로운 데이터를 가져오면 주 루틴은 계속 알림 (pthread cond signal) 함수를 통해작업 라인에 데이터를 받는 것을 다시 통지합니다.이렇게 하면 여러 개의 작업 라인이 동시에 Recv 함수를 호출하여 이 클라이언트 socket의 데이터를 받을 수 있으며, 이로 인해 발생하는 결과는 데이터 착란을 초래할 수 있다.반대로 테두리 트리거 모드를 사용하면 어떤 작업 라인만 그 클라이언트 socket의 데이터를 모두 수거할 수 있습니다. 메인 라인의 epollwait는 작업 라인이 그 클라이언트 socket에서 새로 온 데이터를 계속 받는다는 것을 알리기 위해 다시 터치할 수 있습니다.
5. 코드에 다음과 같은 줄이 있습니다://gdb 디버깅 시 표준 출력을 실시간으로 리셋할 수 없습니다. 이 함수로 표준 출력을 리셋하면 정보가 화면에 std:::cout<
linux:
nc 120.55.94.78 12345
또는
telnet 120.55.94.78 12345
그리고 서버에 데이터를 자유롭게 보낼 수 있습니다. 서버는 당신에게 보낸 정보에 시간 스탬프를 붙여서 당신에게 되돌려줍니다.효과는 다음과 같습니다.
또한 이 코드를 순수한 C++11 버전으로 고쳤습니다. CMake 컴파일을 사용합니다. 컴파일을 지원하기 위해서는 이 -std=c++ 11을 추가해야 합니다.
CMakeLists.txt 코드는 다음과 같습니다.
1cmake_minimum_required(VERSION 2.8)
2 PROJECT(myreactorserver)
3 AUX_SOURCE_DIRECTORY(./ SRC_LIST)
4 SET(EXECUTABLE_OUTPUT_PATH ./)
5 ADD_DEFINITIONS(-g -W -Wall -Wno-deprecated
6 -DLINUX -D_REENTRANT -D_FILE_OFFSET_BITS=64
7 -DAC_HAS_INFO -DAC_HAS_WARNING -DAC_HAS_ERROR
8 -DAC_HAS_CRITICAL -DTIXML_USE_STL
9 -DHAVE_CXX_STDHEADERS ${CMAKE_CXX_FLAGS}
10 -std=c++11)
11 INCLUDE_DIRECTORIES( ./ )
12 LINK_DIRECTORIES( ./ )
13 set( main.cpp myreator.cpp )
14 ADD_EXECUTABLE(myreactorserver ${SRC_LIST})
15 TARGET_LINK_LIBRARIES(myreactorserver pthread)
myreactor.h 파일 내용:
1/**
2 *@desc: myreactor , myreactor.h
3 *@author: zhangyl
4 *@date: 2016.12.03
5 */
6 #ifndef __MYREACTOR_H__
7 #define __MYREACTOR_H__
8 #include
9 #include
10 #include
11 #include
12 #include
13 #define WORKER_THREAD_NUM 5
14 class CMyReactor
15 {
16 public:
17 CMyReactor();
18 ~CMyReactor();
19
20 bool init(const char* ip, short nport);
21 bool uninit();
22
23 bool close_client(int clientfd);
24
25 static void* main_loop(void* p);
26 private:
27 //no copyable
28 CMyReactor(const CMyReactor& rhs);
29 CMyReactor& operator = (const CMyReactor& rhs);
30
31 bool create_server_listener(const char* ip, short port);
32
33 static void accept_thread_proc(CMyReactor* pReatcor);
34 static void worker_thread_proc(CMyReactor* pReatcor);
35 private:
36 //C11
37 int m_listenfd = 0;
38 int m_epollfd = 0;
39 bool m_bStop = false;
40
41 std::shared_ptr<std::thread> m_acceptthread;
42 std::shared_ptr<std::thread> m_workerthreads[WORKER_THREAD_NUM];
43
44 std::condition_variable m_acceptcond;
45 std::mutex m_acceptmutex;
46
47 std::condition_variable m_workercond ;
48 std::mutex m_workermutex;
49
50 std::list<int> m_listClients;
51 };
52 #endif //!__MYREACTOR_H__
myreactor.cpp 파일 내용:
1 /**
2 *@desc: myreactor , myreactor.cpp
3 *@author: zhangyl
4 *@date: 2016.12.03
5 */ #include "myreactor.h"
6 #include
7 #include
8 #include
9 #include
10 #include
11 #include //for htonl() and htons()
12 #include
13 #include
14 #include
15 #include
16 #include
17 #include
18 #include //for std::setw()/setfill()
19 #include
20 #define min(a, b) ((a <= b) ? (a) : (b))
21 CMyReactor::CMyReactor()
22 {
23 //m_listenfd = 0;
24 //m_epollfd = 0;
25 //m_bStop = false;
26 }
27 CMyReactor::~CMyReactor()
28 {
29 }
30 bool CMyReactor::init(const char* ip, short nport)
31 {
32 if (!create_server_listener(ip, nport))
33 {
34 std::cout << "Unable to bind: " << ip
35 << ":" << nport << "." << std::endl;
36 return false;
37 }
38
39
40 std::cout << "main thread id = " << std::this_thread::get_id()
41 << std::endl;
42
43 //
44 m_acceptthread.reset(new std::thread(CMyReactor::accept_thread_proc, this));
45
46 //
47 for (auto& t : m_workerthreads)
48 {
49 t.reset(new std::thread(CMyReactor::worker_thread_proc, this));
50 }
51
52
53 return true;
54 }
55 bool CMyReactor::uninit()
56 {
57 m_bStop = true;
58 m_acceptcond.notify_one();
59 m_workercond.notify_all();
60
61 m_acceptthread->join();
62 for (auto& t : m_workerthreads)
63 {
64 t->join();
65 }
66
67 ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, m_listenfd, NULL);
68
69 //TODO: shutdown() ?
70 ::shutdown(m_listenfd, SHUT_RDWR);
71 ::close(m_listenfd);
72 ::close(m_epollfd);
73
74 return true;
75 }
76 bool CMyReactor::close_client(int clientfd)
77 {
78 if (::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)
79 {
80 std::cout << "close client socket failed as call epoll_ctl failed"
81 << std::endl;
82 //return false;
83 }
84
85
86 ::close(clientfd);
87
88 return true;
89 }
90
91 void* CMyReactor::main_loop(void* p)
92 {
93 std::cout << "main thread id = "
94 << std::this_thread::get_id() << std::endl;
95
96 CMyReactor* pReatcor = static_cast(p);
97
98 while (!pReatcor->m_bStop)
99 {
100 struct epoll_event ev[1024];
101 int n = ::epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);
102 if (n == 0)
103 continue;
104 else if (n < 0)
105 {
106 std::cout << "epoll_wait error" << std::endl;
107 continue;
108 }
109
110 int m = min(n, 1024);
111 for (int i = 0; i < m; ++i)
112 {
113 //
114 if (ev[i].data.fd == pReatcor->m_listenfd)
115 pReatcor->m_acceptcond.notify_one();
116 //
117 else
118 {
119 {
120 std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);
121 pReatcor->m_listClients.push_back(ev[i].data.fd);
122 }
123
124 pReatcor->m_workercond.notify_one();
125 //std::cout << "signal" << std::endl;
126 }// end if
127
128 }// end for-loop
129 }// end while
130
131 std::cout << "main loop exit ..." << std::endl;
132
133 return NULL;
134 }
135 void CMyReactor::accept_thread_proc(CMyReactor* pReatcor)
136 {
137 std::cout << "accept thread, thread id = "
138 << std::this_thread::get_id() << std::endl;
139
140 while (true)
141 {
142 int newfd;
143 struct sockaddr_in clientaddr;
144 socklen_t addrlen;
145 {
146 std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);
147 pReatcor->m_acceptcond.wait(guard);
148 if (pReatcor->m_bStop)
149 break;
150
151 //std::cout << "run loop in accept_thread_proc" << std::endl;
152
153 newfd = ::accept(pReatcor->m_listenfd,
154 (struct sockaddr *)&clientaddr, &addrlen);
155 }
156 if (newfd == -1)
157 continue;
158
159 std::cout << "new client connected: "
160 << ::inet_ntoa(clientaddr.sin_addr) << ":"
161 << ::ntohs(clientaddr.sin_port) << std::endl;
162
163 // socket non-blocking
164 int oldflag = ::fcntl(newfd, F_GETFL, 0);
165 int newflag = oldflag | O_NONBLOCK;
166 if (::fcntl(newfd, F_SETFL, newflag) == -1)
167 {
168 std::cout << "fcntl error, oldflag =" << oldflag
169 << ", newflag = " << newflag << std::endl;
170 continue;
171 }
172
173 struct epoll_event e;
174 memset(&e, 0, sizeof(e));
175 e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
176 e.data.fd = newfd;
177 if (::epoll_ctl(pReatcor->m_epollfd,
178 EPOLL_CTL_ADD, newfd, &e) == -1)
179 {
180 std::cout << "epoll_ctl error, fd =" << newfd << std::endl;
181 }
182 }
183
184 std::cout << "accept thread exit ..." << std::endl;
185 }
186 void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)
187 {
188 std::cout << "new worker thread, thread id = "
189 << std::this_thread::get_id() << std::endl;
190
191 while (true)
192 {
193 int clientfd;
194 {
195 std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);
196 while (pReatcor->m_listClients.empty())
197 {
198 if (pReatcor->m_bStop)
199 {
200 std::cout << "worker thread exit ..." << std::endl;
201 return;
202 }
203
204 pReatcor->m_workercond.wait(guard);
205 }
206
207 clientfd = pReatcor->m_listClients.front();
208 pReatcor->m_listClients.pop_front();
209 }
210
211 //gdb , ,
212 std::cout << std::endl;
213
214 std::string strclientmsg;
215 char buff[256];
216 bool bError = false;
217 while (true)
218 {
219 memset(buff, 0, sizeof(buff));
220 int nRecv = ::recv(clientfd, buff, 256, 0);
221 if (nRecv == -1)
222 {
223 if (errno == EWOULDBLOCK)
224 break;
225 else
226 {
227 std::cout << "recv error, client disconnected, fd = "
228 << clientfd << std::endl;
229 pReatcor->close_client(clientfd);
230 bError = true;
231 break;
232 }
233
234 }
235 // socket, 。
236 else if (nRecv == 0)
237 {
238 std::cout << "peer closed, client disconnected, fd = "
239 << clientfd << std::endl;
240 pReatcor->close_client(clientfd);
241 bError = true;
242 break;
243 }
244
245 strclientmsg += buff;
246 }
247
248 // ,
249 if (bError)
250 continue;
251
252 std::cout << "client msg: " << strclientmsg;
253
254 //
255 time_t now = time(NULL);
256 struct tm* nowstr = localtime(&now);
257 std::ostringstream ostimestr;
258 ostimestr << "[" << nowstr->tm_year + 1900 << "-"
259 << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"
260 << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "
261 << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"
262 << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"
263 << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";
264
265 strclientmsg.insert(0, ostimestr.str());
266
267 while (true)
268 {
269 int nSent = ::send(clientfd, strclientmsg.c_str(),
270 strclientmsg.length(), 0);
271 if (nSent == -1)
272 {
273 if (errno == EWOULDBLOCK)
274 {
275 std::this_thread::sleep_for(std::chrono::milliseconds(10));
276 continue;
277 }
278 else
279 {
280 std::cout << "send error, fd = "
281 << clientfd << std::endl;
282 pReatcor->close_client(clientfd);
283 break;
284 }
285
286 }
287
288 std::cout << "send: " << strclientmsg;
289 strclientmsg.erase(0, nSent);
290
291 if (strclientmsg.empty())
292 break;
293 }
294 }
295 }
296 bool CMyReactor::create_server_listener(const char* ip, short port)
297 {
298 m_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
299 if (m_listenfd == -1)
300 return false;
301
302 int on = 1;
303 ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR,
304 (char *)&on, sizeof(on));
305 ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEPORT,
306 (char *)&on, sizeof(on));
307
308 struct sockaddr_in servaddr;
309 memset(&servaddr, 0, sizeof(servaddr));
310 servaddr.sin_family = AF_INET;
311 servaddr.sin_addr.s_addr = inet_addr(ip);
312 servaddr.sin_port = htons(port);
313 if (::bind(m_listenfd, (sockaddr *)&servaddr,
314 sizeof(servaddr)) == -1)
315 return false;
316
317 if (::listen(m_listenfd, 50) == -1)
318 return false;
319
320 m_epollfd = ::epoll_create(1);
321 if (m_epollfd == -1)
322 return false;
323
324 struct epoll_event e;
325 memset(&e, 0, sizeof(e));
326 e.events = EPOLLIN | EPOLLRDHUP;
327 e.data.fd = m_listenfd;
328 if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &e) == -1)
329 return false;
330
331 return true;
332 }
main.cpp 파일 내용:
1/**
2 *@desc: reactor
3 *@author: zhangyl
4 *@date: 2016.12.03
5 */
6
7#include
8#include //for signal()
9#include
10#include //for exit()
11#include
12#include
13#include
14#include "myreactor.h"
15
16CMyReactor g_reator;
17
18void prog_exit(int signo)
19{
20 std::cout << "program recv signal " << signo
21 << " to exit." << std::endl;
22
23 g_reator.uninit();
24}
25
26void daemon_run()
27{
28 int pid;
29 signal(SIGCHLD, SIG_IGN);
30 //1) ,fork ID;
31 //2) ,fork 0;
32 //3) ,fork ;
33 pid = fork();
34 if (pid < 0)
35 {
36 std:: cout << "fork error" << std::endl;
37 exit(-1);
38 }
39 // ,
40 else if (pid > 0)
41 {
42 exit(0);
43 }
44 // parent child session ,parent (session) ,
45 //parent , exit , , init 。
46 // setsid() ,child (session)id。
47 // parent , child 。
48 setsid();
49 int fd;
50 fd = open("/dev/null", O_RDWR, 0);
51 if (fd != -1)
52 {
53 dup2(fd, STDIN_FILENO);
54 dup2(fd, STDOUT_FILENO);
55 dup2(fd, STDERR_FILENO);
56 }
57 if (fd > 2)
58 close(fd);
59}
60
61
62int main(int argc, char* argv[])
63{
64 //
65 signal(SIGCHLD, SIG_DFL);
66 signal(SIGPIPE, SIG_IGN);
67 signal(SIGINT, prog_exit);
68 signal(SIGKILL, prog_exit);
69 signal(SIGTERM, prog_exit);
70
71 short port = 0;
72 int ch;
73 bool bdaemon = false;
74 while ((ch = getopt(argc, argv, "p:d")) != -1)
75 {
76 switch (ch)
77 {
78 case 'd':
79 bdaemon = true;
80 break;
81 case 'p':
82 port = atol(optarg);
83 break;
84 }
85 }
86
87 if (bdaemon)
88 daemon_run();
89
90
91 if (port == 0)
92 port = 12345;
93
94
95 if (!g_reator.init("0.0.0.0", 12345))
96 return -1;
97
98 g_reator.main_loop(&g_reator);
99
100 return 0;
101}