📄 service.c
字号:
/* * 进程预分配、线程池函数库,由于用到了全局变量,只能做成静态库。 * * start_server(struct server_info *ser_info, int (*handler)(int fd)); * * struct server_info *结构体包含端口,最大并发数等一些信息,调用改函数 * 时,该结构体各项应指定好。 * handler是一个函数指针,该函数处理具体应用协议,如smtp、pop3等。 * * 注意:当ser_info->thread_num > 1即工作在线程模式下,handler必须是线程 * 安全的。 */#include <stdio.h>#include <stdlib.h>#include <sys/types.h>#include <sys/socket.h>#include <sys/wait.h>#include <sys/poll.h>#include <netinet/in.h>#include <string.h>#include <signal.h>#include <pthread.h>#include <semaphore.h>#include <errno.h>#ifdef FREEBSD#include <machine/param.h>#endif#include <sys/ipc.h>#include <sys/shm.h>#include "network.h"#include "service.h"#define GINT(x, y) ((int)((x)/(y)) + ((x)%(y)?1:0))#define GMOD(x, y) ((int)((x)%(y)) - ((x)%(y)?1:0)) * yint thread_num;int max_child;struct server_info *p_ser_info;struct handler_info *p_han_info;int listen_fd;struct ipc_mem *ptr;sem_t sem_idle;int in, out;int cfd_array[MAX_THREAD]; pthread_mutex_t mutex_cfd = PTHREAD_MUTEX_INITIALIZER;;pthread_cond_t cond_cfd = PTHREAD_COND_INITIALIZER;int num = 0;pthread_mutex_t mutex_num = PTHREAD_MUTEX_INITIALIZER;;pthread_cond_t cond_num = PTHREAD_COND_INITIALIZER;/* * 设置子进程的信号处理函数。 */void child_sigexit_handler(int sig){ exit(0);}void child_sigusr1_handler(int sig){ exit(0);}/* * 子进程信号处理函数 */void child_set_signal(){ struct sigaction act; sigemptyset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = child_sigexit_handler; sigaction(SIGILL, &act, NULL); sigaction(SIGABRT, &act, NULL); sigaction(SIGBUS, &act, NULL); sigaction(SIGFPE, &act, NULL); sigaction(SIGSEGV, &act, NULL); sigaction(SIGPIPE, &act, NULL); sigaction(SIGTERM, &act, NULL); act.sa_handler = child_sigusr1_handler; sigaction(SIGUSR1, &act, NULL);}/* * 进程模式。 */int child(){ int fd; int cfd; int request; struct pollfd pd; request = 0; fd = udp_connect("localhost", p_ser_info->port); while (1) { pd.fd = POLLIN; if (poll(&pd, 1, 5000) < 0) continue; sem_wait(&(ptr->sem)); printf("ptr->num = %d ptr->idle = %d\n", ptr->num, ptr->idle); if (ptr->idle > MAX_IDLE) { ptr->num--; ptr->idle--; sem_post(&(ptr->sem)); break; } sem_post(&(ptr->sem)); if ((pd.revents & POLLIN) == 0) continue; cfd = accept(listen_fd, NULL, NULL); if (cfd < 0) continue; sem_wait(&(ptr->sem)); ptr->idle--; if (ptr->idle < MIN_IDLE) sem_post(&(ptr->sem_notify)); sem_post(&(ptr->sem)); p_han_info->main_handler(cfd); close(cfd); sem_wait(&(ptr->sem)); ptr->idle++; sem_post(&(ptr->sem)); request++; if (request >= MAX_REQUEST) { sem_wait(&(ptr->sem)); ptr->num--; ptr->idle--; if (ptr->idle < MIN_IDLE) sem_post(&(ptr->sem_notify)); sem_post(&(ptr->sem)); break; } } exit(0); return 0;}/* * 服务线程。 */void *sleep_pth(void *args){ int cfd; while (1) { pthread_mutex_lock(&mutex_cfd); // 队列中已有未处理的连接时,直接处理,无需pthread_cond_wait。 while (cfd_array[out] == 0) { pthread_cond_wait(&cond_cfd, &mutex_cfd); } cfd = cfd_array[out]; cfd_array[out] = 0; out = (out + 1) % thread_num; pthread_mutex_unlock(&mutex_cfd); p_han_info->main_handler(cfd); close(cfd); sem_wait(&(ptr->sem)); ptr->idle++; sem_post(&(ptr->sem)); sem_post(&sem_idle); } return NULL;}/* * 监控进程。 */int listen_pth(){ int fd; int cfd; int i; int request; struct pollfd pd; request = 0; fd = udp_connect("localhost", p_ser_info->port); while (1) { // 无空闲线程时,阻塞在这里。 sem_wait(&sem_idle); pd.fd = listen_fd; pd.events = POLLIN; // 会被父进程发送的SIGTERM信号中断。 if (poll(&pd, 1, 5000) <= 0) { sem_post(&sem_idle); continue; } sem_wait(&(ptr->sem)); printf("ptr->num = %d ptr->idle = %d\n", ptr->num, ptr->idle); if (ptr->idle > MAX_IDLE * thread_num) { ptr->num --; ptr->idle -= thread_num; sem_post(&(ptr->sem)); sem_post(&sem_idle); break; } sem_post(&(ptr->sem)); if ((pd.revents & POLLIN) == 0) { sem_post(&sem_idle); continue; } cfd = accept(listen_fd, NULL, NULL); if (cfd < 0) { sem_post(&sem_idle); continue; } sem_wait(&(ptr->sem)); ptr->idle--; if (ptr->idle < MIN_IDLE) sem_post(&(ptr->sem_notify)); sem_post(&(ptr->sem)); // 唤醒一个空闲线程。 pthread_mutex_lock(&mutex_cfd); cfd_array[in] = cfd; in = (in + 1) % thread_num; pthread_mutex_unlock(&mutex_cfd); // 服务线程不在pthread_cond_wait时,此处唤醒信号被丢失。 pthread_cond_signal(&cond_cfd); request++; if (request >= MAX_REQUEST) { sem_wait(&(ptr->sem)); ptr->num --; ptr->idle -= thread_num; if (ptr->idle < MIN_IDLE) sem_post(&(ptr->sem_notify)); sem_post(&(ptr->sem)); break; } } // 有线程正在服务时,阻塞在这里。 for (i = 0; i < thread_num; i++) { sem_wait(&sem_idle); } return 0;}/* * 创建线程池 */int pth_func(){ int i; int ret; pthread_t id; sem_init(&sem_idle, 0, thread_num); for (i = 0; i < thread_num; i++) { ret = pthread_create(&id, NULL, sleep_pth, NULL); if (0 != ret) { continue; } } // 主线程监听 listen_pth(); exit(0); return 0; }/* * 创建子进程,返回创建失败的进程个数。 */int create_child(int num){ int i; pid_t pid; for (i = 0; num > 0; i++) { sem_wait(&(ptr->sem)); if (ptr->num >= max_child) { sem_post(&(ptr->sem)); break; } ptr->num++; ptr->idle += thread_num; sem_post(&(ptr->sem)); pid = fork(); if (pid < 0) { sem_wait(&(ptr->sem)); ptr->num--; ptr->idle -= thread_num; sem_post(&(ptr->sem)); return num; } if (pid == 0) { child_set_signal(); if (thread_num > 1) { pth_func(); } else { child(); } } else { num--; } } return num;}/* * 向子进程发送SIGUSR1号,杀子进程。 */void sigexit_handler(int sig){ printf("pid = %d receive sig = %d", getpid(), sig); close(listen_fd); kill(0, SIGUSR1); if (U_DOMAIN == p_ser_info->type || BOTH == p_ser_info->type) unlink(p_ser_info->path); if (p_han_info->exit_handler) p_han_info->exit_handler(); exit(0);}/* * 回收子进程。 */void sigchld_handler(int sig){ int status; pid_t pid; // 多个子进程退出,可能只收到1个SIGCHLD信号。 while ((pid = waitpid(-1, &status, WNOHANG) > 0)) { }}/* * 设置prefork主进程的信号处理函数。 */void set_signal(){ struct sigaction act; sigemptyset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = sigexit_handler; sigaction(SIGILL, &act, NULL); sigaction(SIGABRT, &act, NULL); sigaction(SIGBUS, &act, NULL); sigaction(SIGFPE, &act, NULL); sigaction(SIGSEGV, &act, NULL); sigaction(SIGTERM, &act, NULL); sigaction(SIGQUIT, &act, NULL); sigaction(SIGINT, &act, NULL); act.sa_handler = sigchld_handler; sigaction(SIGCHLD, &act, NULL);}/* * 分配共享内存(进程表) */int set_ipc(){ int shmid; shmid = shmget(IPC_PRIVATE, sizeof(struct ipc_mem), 0600 | IPC_CREAT); if (shmid < 0) { return -1; } ptr = shmat(shmid, NULL, 0); if (ptr == NULL) { return -1; } memset(ptr, 0, sizeof(struct ipc_mem)); sem_init(&(ptr->sem), 1, 1); sem_init(&(ptr->sem_notify), 1, 0); shmctl(shmid, IPC_RMID, 0); // 这样所有的进程退出后将自动清空 return 0;}/* * 启动预分配、线程池网络服务。 */int start_server(struct server_info *ser_info, struct handler_info *han_info){ int idle; set_signal(); p_han_info = han_info; p_ser_info = ser_info; if (p_han_info->init_handler) p_han_info->init_handler(); if (I_NET == ser_info->type) { listen_fd = tcp_listen(ser_info->port, 500, NULL); } else if (U_DOMAIN == ser_info->type) { listen_fd = stream_listen(ser_info->path, 500); } else if (BOTH == ser_info->type) { printf("unimpliment type\n"); return -1; } if (-1 == listen_fd) return -1; thread_num = ser_info->thread_num; if (thread_num > MAX_THREAD) thread_num = MAX_THREAD; if (thread_num < 0) thread_num = 1; if (thread_num > 1) { max_child = GINT(ser_info->max_service, thread_num); } else { max_child = ser_info->max_service; } if (max_child > MAX_PROC) max_child = MAX_PROC; set_ipc(); create_child(MIN_IDLE); while (1) { sem_wait(&(ptr->sem_notify)); sem_wait(&(ptr->sem)); idle = ptr->idle; sem_post(&(ptr->sem)); if (idle < MIN_IDLE * thread_num) create_child(MIN_IDLE); } return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -