⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rc_ctl_net.c

📁 epoll机制的收发程序 只能在2.6内核上使用
💻 C
📖 第 1 页 / 共 2 页
字号:
				memcpy(&(con->address), &addr_cli, sizeof(struct sockaddr_in));
				
				pthread_mutex_lock(&mutex_con_t);
				rc_add_connection(&head_con, con);
				pthread_mutex_unlock(&mutex_con_t);
			}//2007-4-24 9:17
			else if(events[i].events & EPOLLIN)//2007-4-24 9:48
			{
				sockfd = events[i].data.fd;
				if(sockfd < 0)
					continue;
				len = sizeof(struct_task_net_r);
				newtask = malloc(len);
				if(newtask == NULL)
					continue;
				bzero(newtask, len);
				newtask->sockfd = sockfd;
				
				pthread_mutex_lock(&mutex_r_cam);
				rc_add_task_r(&task_r_cam, newtask);
				
				/* 唤醒所有等待这个条件的线程 */
				pthread_mutex_unlock(&mutex_r_cam);
				pthread_cond_broadcast(&cond_r_cam);
			}//2007-4-24 9:48
			else if(events[i].events & EPOLLOUT)
			{
			}
		}//2007-4-24 9:16
		
	}//2007-4-24 9:15
	
	pthread_exit(NULL);
	return 0;
}


/*
 * Purpose: worker thread that reads control (atx) data from the network.
 * Params : msg - thread entry argument; not used by this function.
 * Returns: the while(1) loop below has no exit path, so the trailing
 *          pthread_exit()/return are never reached.
 *
 * Each iteration:
 *   1. pops one read task (a socket descriptor) from task_r_atx,
 *   2. looks up the matching connection record in head_atx and takes its
 *      per-connection lock,
 *   3. issues ONE non-blocking recv() and appends the bytes to the
 *      connection's reassembly buffer (lnk->rcv),
 *   4. once rcv.len reaches rcv.size, hands the packet to
 *      rc_deal_data_net() and resets the reassembly state.
 * On a socket error, a closed peer, too many receive errors, or a negative
 * result from rc_deal_data_net(), the descriptor is removed from the epoll
 * set and its write tasks and connection record are deleted.
 *
 * NOTE(review): rc_thread_atx registers client sockets with EPOLLET, yet
 * only one recv() is issued per task here -- confirm that data left in the
 * socket buffer is picked up some other way.
 */
int rc_net_r_atx(char *msg)
{
	int						res;
	int						opt;
	int						sockfd;
	int						len;
	int						rcvlen;
	int						len_cpy;	/* number of bytes copied into lnk->rcv.buf */
	socklen_t				len_opt;	/* in/out length argument for getsockopt() */
	char					buf[MAX_LEN_CONNECT_BUF];
	struct_task_net_r		*tmp;
	struct_msg_con			con;
	struct_msg_atx			*lnk;
	
	(void)msg;
	
	while(1)
	{
		/* Block until a read task is queued, then pop the queue head. */
		pthread_mutex_lock(&mutex_r_atx);
		while(task_r_atx == NULL)
		{
			pthread_cond_wait(&cond_r_atx, &mutex_r_atx);
		}
		
		sockfd = task_r_atx->sockfd;
		tmp = task_r_atx;
		task_r_atx = task_r_atx->next;
		free(tmp);
		pthread_mutex_unlock(&mutex_r_atx);
		
		/*
		 * Find the connection record. Its own lock is taken before the
		 * list lock is released.
		 */
		pthread_mutex_lock(&mutex_con_atx);
		lnk = rc_get_lnk_atx(head_atx, sockfd);
		if(lnk == NULL)
		{
			pthread_mutex_unlock(&mutex_con_atx);
			continue;
		}
		pthread_mutex_lock(&(lnk->mutex_opt));
		pthread_mutex_unlock(&mutex_con_atx);
		
		
		/* Drop the connection if the socket reports a pending error. */
		opt = -1;
		len_opt = sizeof(opt);
		res = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *)&opt, &len_opt);
		if((res < 0) || (opt != 0))
		{
			printf("****** %s %d res : %d ******\n", __FILE__, __LINE__, res);
			goto LAB_ERROR;
		}
		
		
		/*
		 * Read at most sizeof(buf) bytes. When a packet is already being
		 * reassembled, read no more than what is still missing from it.
		 * (The original had a stray ';' after this if, which made the
		 * assignment unconditional and let recv() overrun buf.)
		 */
		len = sizeof(buf);
		bzero(buf, len);
		if(lnk->rcv.size != 0)
		{
			res = lnk->rcv.size - lnk->rcv.len;
			if(res <= len)
				len = res;
		}
		
		rcvlen = recv(sockfd, buf, len, MSG_DONTWAIT);
		if(rcvlen < 0)
		{
			/* Tolerate up to MAX_TIMES_ERROR_RCV failed receives. */
			lnk->errors++;
			if(lnk->errors > MAX_TIMES_ERROR_RCV)
			{
				goto LAB_ERROR;
			}
			goto LAB_NEXT;
		}
		else if(rcvlen == 0)
		{
			/* recv() returning 0: treated as a closed connection. */
			goto LAB_ERROR;
		}
		
		lnk->lasttime = rc_sec_now();
		
		/* rcv.size == 0 means no packet is in progress: start a new one. */
		if(lnk->rcv.size == 0)
		{
			lnk->rcv.len = 0;
			if(rcvlen >= LEN_LEN_PACKET_FATHER)
			{
				/*
				 * The first LEN_LEN_PACKET_FATHER bytes carry a length
				 * value; the buffer is sized as that value plus the
				 * length field itself.
				 */
				memcpy(&(lnk->rcv.size), buf, LEN_LEN_PACKET_FATHER);
				lnk->rcv.size += LEN_LEN_PACKET_FATHER;
				lnk->rcv.buf = malloc(lnk->rcv.size);
				if(lnk->rcv.buf == NULL)
				{
					/*
					 * Reset the state so a later pass does not copy into
					 * a NULL buffer. The bytes just received are dropped.
					 */
					lnk->rcv.size = 0;
					goto LAB_NEXT;
				}
				bzero(lnk->rcv.buf, lnk->rcv.size);
			}
			else
			{
				/*
				 * NOTE(review): rcv.size stays 0 on this path, so the copy
				 * below moves 0 bytes and rc_deal_data_net() is then called
				 * with size 0 -- confirm this is the intended handling of a
				 * short first read.
				 */
				lnk->rcv.buf = malloc(MAX_LEN_CONNECT_BUF);
				if(lnk->rcv.buf == NULL)
				{
					goto LAB_NEXT;
				}
				bzero(lnk->rcv.buf, MAX_LEN_CONNECT_BUF);
			}
		}
		
		/*
		 * Append the received bytes, never beyond rcv.size. Bytes in buf
		 * past the end of the current packet are not kept.
		 */
		len = rcvlen + lnk->rcv.len;
		if(len <= lnk->rcv.size)
			len_cpy = rcvlen;
		else
			len_cpy = lnk->rcv.size - lnk->rcv.len;
		memcpy(lnk->rcv.buf + lnk->rcv.len, buf, len_cpy);
		lnk->rcv.len += len_cpy;
		
		/*
		 * The packet has been received completely:
		 * process it and reset the reassembly state.
		 */
		if(lnk->rcv.len >= lnk->rcv.size)
		{
			bzero(&con, sizeof(struct_msg_con));
			con.sockfd = sockfd;
			memcpy(&(con.address), &(lnk->address), sizeof(struct sockaddr_in));
			if(strlen(lnk->serial) == LEN_SERIAL_CHECK)
				strcpy(con.serial, lnk->serial);
			res = rc_deal_data_net(lnk->rcv.buf, lnk->rcv.size, &con);
			free((lnk->rcv.buf));
			lnk->rcv.buf = NULL;
			lnk->rcv.len = 0;
			lnk->rcv.size = 0;
			if(res < 0)
			{
				goto LAB_ERROR;
			}
			/* Store the serial left in con if the record has no full-length one yet. */
			if(strlen(lnk->serial) != LEN_SERIAL_CHECK)
			{
				bzero(lnk->serial, sizeof(lnk->serial));
				strcpy(lnk->serial, con.serial);
			}
		}
		
LAB_NEXT:
		/* lnk is non-NULL on every path that reaches this label. */
		pthread_mutex_unlock(&(lnk->mutex_opt));
		continue;
LAB_ERROR:
		/* lnk is non-NULL on every path that reaches this label. */
		pthread_mutex_unlock(&(lnk->mutex_opt));
		rc_fd_del(sockfd, epfd_atx);
		/* The pending send tasks of this sockfd must be removed from the queue as well. */
		pthread_mutex_lock(&mutex_w_atx);
		rc_del_task_w_one(&task_w_atx, sockfd);
		pthread_mutex_unlock(&mutex_w_atx);
		
		pthread_mutex_lock(&mutex_con_atx);
		rc_del_atx_one(&head_atx, sockfd);
		pthread_mutex_unlock(&mutex_con_atx);
	}
	
	pthread_exit(NULL);
	return 0;
}


/*
 * Purpose: worker thread that writes control (atx) data to the network.
 * Params : msg - thread entry argument; not used by this function.
 * Returns: the while(1) loop below has no exit path, so the trailing
 *          pthread_exit()/return are never reached.
 *
 * Each iteration:
 *   1. walks task_w_atx under mutex_w_atx and claims the first task whose
 *      mutex_opt can be try-locked, waiting on cond_w_atx whenever the walk
 *      runs off the end of the list,
 *   2. issues one non-blocking send() for the unsent part of the task's data,
 *   3. deletes the task when it is fully sent, releases it for another
 *      attempt when it is not, or tears the connection down on a socket
 *      error or after too many send errors.
 *
 * NOTE(review): between releasing mutex_w_atx and the end of the iteration
 * the task is protected only by its own mutex_opt; rc_net_r_atx calls
 * rc_del_task_w_one() for the same sockfd under mutex_w_atx alone -- confirm
 * that the task cannot be freed while it is in use here.
 */
int rc_net_w_atx(char *msg)
{
	int						res;
	int						sockfd;
	int						opt;
	int						len_snd;
	socklen_t				len_opt;	/* in/out length argument for getsockopt() */
	struct_task_net_w		*tmp;
	
	(void)msg;
	
	while(1)
	{
		/* Claim a task that no other thread is currently working on. */
		pthread_mutex_lock(&mutex_w_atx);
		tmp = task_w_atx;
		while(1)
		{
			if(tmp == NULL)
			{
				/* End of list (or empty list): wait, then restart from the head. */
				while(1)
				{
					pthread_cond_wait(&cond_w_atx, &mutex_w_atx);
					tmp = task_w_atx;
					if(tmp != NULL)
						break;
				}
			}
			res = pthread_mutex_trylock(&(tmp->mutex_opt));
			if(res == 0)
			{
				break;
			}
			else
			{
				tmp = tmp->next;
			}
		}
		pthread_mutex_unlock(&mutex_w_atx);
		
		opt = -1;
		len_opt = sizeof(opt);
		sockfd = tmp->sockfd;
		res = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *)&opt, &len_opt);
		if((res < 0) || (opt != 0))
		{
			/* The socket reports an error or has already been closed. */
			goto LAB_END_SND;
		}
		len_snd = send(sockfd, (tmp->buf->data.buf + tmp->buf->data.len), (tmp->buf->data.size - tmp->buf->data.len), MSG_DONTWAIT);
		if(len_snd <= 0)
		{
			/*
			 * "Would block" and "interrupted" are normal for a non-blocking
			 * send: retry without counting them as errors.
			 * (The original jumped to LAB_RESEND unconditionally here, which
			 * made the error limit below unreachable.)
			 */
			if((len_snd < 0) && ((errno == EAGAIN) || (errno == EWOULDBLOCK) || (errno == EINTR)))
			{
				goto LAB_RESEND;
			}
			tmp->errors++;
			if(tmp->errors > MAX_TIMES_ERROR_SND + 100)
			{
				/* The number of send errors reached the limit. */
				goto LAB_END_SND;
			}
			else
			{
				/* Nothing was sent: try again. */
				goto LAB_RESEND;
			}
		}
		tmp->errors = 0;
		tmp->buf->data.len += len_snd;
		if(tmp->buf->data.len >= tmp->buf->data.size)
		{
			/* Everything has been sent. */
			goto LAB_NEXT;
		}
		else
		{
			/* Only part of the data was sent: send the rest later. */
			goto LAB_RESEND;
		}
LAB_NEXT:
		/*
		 * Remove the finished task. Same lock order as LAB_END_SND: take the
		 * list lock, release the task lock, delete, release the list lock.
		 * (The original unlocked tmp->mutex_opt twice and unlocked
		 * mutex_w_atx without ever locking it.)
		 */
		pthread_mutex_lock(&mutex_w_atx);
		pthread_mutex_unlock(&(tmp->mutex_opt));
		rc_del_task_w_one(&task_w_atx, sockfd);
		pthread_mutex_unlock(&mutex_w_atx);
		continue;
LAB_RESEND:
		/* Release the task so that a later iteration can pick it up again. */
		pthread_mutex_unlock(&(tmp->mutex_opt));
		continue;
LAB_END_SND:
		/* Sending is over for this socket: tear the connection down. */
		rc_fd_del(sockfd, epfd_atx);
		pthread_mutex_lock(&mutex_w_atx);
		pthread_mutex_unlock(&(tmp->mutex_opt));
		rc_del_task_w_one(&task_w_atx, sockfd);
		pthread_mutex_unlock(&mutex_w_atx);
		
		pthread_mutex_lock(&mutex_con_atx);
		rc_del_atx_one(&head_atx, sockfd);
		pthread_mutex_unlock(&mutex_con_atx);
	}
	
	
	pthread_exit(NULL);
	return 0;
}

/*
 * Purpose: main thread for control (atx) connections.
 * Params : par - thread entry argument; not used by this function.
 * Returns: the epoll loop below has no exit path, so the trailing
 *          pthread_exit()/return are never reached. Fatal start-up errors
 *          end the whole process through exit().
 * Note   : process exit codes used here lie in the range -401 ~ -600:
 *            -401  a reader thread could not be created
 *            -402  a writer thread could not be created
 *            -403  the epoll descriptor could not be created
 *            -404  the listening socket could not be added to the epoll set
 *
 * The function starts the reader and writer thread pools, opens the
 * listening socket on port_atx (retrying once per second until that
 * succeeds), and then runs the epoll loop: new connections are accepted and
 * recorded in head_atx, and readable client sockets are queued on
 * task_r_atx for the reader threads.
 */
int rc_thread_atx(char *par)
{
	
	int					i;
	int					res;
	int					fd_listen;	/* listening descriptor */
	int					fd_con;		/* descriptor of an accepted connection */
	int					nfds;
	int					len;
	int					sockfd;
	socklen_t			len_sock;	/* in/out address length for accept() */
	
	
	pthread_t			tid_r[NUM_PTHREAD_R_N_ATX];	/* ids of the threads reading from the network */
	pthread_t			tid_w[NUM_PTHREAD_W_N_ATX];	/* ids of the threads writing to the network */
	
	struct sockaddr_in	addr_cli;
	struct epoll_event	events[MAX_EVENTS_EPOLL];
	struct epoll_event	ev;
	
	struct_par_net		net_r;	/* argument handed to the reader threads */
	struct_par_net		net_w;	/* argument handed to the writer threads */
	
	struct_task_net_r	*newtask;
	
	struct_msg_atx		*atx;	/* connection record added for each accepted client */
	
	(void)par;
	
	/*
	 * Start the reader thread pool.
	 * NOTE(review): the argument points at epfd_cam although this thread
	 * works on epfd_atx; rc_net_r_atx / rc_net_w_atx ignore their argument,
	 * so this has no effect today -- confirm whether &epfd_atx was meant.
	 */
	net_r.epfd = &epfd_cam;
	for(i = 0; i < NUM_PTHREAD_R_N_ATX; i++)
	{
		res = pthread_create(&(tid_r[i]), NULL, (void *)rc_net_r_atx, (void *)(&net_r));
		if(res != 0)
		{
			printf("%s %d Create read data from atxs thread error!\n", __FILE__, __LINE__);
			exit(-401);
		}
		pthread_detach(tid_r[i]);
	}
	
	
	/* Start the writer thread pool. */
	net_w.epfd = &epfd_cam;
	for(i = 0; i < NUM_PTHREAD_W_N_ATX; i++)
	{
		res = pthread_create(&(tid_w[i]), NULL, (void *)rc_net_w_atx, (void *)(&net_w));
		if(res != 0)
		{
			printf("%s %d Create write data to atxs thread error!\n", __FILE__, __LINE__);
			exit(-402);
		}
		pthread_detach(tid_w[i]);
	}
	
	/* Open the listening port; retry once per second until it succeeds. */
	while(1)
	{
		fd_listen = rc_socket_create(port_atx);
		if(fd_listen < 0)
		{
			sleep(1);
			continue;
		}
		else
			break;
	}
	
	/*
	 * Create the epoll descriptor. Without it the epoll_wait() loop below
	 * could only fail and spin, so a failure here is fatal.
	 */
	epfd_atx = epoll_create(MAX_DESCRIPTOR_EPOLL);
	if(epfd_atx < 0)
	{
		printf("%s %d Create epoll descriptor for atxs error : %s\n", __FILE__, __LINE__, strerror(errno));
		exit(-403);
	}
	
	/* Register the listening socket for edge-triggered read events. */
	bzero(&ev, sizeof(ev));
	ev.data.fd = fd_listen;
	ev.events = (EPOLLIN | EPOLLET);
	res = epoll_ctl(epfd_atx, EPOLL_CTL_ADD, fd_listen, &ev);
	if(res < 0)
	{
		printf("%s %d Register listen socket for atxs error : %s\n", __FILE__, __LINE__, strerror(errno));
		exit(-404);
	}
	
	while(1)
	{
		/* Wait for epoll events; timeouts and errors simply retry. */
		nfds = epoll_wait(epfd_atx, events, MAX_EVENTS_EPOLL, TIMEOUT_EPOLL);
		if(nfds <= 0)
		{
			continue;
		}
		for(i = 0; i < nfds; i++)
		{
			if(events[i].data.fd == fd_listen)
			{
				/*
				 * Event on the listening socket: accept one connection.
				 * NOTE(review): the listening socket is registered with
				 * EPOLLET but accept() is called only once per event --
				 * confirm that simultaneous connects cannot be left pending.
				 */
				len_sock = sizeof(struct sockaddr_in);
				bzero(&addr_cli, sizeof(addr_cli));
				fd_con = accept(fd_listen, (struct sockaddr *)&addr_cli, &len_sock);
				if(fd_con <= 0)
				{
					printf("%s %d fd_con : %d ; error : %s", __FILE__, __LINE__, fd_con, strerror(errno));
					continue;
				}
				
				/* sin_port is in network byte order: convert it before printing. */
				printf("\t\t\t ************ Get connection form ip : %s ; port : %d ************\n", inet_ntoa(addr_cli.sin_addr), ntohs(addr_cli.sin_port));
				
				/* Register the new socket for edge-triggered read events. */
				rc_set_no_block(fd_con);
				bzero(&ev, sizeof(ev));
				ev.data.fd = fd_con;
				ev.events = (EPOLLIN | EPOLLET);
				res = epoll_ctl(epfd_atx, EPOLL_CTL_ADD, fd_con, &ev);
				if(res < 0)
				{
					close(fd_con);
					continue;
				}
				
				/* Create the connection record and add it to head_atx. */
				len = sizeof(struct_msg_atx);
				atx = malloc(len);
				if(atx == NULL)
				{
					rc_fd_del(fd_con, epfd_atx);
					close(fd_con);
					continue;
				}
				bzero(atx, len);
				atx->sockfd = fd_con;
				atx->lasttime = rc_sec_now();
				pthread_mutex_init(&(atx->mutex_opt), NULL);
				memcpy(&(atx->address), &addr_cli, sizeof(struct sockaddr_in));
				
				pthread_mutex_lock(&mutex_con_atx);
				rc_add_atx(&head_atx, atx);
				pthread_mutex_unlock(&mutex_con_atx);
			}
			else if(events[i].events & EPOLLIN)
			{
				/* A client socket is readable: queue a read task for it. */
				sockfd = events[i].data.fd;
				if(sockfd < 0)
					continue;
				len = sizeof(struct_task_net_r);
				newtask = malloc(len);
				if(newtask == NULL)
					continue;
				bzero(newtask, len);
				newtask->sockfd = sockfd;
				
				pthread_mutex_lock(&mutex_r_atx);
				rc_add_task_r(&task_r_atx, newtask);
				
				/* Wake up every thread waiting on this condition. */
				pthread_mutex_unlock(&mutex_r_atx);
				pthread_cond_broadcast(&cond_r_atx);
			}
			else if(events[i].events & EPOLLOUT)
			{
				/* Write-readiness events are not handled here. */
			}
		}
	}
	
	pthread_exit(NULL);
	return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -