⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 service.c

📁 类似apache2.0的多线程技术,目的在于解决网络服务器在并发客户数很大的情况下服务器进程分配(fork)而带来的效率瓶颈.
💻 C
字号:
/* * 进程预分配、线程池函数库,由于用到了全局变量,只能做成静态库。 *  * start_server(struct server_info *ser_info, int (*handler)(int fd));  * * struct server_info *结构体包含端口,最大并发数等一些信息,调用改函数 * 时,该结构体各项应指定好。  * handler是一个函数指针,该函数处理具体应用协议,如smtp、pop3等。 * * 注意:当ser_info->thread_num > 1即工作在线程模式下,handler必须是线程 * 安全的。 */#include <stdio.h>#include <stdlib.h>#include <sys/types.h>#include <sys/socket.h>#include <sys/wait.h>#include <sys/poll.h>#include <netinet/in.h>#include <string.h>#include <signal.h>#include <pthread.h>#include <semaphore.h>#include <errno.h>#ifdef FREEBSD#include <machine/param.h>#endif#include <sys/ipc.h>#include <sys/shm.h>#include "network.h"#include "service.h"#define GINT(x, y)      ((int)((x)/(y)) + ((x)%(y)?1:0))#define GMOD(x, y)      ((int)((x)%(y)) - ((x)%(y)?1:0)) * yint thread_num;int max_child;struct server_info *p_ser_info;struct handler_info *p_han_info;int listen_fd;struct ipc_mem *ptr;sem_t sem_idle;int in, out;int cfd_array[MAX_THREAD]; pthread_mutex_t mutex_cfd = PTHREAD_MUTEX_INITIALIZER;;pthread_cond_t cond_cfd = PTHREAD_COND_INITIALIZER;int num = 0;pthread_mutex_t mutex_num = PTHREAD_MUTEX_INITIALIZER;;pthread_cond_t cond_num = PTHREAD_COND_INITIALIZER;/* * 设置子进程的信号处理函数。 */void child_sigexit_handler(int sig){    exit(0);}void child_sigusr1_handler(int sig){    exit(0);}/* * 子进程信号处理函数 */void child_set_signal(){    struct sigaction act;    sigemptyset(&act.sa_mask);    act.sa_flags = 0;    act.sa_handler = child_sigexit_handler;    sigaction(SIGILL, &act, NULL);    sigaction(SIGABRT, &act, NULL);    sigaction(SIGBUS, &act, NULL);    sigaction(SIGFPE, &act, NULL);    sigaction(SIGSEGV, &act, NULL);    sigaction(SIGPIPE, &act, NULL);    sigaction(SIGTERM, &act, NULL);    act.sa_handler = child_sigusr1_handler;    sigaction(SIGUSR1, &act, NULL);}/* * 进程模式。 */int child(){    int fd;    int cfd;    int request;    struct pollfd pd;    request = 0;    fd = udp_connect("localhost", p_ser_info->port);    while (1) {	pd.fd = POLLIN;	if (poll(&pd, 1, 5000) < 0)	    continue;        sem_wait(&(ptr->sem));        printf("ptr->num = %d ptr->idle = %d\n", ptr->num, ptr->idle);        if (ptr->idle > MAX_IDLE) {            ptr->num--;            ptr->idle--;            sem_post(&(ptr->sem));            break;        }        sem_post(&(ptr->sem));	if ((pd.revents & POLLIN) == 0)	    continue;        cfd = accept(listen_fd, NULL, NULL);        if (cfd < 0)            continue;        sem_wait(&(ptr->sem));        ptr->idle--;        if (ptr->idle < MIN_IDLE)            sem_post(&(ptr->sem_notify));        sem_post(&(ptr->sem));        p_han_info->main_handler(cfd);        close(cfd);        sem_wait(&(ptr->sem));        ptr->idle++;        sem_post(&(ptr->sem));        request++;        if (request >= MAX_REQUEST) {            sem_wait(&(ptr->sem));            ptr->num--;            ptr->idle--;            if (ptr->idle < MIN_IDLE)                sem_post(&(ptr->sem_notify));            sem_post(&(ptr->sem));            break;        }    }    exit(0);    return 0;}/* * 服务线程。 */void *sleep_pth(void *args){    int cfd;    while (1) {        pthread_mutex_lock(&mutex_cfd);        // 队列中已有未处理的连接时,直接处理,无需pthread_cond_wait。        while (cfd_array[out] == 0) {            pthread_cond_wait(&cond_cfd, &mutex_cfd);        }        cfd = cfd_array[out];        cfd_array[out] = 0;        out = (out + 1) % thread_num;        pthread_mutex_unlock(&mutex_cfd);        p_han_info->main_handler(cfd);        close(cfd);        sem_wait(&(ptr->sem));        ptr->idle++;        sem_post(&(ptr->sem));        sem_post(&sem_idle);    }    return NULL;}/* * 监控进程。 */int listen_pth(){    int fd;    int cfd;    int i;    int request;    struct pollfd pd;    request = 0;    fd = udp_connect("localhost", p_ser_info->port);    while (1) {        // 无空闲线程时,阻塞在这里。        sem_wait(&sem_idle);	pd.fd = listen_fd;	pd.events = POLLIN;        // 会被父进程发送的SIGTERM信号中断。	if (poll(&pd, 1, 5000) <= 0) {            sem_post(&sem_idle);            continue;        }        sem_wait(&(ptr->sem));        printf("ptr->num = %d ptr->idle = %d\n", ptr->num, ptr->idle);        if (ptr->idle > MAX_IDLE * thread_num) {            ptr->num --;            ptr->idle -= thread_num;            sem_post(&(ptr->sem));            sem_post(&sem_idle);            break;        }        sem_post(&(ptr->sem));	if ((pd.revents & POLLIN) == 0) {            sem_post(&sem_idle);            continue;        }        cfd = accept(listen_fd, NULL, NULL);        if (cfd < 0) {            sem_post(&sem_idle);            continue;        }        sem_wait(&(ptr->sem));        ptr->idle--;        if (ptr->idle < MIN_IDLE)            sem_post(&(ptr->sem_notify));        sem_post(&(ptr->sem));        // 唤醒一个空闲线程。        pthread_mutex_lock(&mutex_cfd);        cfd_array[in] = cfd;        in = (in + 1) % thread_num;        pthread_mutex_unlock(&mutex_cfd);        // 服务线程不在pthread_cond_wait时,此处唤醒信号被丢失。        pthread_cond_signal(&cond_cfd);        request++;        if (request >= MAX_REQUEST) {            sem_wait(&(ptr->sem));            ptr->num --;            ptr->idle -= thread_num;            if (ptr->idle < MIN_IDLE)                sem_post(&(ptr->sem_notify));            sem_post(&(ptr->sem));            break;        }    }    // 有线程正在服务时,阻塞在这里。    for (i = 0; i < thread_num; i++) {        sem_wait(&sem_idle);    }    return 0;}/* * 创建线程池 */int pth_func(){    int i;    int ret;    pthread_t id;    sem_init(&sem_idle, 0, thread_num);    for (i = 0; i < thread_num; i++) {        ret = pthread_create(&id, NULL, sleep_pth, NULL);        if (0 != ret) {            continue;        }    }    // 主线程监听    listen_pth();    exit(0);    return 0; }/* * 创建子进程,返回创建失败的进程个数。 */int create_child(int num){    int i;    pid_t pid;    for (i = 0; num > 0; i++) {        sem_wait(&(ptr->sem));        if (ptr->num >= max_child) {            sem_post(&(ptr->sem));            break;        }        ptr->num++;        ptr->idle += thread_num;        sem_post(&(ptr->sem));        pid = fork();        if (pid < 0) {            sem_wait(&(ptr->sem));            ptr->num--;            ptr->idle -= thread_num;            sem_post(&(ptr->sem));            return num;        }        if (pid == 0) {            child_set_signal();            if (thread_num > 1) {                pth_func();            } else {                child();            }        } else {            num--;        }    }    return num;}/* * 向子进程发送SIGUSR1号,杀子进程。 */void sigexit_handler(int sig){    printf("pid = %d receive sig = %d", getpid(), sig);    close(listen_fd);    kill(0, SIGUSR1);    if (U_DOMAIN == p_ser_info->type || BOTH == p_ser_info->type)        unlink(p_ser_info->path);    if (p_han_info->exit_handler)        p_han_info->exit_handler();    exit(0);}/* * 回收子进程。 */void sigchld_handler(int sig){    int status;    pid_t pid;    // 多个子进程退出,可能只收到1个SIGCHLD信号。    while ((pid = waitpid(-1, &status, WNOHANG) > 0)) {    }}/* * 设置prefork主进程的信号处理函数。 */void set_signal(){    struct sigaction act;    sigemptyset(&act.sa_mask);    act.sa_flags = 0;    act.sa_handler = sigexit_handler;    sigaction(SIGILL, &act, NULL);    sigaction(SIGABRT, &act, NULL);    sigaction(SIGBUS, &act, NULL);    sigaction(SIGFPE, &act, NULL);    sigaction(SIGSEGV, &act, NULL);    sigaction(SIGTERM, &act, NULL);    sigaction(SIGQUIT, &act, NULL);    sigaction(SIGINT, &act, NULL);    act.sa_handler = sigchld_handler;    sigaction(SIGCHLD, &act, NULL);}/* * 分配共享内存(进程表) */int set_ipc(){    int shmid;    shmid = shmget(IPC_PRIVATE, sizeof(struct ipc_mem),             0600 | IPC_CREAT);    if (shmid < 0) {        return -1;    }    ptr = shmat(shmid, NULL, 0);    if (ptr == NULL) {        return -1;    }    memset(ptr, 0, sizeof(struct ipc_mem));    sem_init(&(ptr->sem), 1, 1);    sem_init(&(ptr->sem_notify), 1, 0);    shmctl(shmid, IPC_RMID, 0); // 这样所有的进程退出后将自动清空    return 0;}/* * 启动预分配、线程池网络服务。 */int start_server(struct server_info *ser_info, struct handler_info *han_info){    int idle;    set_signal();    p_han_info = han_info;    p_ser_info = ser_info;    if (p_han_info->init_handler)        p_han_info->init_handler();    if (I_NET == ser_info->type) {        listen_fd = tcp_listen(ser_info->port, 500, NULL);    } else if (U_DOMAIN == ser_info->type) {        listen_fd = stream_listen(ser_info->path, 500);    } else if (BOTH == ser_info->type) {        printf("unimpliment type\n");        return -1;    }    if (-1 == listen_fd)        return -1;    thread_num = ser_info->thread_num;    if (thread_num > MAX_THREAD)        thread_num = MAX_THREAD;    if (thread_num < 0)        thread_num = 1;    if (thread_num > 1) {        max_child = GINT(ser_info->max_service, thread_num);    } else {        max_child = ser_info->max_service;    }    if (max_child > MAX_PROC)        max_child = MAX_PROC;    set_ipc();    create_child(MIN_IDLE);    while (1) {        sem_wait(&(ptr->sem_notify));        sem_wait(&(ptr->sem));        idle = ptr->idle;        sem_post(&(ptr->sem));        if (idle < MIN_IDLE * thread_num)            create_child(MIN_IDLE);    }    return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -