⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rc_ctl_net.c

📁 epoll机制的收发程序 只能在2.6内核上使用
💻 C
📖 第 1 页 / 共 2 页
字号:
/*
 * 文 件 名:rc_ctl_net.c
 * 功    能:网络部分的函数
 * 作    者:马云龙
 * E_mail : mayunlong21@163.com
 * 开始时间:2007-4-23 16:09
 * 结束时间:2007-4-27 13:13
 * 修改时间:
 */
#include	<stdio.h>
#include	<stdlib.h>
#include	<sys/types.h>
#include	<sys/socket.h>
#include	<netinet/in.h>
#include	<arpa/inet.h>
#include	<netdb.h>
#include	<unistd.h>
#include	<string.h>
#include	<sys/time.h>
#include	<sys/stat.h>
#include	<fcntl.h>
#include	<sys/ioctl.h>
#include	<net/ethernet.h>
#include	<net/if.h>
#include	<net/if_arp.h>
#include	<signal.h>
#include	<netinet/ip.h>
#include	<sys/wait.h>
#include	<dirent.h>
#include	<errno.h>
#include	<termios.h>
#include	<time.h>
#include	<pthread.h>

#include	"rc_pub_define.h"
#include	"rc_pub_type.h"
#include	"rc_pub_net.h"
#include	"rc_pub_time.h"
#include	"rc_pub_log.h"
#include	"rc_pub_epoll.h"

#include	"rc_ctl_define.h"
#include	"rc_ctl_table.h"
#include	"rc_ctl_data.h"

#define		MAX_TIMES_ERROR_SND		1000	//发送数据出错的最大次数
#define		MAX_TIMES_ERROR_RCV		20		//接收数据出错的最大次数

#define		LEN_PACKET_VIDEO_ASK		50	//来自控件的视频请求包的长度


extern struct_msg_con		*head_con;//摄像机链接信息的结构体头指针
extern struct_msg_atx		*head_atx;//控件链接信息的结构体头指针

extern pthread_mutex_t		mutex_con_t;//摄像机链接链表的互斥锁
extern pthread_mutex_t		mutex_con_atx;//控件链接链表的互斥锁

extern int		port_atx;
extern int		port_cam;

extern pthread_mutex_t		mutex_r_cam;//读摄像机任务队列的互斥锁
extern pthread_mutex_t		mutex_w_cam;//写摄像机任务队列的互斥锁

extern pthread_cond_t		cond_r_cam;//读摄像机任务线程要使用的条件变量
extern pthread_cond_t		cond_w_cam;//写摄像机任务线程要使用的条件变量

extern pthread_mutex_t		mutex_r_atx;//读控件任务队列的互斥锁
extern pthread_mutex_t		mutex_w_atx;//写控件任务队列的互斥锁

extern pthread_cond_t		cond_r_atx;//读摄像机任务线程要使用的条件变量
extern pthread_cond_t		cond_w_atx;//写摄像机任务线程要使用的条件变量


extern int					epfd_cam;
extern int					epfd_atx;

extern struct_task_net_r	*task_r_cam;//网络读摄像机任务队列头指针
extern struct_task_net_w	*task_w_cam;//网络写摄像机任务队列头指针
extern struct_task_sended	*task_sended;//已经发送过的任务队列的头指针(只对摄像机有用)

extern struct_task_net_r	*task_r_atx;//网络读控件任务队列头指针
extern struct_task_net_w	*task_w_atx;//网络写控件任务队列头指针




/*
 * 功  能:从网络上读摄像机数据的线程
 * 参  数:msg:线程入口参数
 * 返回值:出错返回0
 */
int rc_net_r_cam(char *msg)
{
	int						res;
	int						opt;
	int						sockfd;
	int						len;
	int						rcvlen;
	int						len_opt;
	char					buf[MAX_LEN_CONNECT_BUF];
	struct_task_net_r		*tmp;
	struct_msg_con			*lnk = NULL;
	
	
	while(1)
	{
		pthread_mutex_lock(&mutex_r_cam);
		while(task_r_cam == NULL)
		{
			pthread_cond_wait(&cond_r_cam, &mutex_r_cam);
		}
		
		sockfd = task_r_cam->sockfd;
		tmp = task_r_cam;
		task_r_cam = task_r_cam->next;
		free(tmp);
		pthread_mutex_unlock(&mutex_r_cam);
		
		pthread_mutex_lock(&mutex_con_t);
		lnk = rc_get_lnk_cam(head_con, sockfd);
		if(lnk == NULL)
		{
			pthread_mutex_unlock(&mutex_con_t);
			continue;
		}
		pthread_mutex_lock(&(lnk->mutex_opt));
		pthread_mutex_unlock(&mutex_con_t);
		
		opt = -1;
		len_opt = sizeof(opt);
		res = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *)&opt, &len_opt);
		if((res < 0) || (opt != 0))
		{
			goto LAB_ERROR;
		}
		
		len = sizeof(buf);
		bzero(buf, len);
		if(lnk->rcv.size != 0)
		{
			res = lnk->rcv.size - lnk->rcv.len;
			if(res <= len);
				len = res;
		}
		
		rcvlen = recv(sockfd, buf, len, MSG_DONTWAIT);
		if(rcvlen < 0)
		{
			lnk->errors++;
			if(lnk->errors > MAX_TIMES_ERROR_RCV)
				goto LAB_ERROR;
			goto LAB_NEXT;
		}
		else if(rcvlen == 0)
		{
			printf("\t ====== %s %d Cam : %s:%s exit! ======\n", __FILE__, __LINE__, inet_ntoa(lnk->address.sin_addr), lnk->serial);
			goto LAB_ERROR;
		}
		
		lnk->lasttime = rc_sec_now();
		
		if(lnk->rcv.size == 0)
		{
			lnk->rcv.len = 0;
			if(rcvlen >= LEN_LEN_PACKET_FATHER)
			{
				memcpy(&(lnk->rcv.size), buf, LEN_LEN_PACKET_FATHER);
				lnk->rcv.size += LEN_LEN_PACKET_FATHER;
				lnk->rcv.buf = malloc(lnk->rcv.size);
				if(lnk->rcv.buf == NULL)
				{
					goto LAB_NEXT;
				}
				bzero(lnk->rcv.buf, lnk->rcv.size);
			}
			else
			{
				lnk->rcv.buf = malloc(MAX_LEN_CONNECT_BUF);
				if(lnk->rcv.buf == NULL)
				{
					goto LAB_NEXT;
				}
				bzero(lnk->rcv.buf, MAX_LEN_CONNECT_BUF);
			}
		}
		
		len = rcvlen + lnk->rcv.len;
		if(len <= lnk->rcv.size)
			len_opt = rcvlen;
		else
			len_opt = lnk->rcv.size - lnk->rcv.len;
		memcpy(lnk->rcv.buf + lnk->rcv.len, buf, len_opt);
		lnk->rcv.len += len_opt;
		
		/* 
		 * 这个包的数据接收已经完成
		 * 开始处理接收到的数据
		 */
		if(lnk->rcv.len >= lnk->rcv.size)
		{
			res = rc_deal_data_net(lnk->rcv.buf, lnk->rcv.size, lnk);
			free((lnk->rcv.buf));
			lnk->rcv.buf = NULL;
			lnk->rcv.len = 0;
			lnk->rcv.size = 0;
			if(res < 0)
			{
				goto LAB_ERROR;
			}
		}
		
LAB_NEXT:
		if(link != NULL)
		{
			pthread_mutex_unlock(&(lnk->mutex_opt));
		}
		continue;
LAB_ERROR:
		if(link != NULL)
		{
			printf("\t ====== %s %d Cam : %s:%s exit! ======\n", __FILE__, __LINE__, inet_ntoa(lnk->address.sin_addr), lnk->serial);
			pthread_mutex_unlock(&(lnk->mutex_opt));
		}
		rc_fd_del(sockfd, epfd_cam);
		
		/* 在这里也要将这个sockfd的发送任务从队列中删除 */
		pthread_mutex_lock(&mutex_w_cam);
		rc_del_task_w_one(&task_w_cam, sockfd);
		pthread_mutex_unlock(&mutex_w_cam);
		
		pthread_mutex_lock(&mutex_con_t);
		rc_del_connection_one(&head_con, sockfd);//删除链接的时候会将socket关闭
		pthread_mutex_unlock(&mutex_con_t);
	}
	
	pthread_exit(NULL);
	return 0;
}


/*
 * 功  能:向网络上写摄像机数据的线程
 * 参  数:msg:线程入口参数
 * 返回值:出错返回0
 */
int rc_net_w_cam(char *msg)
{
	int						res;
	int						sockfd;
	int						opt;
	int						len_opt;
	int						len_snd;
	struct_task_net_w		*tmp;
	
	
	while(1)
	{
		pthread_mutex_lock(&mutex_w_cam);
		tmp = task_w_cam;
		while(1)
		{
			if(tmp == NULL)
			{
				while(1)
				{
					pthread_cond_wait(&cond_w_cam, &mutex_w_cam);
					
					tmp = task_w_cam;
					if(tmp != NULL)
						break;
				}
			}
			res = pthread_mutex_trylock(&(tmp->mutex_opt));
			if(res == 0)
			{
				break;
			}
			else
			{
				tmp = tmp->next;
			}
		}
		pthread_mutex_unlock(&mutex_w_cam);
		
		opt = -1;
		len_opt = sizeof(opt);
		sockfd = tmp->sockfd;
		res = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *)&opt, &len_opt);
		if((res < 0) || (opt != 0))
		{
			/* socket出错或者已经被关闭*/
			
			goto LAB_ERROR;
		}
		len_snd = send(sockfd, (tmp->buf->data.buf + tmp->buf->data.len), (tmp->buf->data.size - tmp->buf->data.len), MSG_DONTWAIT);
	//	printf("\t\t ++++++ %s %d send len : %d ++++++\n", __FILE__, __LINE__, len_snd);
		if(len_snd <= 0)
		{
			tmp->errors++;
			if(tmp->errors > MAX_TIMES_ERROR_SND)
			{
				/* 发送出错次数达到最大限制 */
				goto LAB_ERROR;
			}
			else
			{
				/* 没有发送成功,重新发送 */
				goto LAB_RESEND;
			}
		}
		tmp->errors = 0;
		tmp->buf->data.len += len_snd;
		if(tmp->buf->data.len >= tmp->buf->data.size)
		{
			/* 发送成功 */
	//		printf("\t\t ~~~~~~ %s %d ~~~~~~\n", __FILE__, __LINE__);
			goto LAB_END_SND;
		}
		else
		{
			/* 没有发送完成,重新发送 */
	//		printf("\t\t ~~~~~~ %s %d ~~~~~~\n", __FILE__, __LINE__);
			goto LAB_RESEND;
		}
LAB_END_SND:
		/* 处理发送结束后的动作 */
		pthread_mutex_lock(&mutex_w_cam);
		pthread_mutex_unlock(&(tmp->mutex_opt));
		rc_del_task_w_one(&task_w_cam, sockfd);
		pthread_mutex_unlock(&mutex_w_cam);
		continue;
LAB_RESEND:
		/* 处理重新发送的动作 */
		pthread_mutex_unlock(&(tmp->mutex_opt));
	//	pthread_cond_broadcast(&cond_w_cam);
		continue;
LAB_ERROR:
		/* 处理发送错误的动作 */
		pthread_mutex_lock(&mutex_w_cam);
		pthread_mutex_unlock(&(tmp->mutex_opt));
		rc_del_task_w_one(&task_w_cam, sockfd);
		pthread_mutex_unlock(&mutex_w_cam);
		
		pthread_mutex_lock(&mutex_con_t);
		rc_del_connection_one(&head_con, sockfd);//删除链接的时候会将socket关闭
		pthread_mutex_unlock(&mutex_con_t);
	}
	
	
	pthread_exit(NULL);
	return 0;
}

/*
 * 功  能:摄像机相关的主线程
 * 参  数:par:线程入口参数
 * 返回值:成功返回0,否则返回-1
 * 说  明:进程退出值范围-201~-400
  */
int rc_thread_cam(char *par)
{
	int					i;
	int					res;
	int					fd_listen;//用于监听的文件描述符
	int					fd_con;//链接上来的文件描述符
	int					nfds;
	int					len_sock;
	int					len;
	int					sockfd;
	
	struct sockaddr_in	addr_cli;
	struct epoll_event	events[MAX_EVENTS_EPOLL];
	struct epoll_event	ev;
	
	pthread_t			tid_r[NUM_PTHREAD_R_N_CAM];//用于从网络读取数据的线程id
	pthread_t			tid_w[NUM_PTHREAD_W_N_CAM];//用于向网络发送数据的线程id
	
	
	struct_par_net		net_r;//用于从网络读取数据的线程参数
	struct_par_net		net_w;//用于向网络发送数据的线程参数
	
	struct_task_net_r	*newtask;
	struct_msg_con		*con;//链接信息指针,用于添加链接信息(只对摄像机有用)
	
	/* 初始化用于读线程池的线程 */
	net_r.epfd = &epfd_cam;
	for(i = 0; i < NUM_PTHREAD_R_N_CAM; i++)
	{
		res = pthread_create(&(tid_r[i]), NULL, (void *)rc_net_r_cam, (void *)(&net_r));
		if(res != 0)
		{
			printf("%s %d Create read data from cameras thread error!\n", __FILE__, __LINE__);
			exit(-201);
		}
		pthread_detach(tid_r[i]);
	}
	
	/* 初始化用于写线程池的线程 */
	net_w.epfd = &epfd_cam;
	for(i = 0; i < NUM_PTHREAD_W_N_CAM; i++)
	{
		res = pthread_create(&(tid_w[i]), NULL, (void *)rc_net_w_cam, (void *)(&net_w));
		if(res != 0)
		{
			printf("%s %d Create write data to cameras thread error!\n", __FILE__, __LINE__);
			exit(-202);
		}
		pthread_detach(tid_w[i]);
	}
	
	/* 建立端口监听 */
	while(1)
	{
		fd_listen = rc_socket_create(port_cam);
		if(fd_listen < 0)
		{
			sleep(1);
			continue;
		}
		else
			break;
	}
	
	/* 生成用于处理accept的epoll专用的文件描述符 */
	epfd_cam = epoll_create(MAX_DESCRIPTOR_EPOLL);
	
	ev.data.fd = fd_listen;//设置与要处理的事件相关的文件描述符
	ev.events = (EPOLLIN | EPOLLET);//设置要处理的事件类型
	epoll_ctl(epfd_cam, EPOLL_CTL_ADD, fd_listen, &ev);//注册epoll事件
	
	while(1)//2007-4-24 9:15
	{
		nfds = epoll_wait(epfd_cam, events, MAX_EVENTS_EPOLL, TIMEOUT_EPOLL);//等待epoll事件的发生
		if(nfds <= 0)
		{
			continue;
		}
		
		for(i = 0; i < nfds; i++)//2007-4-24 9:16
		{
			if(events[i].data.fd == fd_listen)//2007-4-24 9:17
			{
				len_sock = sizeof(struct sockaddr_in);
				bzero(&addr_cli, sizeof(addr_cli));
				fd_con = accept(fd_listen, (struct sockaddr *)&addr_cli, &len_sock);
				if(fd_con <= 0)
				{
					continue;
				}
				
				printf("\t\t\t ************ Get connection form ip : %s ; port : %d ************\n", inet_ntoa(addr_cli.sin_addr), addr_cli.sin_port);
				
				rc_set_no_block(fd_con);
				bzero(&ev, sizeof(ev));
				ev.data.fd = fd_con;//设置用于读操作的文件描述符
				ev.events = (EPOLLIN | EPOLLET);//设置用于注册的读操作事件
				res = epoll_ctl(epfd_cam, EPOLL_CTL_ADD, fd_con, &ev);//注册ev
				if(res < 0)
				{
					close(fd_con);
					continue;
				}
				len = sizeof(struct_msg_con);
				con = malloc(len);
				if(con == NULL)
				{
					rc_fd_del(fd_con, epfd_cam);
					close(fd_con);
					continue;
				}
				bzero(con, len);
				con->sockfd = fd_con;
				con->logintime = rc_sec_now();
				con->lasttime = con->logintime;
				pthread_mutex_init(&(con->mutex_opt), NULL);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -