I/O 멀티플렉싱, 복제본 없음, 높은 동시 채팅방 demo
18594 단어 Linxu 네트워크 프로그래밍
클라이언트 프로그램은 poll 동료를 사용하여 사용자의 입력과 네트워크 연결을 감청하고 splice 함수를 이용하여 사용자의 입력 내용을 네트워크 연결에 직접 지정하여 전송함으로써 데이터의 제로 복사를 실현하여 프로그램 실행 효율을 높인다.
splice 함수 소개:
#include
ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
splice는 두 파일 설명자 사이에서 데이터를 이동하는 데 사용되며, 제로 복사입니다.
①fd_in 매개 변수는 입력할 설명자입니다.파이핑 파일 설명자일 경우 ② offin을 NULL로 설정해야 합니다.그렇지 않으면 offin은 입력 데이터 흐름의 어디에서부터 읽기를 시작하는지 나타냅니다. 이때 NULL이면 입력 데이터 흐름의 현재 오프셋 위치에서 읽습니다.③/④fd_out/off_out는 상술한 것과 같지만 출력에 사용됩니다.⑤len 매개 변수는 이동 데이터의 길이를 지정합니다.⑥ flags 매개변수는 데이터가 이동하는 방법을 제어합니다.
splice 사용 시 fdin 및 fdout에서 최소한 하나는 파이프 파일 설명자여야 합니다.호출이 성공했을 때 이동한 바이트의 수량을 되돌려주기;이것은 0으로 되돌아갈 수 있습니다. 데이터가 이동할 필요가 없다는 것을 의미합니다. 이것은 보통 파이프에서 데이터를 읽을 때 파이프가 기록되지 않았을 때 발생합니다.실패할 때 -1을 되돌려주고errno를 설정합니다.
클라이언트 프로그램은 다음과 같습니다:poll은 표준 입력을 감청하여 sockfd로 직접 바꿉니다.
/*************************************************************************
> File Name: client.c
> Author: dulun
> Mail: [email protected]
> Created Time: 2016 07 19 11 01 09
************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define BUFFER_SIZE 64
int main()
{
const char * ip = "127.0.0.1";
int port = 10086;
struct sockaddr_in server_address;
bzero( &server_address, sizeof(server_address) );
server_address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &server_address.sin_addr );
server_address.sin_port = htons(port);
int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
assert(sockfd >= 0);
if( connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0 )
{
printf("connetcion failed
");
close(sockfd);
return 1;
}
printf("connetc over!
");
pollfd fds[2]; //0:
fds[0].fd = 0;
fds[0].events = POLLIN; //
fds[0].revents = 0; //
//1:
fds[1].fd = sockfd;
fds[1].events = POLLIN | POLLRDHUP; //
fds[1].revents = 0;
char read_buf[BUFFER_SIZE]; // ,
int pipefd[2];
int ret = pipe(pipefd);
assert(ret != -1);
while(1)
{
ret = poll(fds, 2, -1); // , fd
if(ret < 0)
{
printf("poll failed
");
break;
}
if(fds[1].revents & POLLRDHUP)
{
printf("server close the connetcion
");
break;
}
else if( fds[1].revents & POLLIN )
{
memset( read_buf, 0, BUFFER_SIZE );
recv(fds[1].fd, read_buf, BUFFER_SIZE - 1, 0);
printf("%s
", read_buf);
}
if(fds[0].revents & POLLIN)
{
// 。 sockfd
// , , 32768
ret = splice( 0, NULL, pipefd[1], NULL, 32768 , SPLICE_F_MORE | SPLICE_F_MOVE);
// , , 32768
ret = splice( pipefd[0], NULL, sockfd, NULL, 32768 , SPLICE_F_MORE | SPLICE_F_MOVE);
}
}
close(sockfd);
return 0;
}
2. 서버:
서버 프로그램은 poll을 사용하여 socket을 감청하고 연결하며 희생 공간을 시간 정책으로 바꾸어 서버 성능을 향상시킵니다.
/*************************************************************************
> File Name: server.cpp
> Author: dulun
> Mail: [email protected]
> Created Time: 2016 07 19 11 26 40
************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
#define USER_LIMIT 100
#define BUFFER_SIZE 64
#define FD_LIMIT 65535
struct client_data
{
sockaddr_in address;
char * write_buff;
char buf[BUFFER_SIZE];
};
int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}
int main()
{
const char *ip = "127.0.0.1";
int port = 10086;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
int ret;
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); // 。
assert(ret != -1);
ret = listen(listenfd, USER_LIMIT); //
assert(ret != -1);
client_data * users = new client_data[FD_LIMIT];
pollfd fds[USER_LIMIT+1];
int user_counter = 0;
for(int i = 1; i <= USER_LIMIT; i++)
{
fds[i].fd = -1;
fds[i].events = 0;
}
//
fds[0].fd = listenfd;
fds[0].events = POLLIN | POLLERR;
fds[0].revents = 0;
while(1)
{
ret = poll( fds, user_counter+1, -1 );
if(ret < 0)
{
printf("POLL failed
");
break;
}
for(int i = 0; i < user_counter+1; i++)
{
if((fds[i].fd == listenfd) && (fds[i].revents & POLLIN) )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_data);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
printf("accept
");
if(connfd < 0)
{
printf("errno is : %d
", errno);
continue;
}
if(user_counter >= USER_LIMIT)
{
const char * info = "too many users
";
printf("%s", info);
send(connfd, info, strlen(info), 0);
close(connfd);
continue;
}
user_counter++;
users[connfd].address = client_address;
setnonblocking(connfd);
fds[user_counter].fd = connfd;
fds[user_counter].events = POLLIN | POLLERR;
fds[user_counter].revents = 0;
printf("comes a new user, now have %d users
", user_counter);
}
else if(fds[i].revents & POLLERR) //ERROR , fds[i].revent & POLLERR , , ,
{
printf("get an error form %d
", fds[i].fd);
char errors[100];
memset(errors, 0, sizeof(errors));
socklen_t length = sizeof(errors);
if( getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length) < 0)
{
printf("get socket option failed
");
}
continue;
}
else if(fds[i].revents & POLLIN)
{
int connfd = fds[i].fd;
memset(users[connfd].buf, 0, BUFFER_SIZE);
ret = recv(connfd, users[connfd].buf, BUFFER_SIZE-1, 0);
printf("get %d bytes of client data %s from %d
", ret, users[connfd].buf, connfd);
//sleep(10000); //test!
if(ret < 0)
{
if(errno != EAGAIN)
{
close(connfd);
users[fds[i].fd] = users[fds[user_counter].fd];
fds[i] = fds[user_counter];
i--;
user_counter--;
}
}
else if( ret == 0 )
{
}
else{
for(int j = 1; j <= user_counter; j++)
{
if(fds[j].fd == connfd) // ,
{
continue;
}
fds[j].events |= ~POLLIN; //
fds[j].events |= POLLOUT; // , while , else if
users[fds[j].fd].write_buff = users[connfd].buf;
}
}
}
else if(fds[i].revents & POLLOUT) // ( :user_counter-1)
{
int connfd = fds[i].fd;
if( !users[connfd].write_buff )
{
continue;
}
ret = send(connfd, users[connfd].write_buff, strlen(users[connfd].write_buff), 0); //
users[connfd].write_buff = NULL; //
fds[i].events |= ~POLLOUT; //
fds[i].events |= POLLIN; //
}
}
}
delete []users;
close(listenfd);
return 0;
}