⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 epoll_td2.cc

📁 介绍epoll
💻 CC
字号:
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <pthread.h>

#define MAXLINE    10
#define OPEN_MAX   100
#define LISTENQ    20
#define SERV_PORT  5555
#define INFTIM     1000

#define MAXFDS    16 * 10000
#define EVENTSIZE 15 * 10000

// 线程池任务队列结构体
struct task
{
	int fd;            // 需要读写的文件描述符
	struct task *next; // 下一个任务
};

// 用于读写两个的两个方面传递参数
struct user_data
{
	int fd;
	unsigned int n_size;
	char line[MAXLINE];
};

// 线程的任务函数
void *readtask(void *args);
void *writetask(void *args);


// 声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
struct epoll_event ev, events[EVENTSIZE];
int epfd;
pthread_mutex_t mutex;
pthread_cond_t  cond1;
struct task *readhead=NULL,*readtail=NULL,*writehead=NULL;

// 设置非阻塞模式
void setnonblocking(int sock)
{
	int opts;
	opts = fcntl(sock,F_GETFL);
	if(opts < 0)
	{
		perror("fcntl(sock,GETFL)");
		exit(1);
	}

	opts = opts | O_NONBLOCK;
	if(fcntl(sock, F_SETFL, opts) < 0)
	{
		perror("fcntl(sock,SETFL,opts)");
		exit(1);
	}    
}

// 发送数据
ssize_t socket_send(int sockfd, const char *buffer, size_t buflen)
{
	ssize_t tmp;
	size_t total  = buflen;
	const char *p = buffer;

	for( ; ; )
	{
		tmp = send(sockfd, p, total, 0);
		if(tmp < 0)
		{
			// 当send收到信号时,可以继续写,但这里返回-1.
			if(errno == EINTR)
			{
				return -1;
			}

			// 当socket是非阻塞时,如返回此错误,表示写缓冲队列已满,
			// 在这里做延时后再重试.
			if(errno == EAGAIN)
			{
				usleep(1000);
				continue;
			}

			return -1;
		}

		if((size_t)tmp == total)
		{
			return buflen;
		}

		total -= tmp;
		p += tmp;
	}

	return tmp;
}

// 接收数据
ssize_t socket_recv(int sockfd, char *buffer, size_t buflen)
{
	int rs, n;

	while(rs)
	{
		n = recv(sockfd, buffer, MAXLINE, 0);
		if(n < 0)
		{
			// 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读
			// 在这里就当作是该次事件已处理处.
			if(errno == EAGAIN)
			{
				break;
			}
			else
			{
				return -1;
			}
		}
		else if(n == 0)
		{
			// 这里表示对端的socket已正常关闭.
			close(sockfd);
		}

		if(n == sizeof(buffer))
		{
			rs = 1;   // 未读完 需要再次读取
		}
		else
		{
			rs = 0;
		}
	}

	return n;
}
int main(int argc, char *argv[])
{
	int i, maxi, listenfd, connfd, sockfd,nfds;
	int opt = 1;
	pthread_t tid1,tid2;
	struct task *new_task=NULL;
	struct user_data *rdata=NULL;
	socklen_t clilen;

	pthread_mutex_init(&mutex,NULL);
	pthread_cond_init(&cond1,NULL);

	// 初始化用于读线程池的线程
	pthread_create(&tid1, NULL, readtask, NULL);
	pthread_create(&tid2, NULL, readtask, NULL);

	// 生成用于处理accept的epoll专用的文件描述符    
	epfd = epoll_create(MAXFDS);

	struct sockaddr_in clientaddr;
	struct sockaddr_in serveraddr;
	listenfd = socket(AF_INET, SOCK_STREAM, 0);

	// 重复绑定
	setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt));

	// 把socket设置为非阻塞方式
	setnonblocking(listenfd);

	// 设置与要处理的事件相关的文件描述符
	ev.data.fd = listenfd;
	//设置要处理的事件类型
	ev.events = EPOLLIN | EPOLLET;//EPOLLIN | EPOLLERR | EPOLLHUP;

	// 注册epoll事件
	epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd,&ev);

	bzero(&serveraddr, sizeof(serveraddr));
	serveraddr.sin_family = AF_INET;

	//char *local_addr="192.168.100.5";
	//inet_aton(local_addr,&(serveraddr.sin_addr));
	serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
	serveraddr.sin_port        = htons(SERV_PORT);
	bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
	listen(listenfd, LISTENQ);

	maxi = 0; 
	for ( ; ; ) 
	{
		// 等待epoll事件的发生
		nfds = epoll_wait(epfd, events, EVENTSIZE, -1);

		// 处理所发生的所有事件      
		for(i = 0; i < nfds; ++i)
		{
			if(events[i].data.fd == listenfd)// 客户端;连接事件
			{
				connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen);
				if(connfd < 0)
				{
					perror("connfd < 0");
					exit(1);
				}

				setnonblocking(connfd);

				unsigned char *sip = (unsigned char *)&clientaddr.sin_addr.s_addr;
				printf("%d connect from >> %d.%d.%d.%d\n", connfd, sip[0], sip[1], sip[2], sip[3]);

				// 设置用于读操作的文件描述符
				ev.data.fd = connfd;
				// 设置用于注测的读操作事件
				ev.events = EPOLLIN | EPOLLET;
				//注册ev
				epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
			}
			else if(events[i].events & EPOLLIN)// 有读数据
			{
				printf("reading!\n");                 
				if ( (sockfd = events[i].data.fd) < 0)
				{
					continue;
				}
				new_task = new task();
				new_task->fd = sockfd;
				new_task->next = NULL;

				// 添加新的读任务
				pthread_mutex_lock(&mutex);
				if(readhead==NULL)
				{
					readhead = new_task;
					readtail = new_task;
				}    
				else
				{    
					readtail->next = new_task;
					readtail = new_task;
				}    

				// 唤醒所有等待cond1条件的线程
				pthread_cond_broadcast(&cond1);
				pthread_mutex_unlock(&mutex);  
			}
			else if(events[i].events & EPOLLOUT)// 有写数据
			{    
				rdata = (struct user_data *)events[i].data.ptr;
				sockfd = rdata->fd;
				write(sockfd, rdata->line, rdata->n_size);

				delete rdata;

				// 设置用于读操作的文件描述符
				ev.data.fd = sockfd;
				// 设置用于注测的读操作事件
				ev.events = EPOLLIN | EPOLLET;
				// 修改sockfd上要处理的事件为EPOLIN
				epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
			}
			else if(events[i].events & EPOLLHUP)
			{
				// 断开的情况,删除事件,然后关闭fd

				rdata = (struct user_data *)events[i].data.ptr;
				sockfd = rdata->fd;

				// 设置用于读操作的文件描述符
				ev.data.fd = sockfd;
				ev.events  = 0;
				epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, &ev);

				//struct epoll_event event;  				//event.events   = 0; 				//event.data.u64 = 0; 				//event.data.ptr = sockfd; 				//epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, &event);  

				printf("epollhup %d\n", sockfd);
				close(sockfd);
			}
		}
	}
}

void *readtask(void *args)
{
	int fd = -1;
	unsigned int n;

	//用于把读出来的数据传递出去
	struct user_data *data = NULL;
	for( ; ; )
	{
		pthread_mutex_lock(&mutex);

		//等待到任务队列不为空
		while(readhead == NULL)
		{
			pthread_cond_wait(&cond1, &mutex);
		}

		fd = readhead->fd;

		// 从任务队列取出一个读任务
		struct task *tmp = readhead;

		readhead = readhead->next;
		delete tmp;

		pthread_mutex_unlock(&mutex);
		data = new user_data();
		data->fd = fd;
		if ((n = read(fd, data->line, MAXLINE)) < 0) 
		{
			if (errno == ECONNRESET) 
			{
				std::cout<<"errno == ECONNRESET "<<fd<<std::endl;
				close(fd);
			} 
			else
			{
				std::cout<<"readline error "<<fd<<std::endl;
			}

			if(data!=NULL)
			{
				delete data;
			}
		} 
		else if (n == 0) 
		{
			printf("%d Client close connect!\n", fd);

			close(fd);

			if(data != NULL)
			{
				delete data;
			}
		}
		else
		{
			// 通知写事件
			data->n_size = n;

			//设置需要传递出去的数据
			ev.data.ptr = data;
			//设置用于注测的写操作事件
			ev.events = EPOLLOUT | EPOLLET;
			//修改sockfd上要处理的事件为EPOLLOUT
			epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
		}
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -