📄 rc_ctl_net.c
字号:
memcpy(&(con->address), &addr_cli, sizeof(struct sockaddr_in));
pthread_mutex_lock(&mutex_con_t);
rc_add_connection(&head_con, con);
pthread_mutex_unlock(&mutex_con_t);
}//2007-4-24 9:17
else if(events[i].events & EPOLLIN)//2007-4-24 9:48
{
sockfd = events[i].data.fd;
if(sockfd < 0)
continue;
len = sizeof(struct_task_net_r);
newtask = malloc(len);
if(newtask == NULL)
continue;
bzero(newtask, len);
newtask->sockfd = sockfd;
pthread_mutex_lock(&mutex_r_cam);
rc_add_task_r(&task_r_cam, newtask);
/* 唤醒所有等待这个条件的线程 */
pthread_mutex_unlock(&mutex_r_cam);
pthread_cond_broadcast(&cond_r_cam);
}//2007-4-24 9:48
else if(events[i].events & EPOLLOUT)
{
}
}//2007-4-24 9:16
}//2007-4-24 9:15
pthread_exit(NULL);
return 0;
}
/*
* 功 能:从网络上读控件数据的线程
* 参 数:msg:线程入口参数
* 返回值:出错返回0
*/
int rc_net_r_atx(char *msg)
{
int res;
int opt;
int sockfd;
int len;
int rcvlen;
int len_opt;
char buf[MAX_LEN_CONNECT_BUF];
struct_task_net_r *tmp;
struct_msg_con con;
struct_msg_atx *lnk;
while(1)
{
pthread_mutex_lock(&mutex_r_atx);
while(task_r_atx == NULL)
{
pthread_cond_wait(&cond_r_atx, &mutex_r_atx);
}
sockfd = task_r_atx->sockfd;
tmp = task_r_atx;
task_r_atx = task_r_atx->next;
free(tmp);
pthread_mutex_unlock(&mutex_r_atx);
pthread_mutex_lock(&mutex_con_atx);
lnk = rc_get_lnk_atx(head_atx, sockfd);
if(lnk == NULL)
{
pthread_mutex_unlock(&mutex_con_atx);
continue;
}
pthread_mutex_lock(&(lnk->mutex_opt));
pthread_mutex_unlock(&mutex_con_atx);
opt = -1;
len_opt = sizeof(opt);
res = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *)&opt, &len_opt);
if((res < 0) || (opt != 0))
{
printf("****** %s %d res : %d ******\n", __FILE__, __LINE__, res);
goto LAB_ERROR;
}
len = sizeof(buf);
bzero(buf, len);
if(lnk->rcv.size != 0)
{
res = lnk->rcv.size - lnk->rcv.len;
if(res <= len);
len = res;
}
rcvlen = recv(sockfd, buf, len, MSG_DONTWAIT);
if(rcvlen < 0)
{
lnk->errors++;
if(lnk->errors > MAX_TIMES_ERROR_RCV)
{
goto LAB_ERROR;
}
goto LAB_NEXT;
}
else if(rcvlen == 0)
{
goto LAB_ERROR;
}
lnk->lasttime = rc_sec_now();
if(lnk->rcv.size == 0)
{
lnk->rcv.len = 0;
if(rcvlen >= LEN_LEN_PACKET_FATHER)
{
memcpy(&(lnk->rcv.size), buf, LEN_LEN_PACKET_FATHER);
lnk->rcv.size += LEN_LEN_PACKET_FATHER;
lnk->rcv.buf = malloc(lnk->rcv.size);
if(lnk->rcv.buf == NULL)
{
goto LAB_NEXT;
}
bzero(lnk->rcv.buf, lnk->rcv.size);
}
else
{
lnk->rcv.buf = malloc(MAX_LEN_CONNECT_BUF);
if(lnk->rcv.buf == NULL)
{
goto LAB_NEXT;
}
bzero(lnk->rcv.buf, MAX_LEN_CONNECT_BUF);
}
}
len = rcvlen + lnk->rcv.len;
if(len <= lnk->rcv.size)
len_opt = rcvlen;
else
len_opt = lnk->rcv.size - lnk->rcv.len;
memcpy(lnk->rcv.buf + lnk->rcv.len, buf, len_opt);
lnk->rcv.len += len_opt;
/*
* 这个包的数据接收已经完成
* 开始处理接收到的数据
*/
if(lnk->rcv.len >= lnk->rcv.size)
{
bzero(&con, sizeof(struct_msg_con));
con.sockfd = sockfd;
memcpy(&(con.address), &(lnk->address), sizeof(struct sockaddr_in));
if(strlen(lnk->serial) == LEN_SERIAL_CHECK)
strcpy(con.serial, lnk->serial);
// printf("%s %d\n", __FILE__, __LINE__);
res = rc_deal_data_net(lnk->rcv.buf, lnk->rcv.size, &con);
free((lnk->rcv.buf));
lnk->rcv.buf = NULL;
lnk->rcv.len = 0;
lnk->rcv.size = 0;
if(res < 0)
{
goto LAB_ERROR;
}
if(strlen(lnk->serial) != LEN_SERIAL_CHECK)
{
bzero(lnk->serial, sizeof(lnk->serial));
strcpy(lnk->serial, con.serial);
}
}
LAB_NEXT:
if(lnk != NULL)
pthread_mutex_unlock(&(lnk->mutex_opt));
continue;
LAB_ERROR:
if(lnk != NULL)
pthread_mutex_unlock(&(lnk->mutex_opt));
rc_fd_del(sockfd, epfd_atx);
/* 在这里也要将这个sockfd的发送任务从队列中删除 */
pthread_mutex_lock(&mutex_w_atx);
rc_del_task_w_one(&task_w_atx, sockfd);
pthread_mutex_unlock(&mutex_w_atx);
pthread_mutex_lock(&mutex_con_atx);
rc_del_atx_one(&head_atx, sockfd);
pthread_mutex_unlock(&mutex_con_atx);
}
pthread_exit(NULL);
return 0;
}
/*
* 功 能:向网络上写控件数据的线程
* 参 数:msg:线程入口参数
* 返回值:出错返回0
*/
int rc_net_w_atx(char *msg)
{
int res;
int sockfd;
int opt;
int len_opt;
int len_snd;
struct_task_net_w *tmp;
while(1)
{
pthread_mutex_lock(&mutex_w_atx);
tmp = task_w_atx;
while(1)
{
if(tmp == NULL)
{
while(1)
{
pthread_cond_wait(&cond_w_atx, &mutex_w_atx);
tmp = task_w_atx;
if(tmp != NULL)
break;
}
}
res = pthread_mutex_trylock(&(tmp->mutex_opt));
if(res == 0)
{
break;
}
else
{
tmp = tmp->next;
}
}
pthread_mutex_unlock(&mutex_w_atx);
opt = -1;
len_opt = sizeof(opt);
sockfd = tmp->sockfd;
res = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *)&opt, &len_opt);
if((res < 0) || (opt != 0))
{
/* socket出错或者已经被关闭*/
goto LAB_END_SND;
}
len_snd = send(sockfd, (tmp->buf->data.buf + tmp->buf->data.len), (tmp->buf->data.size - tmp->buf->data.len), MSG_DONTWAIT);
if(len_snd <= 0)
{
goto LAB_RESEND;
tmp->errors++;
if(tmp->errors > MAX_TIMES_ERROR_SND + 100)
{
/* 发送出错次数达到最大限制 */
goto LAB_END_SND;
}
else
{
/* 没有发送成功,重新发送 */
goto LAB_RESEND;
}
}
tmp->errors = 0;
tmp->buf->data.len += len_snd;
if(tmp->buf->data.len >= tmp->buf->data.size)
{
/* 发送成功 */
// goto LAB_END_SND;
goto LAB_NEXT;
}
else
{
/* 没有发送完成,重新发送 */
goto LAB_RESEND;
}
LAB_NEXT:
pthread_mutex_unlock(&(tmp->mutex_opt));
pthread_mutex_unlock(&(tmp->mutex_opt));
rc_del_task_w_one(&task_w_atx, sockfd);
pthread_mutex_unlock(&mutex_w_atx);
continue;
LAB_RESEND:
pthread_mutex_unlock(&(tmp->mutex_opt));
// pthread_cond_broadcast(&cond_w_atx);
continue;
LAB_END_SND:
/* 处理发送结束后的动作 */
// printf("%s %d\n", __FILE__, __LINE__);
rc_fd_del(sockfd, epfd_atx);
pthread_mutex_lock(&mutex_w_atx);
pthread_mutex_unlock(&(tmp->mutex_opt));
rc_del_task_w_one(&task_w_atx, sockfd);
pthread_mutex_unlock(&mutex_w_atx);
pthread_mutex_lock(&mutex_con_atx);
rc_del_atx_one(&head_atx, sockfd);
pthread_mutex_unlock(&mutex_con_atx);
}
pthread_exit(NULL);
return 0;
}
/*
* 功 能:控件相关的主线程
* 参 数:par:线程入口参数
* 返回值:成功返回0,否则返回-1
* 说 明:进程退出值范围-401~-600
*/
int rc_thread_atx(char *par)
{
int i;
int res;
int fd_listen;//用于监听的文件描述符
int fd_con;//链接上来的文件描述符
int nfds;
int len_sock;
int len;
int sockfd;
pthread_t tid_r[NUM_PTHREAD_R_N_ATX];//用于从网络读取数据的线程id
pthread_t tid_w[NUM_PTHREAD_W_N_ATX];//用于向网络发送数据的线程id
struct sockaddr_in addr_cli;
struct epoll_event events[MAX_EVENTS_EPOLL];
struct epoll_event ev;
struct_par_net net_r;//用于从网络读取数据的线程参数
struct_par_net net_w;//用于向网络发送数据的线程参数
struct_task_net_r *newtask;
struct_msg_atx *atx;//链接信息指针,用于添加控件链接信息
/* 初始化用于读线程池的线程 */
net_r.epfd = &epfd_cam;
for(i = 0; i < NUM_PTHREAD_R_N_ATX; i++)
{
res = pthread_create(&(tid_r[i]), NULL, (void *)rc_net_r_atx, (void *)(&net_r));
if(res != 0)
{
printf("%s %d Create read data from atxs thread error!\n", __FILE__, __LINE__);
exit(-401);
}
pthread_detach(tid_r[i]);
}
/* 初始化用于写线程池的线程 */
net_w.epfd = &epfd_cam;
for(i = 0; i < NUM_PTHREAD_W_N_ATX; i++)
{
res = pthread_create(&(tid_w[i]), NULL, (void *)rc_net_w_atx, (void *)(&net_w));
if(res != 0)
{
printf("%s %d Create write data to atxs thread error!\n", __FILE__, __LINE__);
exit(-402);
}
pthread_detach(tid_w[i]);
}
/* 建立端口监听 */
while(1)
{
fd_listen = rc_socket_create(port_atx);
if(fd_listen < 0)
{
sleep(1);
continue;
}
else
break;
}
/* 生成用于处理accept的epoll专用的文件描述符 */
epfd_atx = epoll_create(MAX_DESCRIPTOR_EPOLL);
ev.data.fd = fd_listen;//设置与要处理的事件相关的文件描述符
ev.events = (EPOLLIN | EPOLLET);//设置要处理的事件类型
epoll_ctl(epfd_atx, EPOLL_CTL_ADD, fd_listen, &ev);//注册epoll事件
while(1)//2007-4-24 11:02
{//TIMEOUT_EPOLL
nfds = epoll_wait(epfd_atx, events, MAX_EVENTS_EPOLL, TIMEOUT_EPOLL);//等待epoll事件
if(nfds <= 0)
{
continue;
}
for(i = 0; i < nfds; i++)//2007-4-24 11:06
{
if(events[i].data.fd == fd_listen)//2007-4-24 11:07
{
len_sock = sizeof(struct sockaddr_in);
bzero(&addr_cli, sizeof(addr_cli));
fd_con = accept(fd_listen, (struct sockaddr *)&addr_cli, &len_sock);
if(fd_con <= 0)
{
printf("%s %d fd_con : %d ; error : %s", __FILE__, __LINE__, fd_con, strerror(errno));
continue;
}
printf("\t\t\t ************ Get connection form ip : %s ; port : %d ************\n", inet_ntoa(addr_cli.sin_addr), addr_cli.sin_port);
rc_set_no_block(fd_con);
bzero(&ev, sizeof(ev));
ev.data.fd = fd_con;//设置用于读操作的文件描述符
ev.events = (EPOLLIN | EPOLLET);//设置用于注册的读操作事件
res = epoll_ctl(epfd_atx, EPOLL_CTL_ADD, fd_con, &ev);//注册ev
if(res < 0)
{
close(fd_con);
continue;
}
len = sizeof(struct_msg_atx);
atx = malloc(len);
if(atx == NULL)
{
rc_fd_del(fd_con, epfd_atx);
close(fd_con);
continue;
}
bzero(atx, len);
atx->sockfd = fd_con;
atx->lasttime = rc_sec_now();
pthread_mutex_init(&(atx->mutex_opt), NULL);
memcpy(&(atx->address), &addr_cli, sizeof(struct sockaddr_in));
pthread_mutex_lock(&mutex_con_atx);
rc_add_atx(&head_atx, atx);
pthread_mutex_unlock(&mutex_con_atx);
}//2007-4-24 11:07
else if(events[i].events & EPOLLIN)
{
sockfd = events[i].data.fd;
if(sockfd < 0)
continue;
len = sizeof(struct_task_net_r);
newtask = malloc(len);
if(newtask == NULL)
continue;
bzero(newtask, len);
newtask->sockfd = sockfd;
pthread_mutex_lock(&mutex_r_atx);
rc_add_task_r(&task_r_atx, newtask);
/* 唤醒所有等待这个条件的线程 */
pthread_mutex_unlock(&mutex_r_atx);
pthread_cond_broadcast(&cond_r_atx);
}
else if(events[i].events & EPOLLOUT)
{
}
}//2007-4-24 11:06
}//2007-4-24 11:02
pthread_exit(NULL);
return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -