📄 rc_ctl_net.c
字号:
/*
* 文 件 名:rc_ctl_net.c
* 功 能:网络部分的函数
* 作 者:马云龙
* E_mail : mayunlong21@163.com
* 开始时间:2007-4-23 16:09
* 结束时间:2007-4-27 13:13
* 修改时间:
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <net/ethernet.h>
#include <net/if.h>
#include <net/if_arp.h>
#include <signal.h>
#include <netinet/ip.h>
#include <sys/wait.h>
#include <dirent.h>
#include <errno.h>
#include <termios.h>
#include <time.h>
#include <pthread.h>
#include "rc_pub_define.h"
#include "rc_pub_type.h"
#include "rc_pub_net.h"
#include "rc_pub_time.h"
#include "rc_pub_log.h"
#include "rc_pub_epoll.h"
#include "rc_ctl_define.h"
#include "rc_ctl_table.h"
#include "rc_ctl_data.h"
#define MAX_TIMES_ERROR_SND 1000 //发送数据出错的最大次数
#define MAX_TIMES_ERROR_RCV 20 //接收数据出错的最大次数
#define LEN_PACKET_VIDEO_ASK 50 //来自控件的视频请求包的长度
extern struct_msg_con *head_con;//摄像机链接信息的结构体头指针
extern struct_msg_atx *head_atx;//控件链接信息的结构体头指针
extern pthread_mutex_t mutex_con_t;//摄像机链接链表的互斥锁
extern pthread_mutex_t mutex_con_atx;//控件链接链表的互斥锁
extern int port_atx;
extern int port_cam;
extern pthread_mutex_t mutex_r_cam;//读摄像机任务队列的互斥锁
extern pthread_mutex_t mutex_w_cam;//写摄像机任务队列的互斥锁
extern pthread_cond_t cond_r_cam;//读摄像机任务线程要使用的条件变量
extern pthread_cond_t cond_w_cam;//写摄像机任务线程要使用的条件变量
extern pthread_mutex_t mutex_r_atx;//读控件任务队列的互斥锁
extern pthread_mutex_t mutex_w_atx;//写控件任务队列的互斥锁
extern pthread_cond_t cond_r_atx;//读摄像机任务线程要使用的条件变量
extern pthread_cond_t cond_w_atx;//写摄像机任务线程要使用的条件变量
extern int epfd_cam;
extern int epfd_atx;
extern struct_task_net_r *task_r_cam;//网络读摄像机任务队列头指针
extern struct_task_net_w *task_w_cam;//网络写摄像机任务队列头指针
extern struct_task_sended *task_sended;//已经发送过的任务队列的头指针(只对摄像机有用)
extern struct_task_net_r *task_r_atx;//网络读控件任务队列头指针
extern struct_task_net_w *task_w_atx;//网络写控件任务队列头指针
/*
* 功 能:从网络上读摄像机数据的线程
* 参 数:msg:线程入口参数
* 返回值:出错返回0
*/
int rc_net_r_cam(char *msg)
{
int res;
int opt;
int sockfd;
int len;
int rcvlen;
int len_opt;
char buf[MAX_LEN_CONNECT_BUF];
struct_task_net_r *tmp;
struct_msg_con *lnk = NULL;
while(1)
{
pthread_mutex_lock(&mutex_r_cam);
while(task_r_cam == NULL)
{
pthread_cond_wait(&cond_r_cam, &mutex_r_cam);
}
sockfd = task_r_cam->sockfd;
tmp = task_r_cam;
task_r_cam = task_r_cam->next;
free(tmp);
pthread_mutex_unlock(&mutex_r_cam);
pthread_mutex_lock(&mutex_con_t);
lnk = rc_get_lnk_cam(head_con, sockfd);
if(lnk == NULL)
{
pthread_mutex_unlock(&mutex_con_t);
continue;
}
pthread_mutex_lock(&(lnk->mutex_opt));
pthread_mutex_unlock(&mutex_con_t);
opt = -1;
len_opt = sizeof(opt);
res = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *)&opt, &len_opt);
if((res < 0) || (opt != 0))
{
goto LAB_ERROR;
}
len = sizeof(buf);
bzero(buf, len);
if(lnk->rcv.size != 0)
{
res = lnk->rcv.size - lnk->rcv.len;
if(res <= len);
len = res;
}
rcvlen = recv(sockfd, buf, len, MSG_DONTWAIT);
if(rcvlen < 0)
{
lnk->errors++;
if(lnk->errors > MAX_TIMES_ERROR_RCV)
goto LAB_ERROR;
goto LAB_NEXT;
}
else if(rcvlen == 0)
{
printf("\t ====== %s %d Cam : %s:%s exit! ======\n", __FILE__, __LINE__, inet_ntoa(lnk->address.sin_addr), lnk->serial);
goto LAB_ERROR;
}
lnk->lasttime = rc_sec_now();
if(lnk->rcv.size == 0)
{
lnk->rcv.len = 0;
if(rcvlen >= LEN_LEN_PACKET_FATHER)
{
memcpy(&(lnk->rcv.size), buf, LEN_LEN_PACKET_FATHER);
lnk->rcv.size += LEN_LEN_PACKET_FATHER;
lnk->rcv.buf = malloc(lnk->rcv.size);
if(lnk->rcv.buf == NULL)
{
goto LAB_NEXT;
}
bzero(lnk->rcv.buf, lnk->rcv.size);
}
else
{
lnk->rcv.buf = malloc(MAX_LEN_CONNECT_BUF);
if(lnk->rcv.buf == NULL)
{
goto LAB_NEXT;
}
bzero(lnk->rcv.buf, MAX_LEN_CONNECT_BUF);
}
}
len = rcvlen + lnk->rcv.len;
if(len <= lnk->rcv.size)
len_opt = rcvlen;
else
len_opt = lnk->rcv.size - lnk->rcv.len;
memcpy(lnk->rcv.buf + lnk->rcv.len, buf, len_opt);
lnk->rcv.len += len_opt;
/*
* 这个包的数据接收已经完成
* 开始处理接收到的数据
*/
if(lnk->rcv.len >= lnk->rcv.size)
{
res = rc_deal_data_net(lnk->rcv.buf, lnk->rcv.size, lnk);
free((lnk->rcv.buf));
lnk->rcv.buf = NULL;
lnk->rcv.len = 0;
lnk->rcv.size = 0;
if(res < 0)
{
goto LAB_ERROR;
}
}
LAB_NEXT:
if(link != NULL)
{
pthread_mutex_unlock(&(lnk->mutex_opt));
}
continue;
LAB_ERROR:
if(link != NULL)
{
printf("\t ====== %s %d Cam : %s:%s exit! ======\n", __FILE__, __LINE__, inet_ntoa(lnk->address.sin_addr), lnk->serial);
pthread_mutex_unlock(&(lnk->mutex_opt));
}
rc_fd_del(sockfd, epfd_cam);
/* 在这里也要将这个sockfd的发送任务从队列中删除 */
pthread_mutex_lock(&mutex_w_cam);
rc_del_task_w_one(&task_w_cam, sockfd);
pthread_mutex_unlock(&mutex_w_cam);
pthread_mutex_lock(&mutex_con_t);
rc_del_connection_one(&head_con, sockfd);//删除链接的时候会将socket关闭
pthread_mutex_unlock(&mutex_con_t);
}
pthread_exit(NULL);
return 0;
}
/*
* 功 能:向网络上写摄像机数据的线程
* 参 数:msg:线程入口参数
* 返回值:出错返回0
*/
int rc_net_w_cam(char *msg)
{
int res;
int sockfd;
int opt;
int len_opt;
int len_snd;
struct_task_net_w *tmp;
while(1)
{
pthread_mutex_lock(&mutex_w_cam);
tmp = task_w_cam;
while(1)
{
if(tmp == NULL)
{
while(1)
{
pthread_cond_wait(&cond_w_cam, &mutex_w_cam);
tmp = task_w_cam;
if(tmp != NULL)
break;
}
}
res = pthread_mutex_trylock(&(tmp->mutex_opt));
if(res == 0)
{
break;
}
else
{
tmp = tmp->next;
}
}
pthread_mutex_unlock(&mutex_w_cam);
opt = -1;
len_opt = sizeof(opt);
sockfd = tmp->sockfd;
res = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *)&opt, &len_opt);
if((res < 0) || (opt != 0))
{
/* socket出错或者已经被关闭*/
goto LAB_ERROR;
}
len_snd = send(sockfd, (tmp->buf->data.buf + tmp->buf->data.len), (tmp->buf->data.size - tmp->buf->data.len), MSG_DONTWAIT);
// printf("\t\t ++++++ %s %d send len : %d ++++++\n", __FILE__, __LINE__, len_snd);
if(len_snd <= 0)
{
tmp->errors++;
if(tmp->errors > MAX_TIMES_ERROR_SND)
{
/* 发送出错次数达到最大限制 */
goto LAB_ERROR;
}
else
{
/* 没有发送成功,重新发送 */
goto LAB_RESEND;
}
}
tmp->errors = 0;
tmp->buf->data.len += len_snd;
if(tmp->buf->data.len >= tmp->buf->data.size)
{
/* 发送成功 */
// printf("\t\t ~~~~~~ %s %d ~~~~~~\n", __FILE__, __LINE__);
goto LAB_END_SND;
}
else
{
/* 没有发送完成,重新发送 */
// printf("\t\t ~~~~~~ %s %d ~~~~~~\n", __FILE__, __LINE__);
goto LAB_RESEND;
}
LAB_END_SND:
/* 处理发送结束后的动作 */
pthread_mutex_lock(&mutex_w_cam);
pthread_mutex_unlock(&(tmp->mutex_opt));
rc_del_task_w_one(&task_w_cam, sockfd);
pthread_mutex_unlock(&mutex_w_cam);
continue;
LAB_RESEND:
/* 处理重新发送的动作 */
pthread_mutex_unlock(&(tmp->mutex_opt));
// pthread_cond_broadcast(&cond_w_cam);
continue;
LAB_ERROR:
/* 处理发送错误的动作 */
pthread_mutex_lock(&mutex_w_cam);
pthread_mutex_unlock(&(tmp->mutex_opt));
rc_del_task_w_one(&task_w_cam, sockfd);
pthread_mutex_unlock(&mutex_w_cam);
pthread_mutex_lock(&mutex_con_t);
rc_del_connection_one(&head_con, sockfd);//删除链接的时候会将socket关闭
pthread_mutex_unlock(&mutex_con_t);
}
pthread_exit(NULL);
return 0;
}
/*
* 功 能:摄像机相关的主线程
* 参 数:par:线程入口参数
* 返回值:成功返回0,否则返回-1
* 说 明:进程退出值范围-201~-400
*/
int rc_thread_cam(char *par)
{
int i;
int res;
int fd_listen;//用于监听的文件描述符
int fd_con;//链接上来的文件描述符
int nfds;
int len_sock;
int len;
int sockfd;
struct sockaddr_in addr_cli;
struct epoll_event events[MAX_EVENTS_EPOLL];
struct epoll_event ev;
pthread_t tid_r[NUM_PTHREAD_R_N_CAM];//用于从网络读取数据的线程id
pthread_t tid_w[NUM_PTHREAD_W_N_CAM];//用于向网络发送数据的线程id
struct_par_net net_r;//用于从网络读取数据的线程参数
struct_par_net net_w;//用于向网络发送数据的线程参数
struct_task_net_r *newtask;
struct_msg_con *con;//链接信息指针,用于添加链接信息(只对摄像机有用)
/* 初始化用于读线程池的线程 */
net_r.epfd = &epfd_cam;
for(i = 0; i < NUM_PTHREAD_R_N_CAM; i++)
{
res = pthread_create(&(tid_r[i]), NULL, (void *)rc_net_r_cam, (void *)(&net_r));
if(res != 0)
{
printf("%s %d Create read data from cameras thread error!\n", __FILE__, __LINE__);
exit(-201);
}
pthread_detach(tid_r[i]);
}
/* 初始化用于写线程池的线程 */
net_w.epfd = &epfd_cam;
for(i = 0; i < NUM_PTHREAD_W_N_CAM; i++)
{
res = pthread_create(&(tid_w[i]), NULL, (void *)rc_net_w_cam, (void *)(&net_w));
if(res != 0)
{
printf("%s %d Create write data to cameras thread error!\n", __FILE__, __LINE__);
exit(-202);
}
pthread_detach(tid_w[i]);
}
/* 建立端口监听 */
while(1)
{
fd_listen = rc_socket_create(port_cam);
if(fd_listen < 0)
{
sleep(1);
continue;
}
else
break;
}
/* 生成用于处理accept的epoll专用的文件描述符 */
epfd_cam = epoll_create(MAX_DESCRIPTOR_EPOLL);
ev.data.fd = fd_listen;//设置与要处理的事件相关的文件描述符
ev.events = (EPOLLIN | EPOLLET);//设置要处理的事件类型
epoll_ctl(epfd_cam, EPOLL_CTL_ADD, fd_listen, &ev);//注册epoll事件
while(1)//2007-4-24 9:15
{
nfds = epoll_wait(epfd_cam, events, MAX_EVENTS_EPOLL, TIMEOUT_EPOLL);//等待epoll事件的发生
if(nfds <= 0)
{
continue;
}
for(i = 0; i < nfds; i++)//2007-4-24 9:16
{
if(events[i].data.fd == fd_listen)//2007-4-24 9:17
{
len_sock = sizeof(struct sockaddr_in);
bzero(&addr_cli, sizeof(addr_cli));
fd_con = accept(fd_listen, (struct sockaddr *)&addr_cli, &len_sock);
if(fd_con <= 0)
{
continue;
}
printf("\t\t\t ************ Get connection form ip : %s ; port : %d ************\n", inet_ntoa(addr_cli.sin_addr), addr_cli.sin_port);
rc_set_no_block(fd_con);
bzero(&ev, sizeof(ev));
ev.data.fd = fd_con;//设置用于读操作的文件描述符
ev.events = (EPOLLIN | EPOLLET);//设置用于注册的读操作事件
res = epoll_ctl(epfd_cam, EPOLL_CTL_ADD, fd_con, &ev);//注册ev
if(res < 0)
{
close(fd_con);
continue;
}
len = sizeof(struct_msg_con);
con = malloc(len);
if(con == NULL)
{
rc_fd_del(fd_con, epfd_cam);
close(fd_con);
continue;
}
bzero(con, len);
con->sockfd = fd_con;
con->logintime = rc_sec_now();
con->lasttime = con->logintime;
pthread_mutex_init(&(con->mutex_opt), NULL);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -