📄 epoll_td2.cc
字号:
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#define MAXLINE 10
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5555
#define INFTIM 1000
#define MAXFDS 16 * 10000
#define EVENTSIZE 15 * 10000
// 线程池任务队列结构体
struct task
{
int fd; // 需要读写的文件描述符
struct task *next; // 下一个任务
};
// 用于读写两个的两个方面传递参数
struct user_data
{
int fd;
unsigned int n_size;
char line[MAXLINE];
};
// 线程的任务函数
void *readtask(void *args);
void *writetask(void *args);
// 声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
struct epoll_event ev, events[EVENTSIZE];
int epfd;
pthread_mutex_t mutex;
pthread_cond_t cond1;
struct task *readhead=NULL,*readtail=NULL,*writehead=NULL;
// 设置非阻塞模式
void setnonblocking(int sock)
{
int opts;
opts = fcntl(sock,F_GETFL);
if(opts < 0)
{
perror("fcntl(sock,GETFL)");
exit(1);
}
opts = opts | O_NONBLOCK;
if(fcntl(sock, F_SETFL, opts) < 0)
{
perror("fcntl(sock,SETFL,opts)");
exit(1);
}
}
// 发送数据
ssize_t socket_send(int sockfd, const char *buffer, size_t buflen)
{
ssize_t tmp;
size_t total = buflen;
const char *p = buffer;
for( ; ; )
{
tmp = send(sockfd, p, total, 0);
if(tmp < 0)
{
// 当send收到信号时,可以继续写,但这里返回-1.
if(errno == EINTR)
{
return -1;
}
// 当socket是非阻塞时,如返回此错误,表示写缓冲队列已满,
// 在这里做延时后再重试.
if(errno == EAGAIN)
{
usleep(1000);
continue;
}
return -1;
}
if((size_t)tmp == total)
{
return buflen;
}
total -= tmp;
p += tmp;
}
return tmp;
}
// 接收数据
ssize_t socket_recv(int sockfd, char *buffer, size_t buflen)
{
int rs, n;
while(rs)
{
n = recv(sockfd, buffer, MAXLINE, 0);
if(n < 0)
{
// 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读
// 在这里就当作是该次事件已处理处.
if(errno == EAGAIN)
{
break;
}
else
{
return -1;
}
}
else if(n == 0)
{
// 这里表示对端的socket已正常关闭.
close(sockfd);
}
if(n == sizeof(buffer))
{
rs = 1; // 未读完 需要再次读取
}
else
{
rs = 0;
}
}
return n;
}
int main(int argc, char *argv[])
{
int i, maxi, listenfd, connfd, sockfd,nfds;
int opt = 1;
pthread_t tid1,tid2;
struct task *new_task=NULL;
struct user_data *rdata=NULL;
socklen_t clilen;
pthread_mutex_init(&mutex,NULL);
pthread_cond_init(&cond1,NULL);
// 初始化用于读线程池的线程
pthread_create(&tid1, NULL, readtask, NULL);
pthread_create(&tid2, NULL, readtask, NULL);
// 生成用于处理accept的epoll专用的文件描述符
epfd = epoll_create(MAXFDS);
struct sockaddr_in clientaddr;
struct sockaddr_in serveraddr;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
// 重复绑定
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt));
// 把socket设置为非阻塞方式
setnonblocking(listenfd);
// 设置与要处理的事件相关的文件描述符
ev.data.fd = listenfd;
//设置要处理的事件类型
ev.events = EPOLLIN | EPOLLET;//EPOLLIN | EPOLLERR | EPOLLHUP;
// 注册epoll事件
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd,&ev);
bzero(&serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
//char *local_addr="192.168.100.5";
//inet_aton(local_addr,&(serveraddr.sin_addr));
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(SERV_PORT);
bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
listen(listenfd, LISTENQ);
maxi = 0;
for ( ; ; )
{
// 等待epoll事件的发生
nfds = epoll_wait(epfd, events, EVENTSIZE, -1);
// 处理所发生的所有事件
for(i = 0; i < nfds; ++i)
{
if(events[i].data.fd == listenfd)// 客户端;连接事件
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen);
if(connfd < 0)
{
perror("connfd < 0");
exit(1);
}
setnonblocking(connfd);
unsigned char *sip = (unsigned char *)&clientaddr.sin_addr.s_addr;
printf("%d connect from >> %d.%d.%d.%d\n", connfd, sip[0], sip[1], sip[2], sip[3]);
// 设置用于读操作的文件描述符
ev.data.fd = connfd;
// 设置用于注测的读操作事件
ev.events = EPOLLIN | EPOLLET;
//注册ev
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
}
else if(events[i].events & EPOLLIN)// 有读数据
{
printf("reading!\n");
if ( (sockfd = events[i].data.fd) < 0)
{
continue;
}
new_task = new task();
new_task->fd = sockfd;
new_task->next = NULL;
// 添加新的读任务
pthread_mutex_lock(&mutex);
if(readhead==NULL)
{
readhead = new_task;
readtail = new_task;
}
else
{
readtail->next = new_task;
readtail = new_task;
}
// 唤醒所有等待cond1条件的线程
pthread_cond_broadcast(&cond1);
pthread_mutex_unlock(&mutex);
}
else if(events[i].events & EPOLLOUT)// 有写数据
{
rdata = (struct user_data *)events[i].data.ptr;
sockfd = rdata->fd;
write(sockfd, rdata->line, rdata->n_size);
delete rdata;
// 设置用于读操作的文件描述符
ev.data.fd = sockfd;
// 设置用于注测的读操作事件
ev.events = EPOLLIN | EPOLLET;
// 修改sockfd上要处理的事件为EPOLIN
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
}
else if(events[i].events & EPOLLHUP)
{
// 断开的情况,删除事件,然后关闭fd
rdata = (struct user_data *)events[i].data.ptr;
sockfd = rdata->fd;
// 设置用于读操作的文件描述符
ev.data.fd = sockfd;
ev.events = 0;
epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, &ev);
//struct epoll_event event; //event.events = 0; //event.data.u64 = 0; //event.data.ptr = sockfd; //epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, &event);
printf("epollhup %d\n", sockfd);
close(sockfd);
}
}
}
}
void *readtask(void *args)
{
int fd = -1;
unsigned int n;
//用于把读出来的数据传递出去
struct user_data *data = NULL;
for( ; ; )
{
pthread_mutex_lock(&mutex);
//等待到任务队列不为空
while(readhead == NULL)
{
pthread_cond_wait(&cond1, &mutex);
}
fd = readhead->fd;
// 从任务队列取出一个读任务
struct task *tmp = readhead;
readhead = readhead->next;
delete tmp;
pthread_mutex_unlock(&mutex);
data = new user_data();
data->fd = fd;
if ((n = read(fd, data->line, MAXLINE)) < 0)
{
if (errno == ECONNRESET)
{
std::cout<<"errno == ECONNRESET "<<fd<<std::endl;
close(fd);
}
else
{
std::cout<<"readline error "<<fd<<std::endl;
}
if(data!=NULL)
{
delete data;
}
}
else if (n == 0)
{
printf("%d Client close connect!\n", fd);
close(fd);
if(data != NULL)
{
delete data;
}
}
else
{
// 通知写事件
data->n_size = n;
//设置需要传递出去的数据
ev.data.ptr = data;
//设置用于注测的写操作事件
ev.events = EPOLLOUT | EPOLLET;
//修改sockfd上要处理的事件为EPOLLOUT
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -