/* server.c (code-viewer label: "font size:") */
#include "server.h"
#include "core.h"
#include "thread.h"
#include "timer.h"
#include "buffer.h"
#include "queue.h"
#include "log.h"
#include "inet_base.h"

/*
 * Guard macros: log and return from the enclosing function when the
 * server pointer is NULL. SERVER_CHECK_RET is for functions returning a
 * value, SERVER_CHECK for void functions.
 */
#define SERVER_CHECK_RET(sv, ret)                           \
    do {                                                    \
        if ((sv) == NULL) {                                 \
            _ERROR_LOG("FATAL:server pointer is null");     \
            return (ret);                                   \
        }                                                   \
    } while (0)

#define SERVER_CHECK(sv)                                    \
    do {                                                    \
        if ((sv) == NULL) {                                 \
            _ERROR_LOG("FATAL:server pointer is null");     \
            return;                                         \
        }                                                   \
    } while (0)

/*
 * Initialize SERVER struct.
 *
 * Allocates a zeroed SERVER, wires its method pointers to the sv_*
 * functions in this file and creates its timer and event object.
 *
 * Returns the new server, or NULL when the allocation fails.
 * The results of timer_init() and ev_init() are stored unchecked; the
 * functions that use them (sv_init, sv_run, sv_clean) test for NULL.
 */
SERVER *server_init()
{
    SERVER *sv = calloc(1, sizeof *sv);

    if (sv == NULL) {
        _ERROR_LOG("FATAL:calloc NEW SERVER failed, %s", strerror(errno));
        return NULL;
    }

    sv->init          = sv_init;
    sv->run           = sv_run;
    sv->start         = sv_start;
    sv->stop          = sv_stop;
    sv->event_handler = sv_event_handler;
    sv->addconn       = sv_addconn;
    sv->terminate     = sv_terminate;
    sv->clean         = sv_clean;
    sv->timer         = timer_init();
    sv->sv_event      = ev_init();

    return sv;
}

/*
 * Handle event call.
 *
 * event_fd: descriptor the event fired on.
 * event:    event flags (not inspected here).
 * arg:      the SERVER registered with the event in sv_init().
 *
 * For the TCP listening socket the pending connection is accepted and
 * handed to sv->addconn(). For the UDP socket one datagram is read to
 * learn the peer address, then a new UDP socket is created, passed through
 * SOCK_CONN (peer address) and SOCK_BIND (local address) and handed to
 * sv->addconn().
 */
void sv_event_handler(int event_fd, short event, void *arg)
{
    SERVER *sv = (SERVER *)arg;
    struct sockaddr_in sa, rsa;
    socklen_t sa_len = sizeof(struct sockaddr_in);
    socklen_t rsa_len = sizeof(struct sockaddr_in);
    char buf[DGRAM_SIZE];
    int fd, newfd, ret = 0;

    (void)event;
    SERVER_CHECK(sv);

    if (sv->tcpfd == event_fd && (sv->sock_t & TCP_T)) {
        fd = accept(event_fd, (struct sockaddr *)&sa, &sa_len);
        if (fd == -1) {
            ERROR_LOG(sv->log, "Accept new connetion failed, %s", strerror(errno));
            return;
        }
        DEBUG_LOG(sv->log, "Accept new TCP connection(%d)", fd);
        sv->addconn(sv, fd, sa);
        return;
    }

    if (sv->udpfd == event_fd && (sv->sock_t & UDP_T)) {
        /* NOTE(review): the payload read into buf is not forwarded anywhere
         * in this function — confirm that dropping it is intended. */
        if (recvfrom(event_fd, buf, DGRAM_SIZE, 0,
                     (struct sockaddr *)&rsa, &rsa_len) < 0) {
            ERROR_LOG(sv->log, "Accept new UDP connection failed, %s", strerror(errno));
            return;
        }

        newfd = socket(sv->domain, UDP_T, 0);
        if (newfd < 0) {
            /* Do not carry on with an invalid descriptor. */
            ERROR_LOG(sv->log, "Create new socket failed, %s", strerror(errno));
            return;
        }

        SOCK_CONN(newfd, rsa, ret);
        if (ret < 0) {
            close(newfd);   /* nobody else owns the new socket yet */
            return;
        }

        SOCK_BIND(newfd, sv->lsa, ret);
        if (ret < 0) {
            close(newfd);
            return;
        }

        sv->addconn(sv, newfd, rsa);
        DEBUG_LOG(sv->log, "Accept new UDP connection(%d)", newfd);
        return;
    }
}

/*
 * Initialize sv.
 *
 * Creates the event base, the thread pool (sv->max_threads entries, each
 * started with pth_run), the listening sockets selected by sv->sock_t, and
 * registers sv->sv_event with the event base.
 *
 * Returns 0 on success, -1 on failure. A thread that cannot be created is
 * logged and skipped; it does not fail the whole initialization.
 */
int sv_init(SERVER *sv)
{
    int i = 0;
    int ret = 0;
    pthread_t thread_id;

    SERVER_CHECK_RET(sv, -1);

    /* Both objects are dereferenced below; fail early instead of crashing. */
    if (sv->sv_event == NULL) {
        ERROR_LOG(sv->log, "Server event is not initialized");
        return -1;
    }
    sv->eventbase = evbase_init();
    if (sv->eventbase == NULL) {
        ERROR_LOG(sv->log, "Initialize event base failed");
        return -1;
    }

    sv->threads = (THREAD **)calloc(sv->max_threads, sizeof(THREAD *));
    if (sv->threads == NULL) {
        ERROR_LOG(sv->log, "Initialize thread pool failed");
        return -1;
    }

    /* Initialize threads pool */
    for (i = 0; i < sv->max_threads; i++) {
        sv->threads[i] = thread_init();
        if (sv->threads[i] == NULL) {
            ERROR_LOG(sv->log, "Initialize thread failed, %s", strerror(errno));
            continue;
        }

        /* base setting */
        sv->threads[i]->sv    = sv;
        sv->threads[i]->index = i;
        sv->threads[i]->log   = sv->log;

        /* create pthread and run */
        /* NOTE(review): the id returned here is kept only in a local, while
         * sv_terminate() joins on pth->thread_id — that field must be set
         * elsewhere (presumably by pth_run); confirm. */
        if (pthread_create(&thread_id, NULL, &pth_run,
                           (void *)(sv->threads[i])) != 0) {
            sv->threads[i]->clean(&(sv->threads[i]));
            ERROR_LOG(sv->log, "Create thread[%d] failed ", i);
            continue;
        }
        DEBUG_LOG(sv->log, "Created thread[0x%08X] %d of %d",
                  (int)thread_id, i, sv->max_threads);
    }

    /* Setting sa */
    SA_SET(sv->lsa, sv->domain, sv->ip, sv->port);

    /* NOTE(review): TCP and UDP both configure the same sv->sv_event, so
     * when both bits are set in sv->sock_t the second set() call follows the
     * first on one event object — confirm this is supported by the EVENT
     * implementation. */

    /* Setting TCP */
    if (sv->sock_t & TCP_T) {
        sv->tcpfd = socket(sv->domain, TCP_T, 0);
        if (sv->tcpfd < 0) {
            ERROR_LOG(sv->log, "Create TCP socket failed, %s", strerror(errno));
            return -1;
        }
        ret = inet_init(sv->tcpfd, &(sv->lsa), sv->backlog,
                        (S_SOCK_BIND | S_SOCK_LISTEN | S_SOCK_NONBLOCK));
        if (ret != 0) {
            ERROR_LOG(sv->log, "Initialize TCP server failed");
            return -1;
        }
        DEBUG_LOG(sv->log, "Initialized tcpfd:%d", sv->tcpfd);
        sv->sv_event->set(sv->sv_event, sv->tcpfd, E_READ | E_PERSIST,
                          (void *)sv, sv->event_handler);
    }

    /* Setting UDP */
    if (sv->sock_t & UDP_T) {
        sv->udpfd = socket(sv->domain, UDP_T, 0);
        if (sv->udpfd < 0) {
            ERROR_LOG(sv->log, "Create UDP socket failed, %s", strerror(errno));
            return -1;
        }
        ret = inet_init(sv->udpfd, &(sv->lsa), sv->backlog,
                        (S_SOCK_BIND | S_SOCK_NONBLOCK));
        if (ret != 0) {
            ERROR_LOG(sv->log, "Initialize UDP server failed");
            return -1;
        }
        DEBUG_LOG(sv->log, "Initialized udpfd:%d", sv->udpfd);
        sv->sv_event->set(sv->sv_event, sv->udpfd, E_READ | E_PERSIST,
                          (void *)sv, sv->event_handler);
    }

    /* Setting event */
    sv->eventbase->add(sv->eventbase, sv->sv_event);

    return 0;
}

/*
 * Run sv.
 *
 * Loops while sv->running_status is non-zero: runs one pass of the event
 * base, lets the timer check the heartbeat interval
 * (sv->heartbeat_interval * 1000000 is passed to timer->check), then sleeps
 * sv->sleep_usec microseconds.
 */
void sv_run(SERVER *sv)
{
    SERVER_CHECK(sv);

    if (sv->heartbeat_handler && sv->timer) {
        sv->timer->callback = sv->heartbeat_handler;
    }

    while (sv->running_status) {
        sv->eventbase->loop(sv->eventbase, 0, NULL);
        if (sv->timer) {
            sv->timer->check(sv->timer, sv->heartbeat_interval * 1000000);
        }
        usleep(sv->sleep_usec);
    }
}

/*
 * Add new connection to threads.
 *
 * The worker is chosen as sockfd % sv->max_threads.
 *
 * Returns 0 when the thread accepted the connection, -1 otherwise. When the
 * connection is rejected before reaching a worker (limit reached, no usable
 * thread) the socket is closed here.
 */
int sv_addconn(SERVER *sv, int sockfd, struct sockaddr_in sa)
{
    int index;
    THREAD *pth;

    SERVER_CHECK_RET(sv, -1);

    if (sv->running_connections >= sv->max_connections) {
        ERROR_LOG(sv->log, "Connection is full");
        shutdown(sockfd, SHUT_RDWR);
        close(sockfd);
        return -1;
    }

    /* Protect the modulo below from division by zero and from a negative
     * index. */
    if (sockfd < 0 || sv->threads == NULL || sv->max_threads <= 0) {
        ERROR_LOG(sv->log, "No thread available for new connection");
        if (sockfd >= 0) {
            close(sockfd);
        }
        return -1;
    }

    index = sockfd % sv->max_threads;
    pth = (THREAD *)sv->threads[index];
    if (pth == NULL) {
        ERROR_LOG(sv->log, "Thread[%u] is NULL", index);
        close(sockfd);      /* no worker will ever own this socket */
        return -1;
    }

    if (pth->addconn(pth, sockfd, sa) == 0) {
        sv->running_connections++;
        DEBUG_LOG(sv->log, "Added new SESSION[%d] total %ld ",
                  sockfd, sv->running_connections);
        return 0;
    }

    /* NOTE(review): sockfd is left open here; whether pth->addconn() closes
     * it on failure is not visible from this file — confirm. */
    return -1;
}

/*
 * Start sv: mark the server as running and enter sv->run(), which returns
 * once running_status is cleared.
 */
void sv_start(SERVER *sv)
{
    SERVER_CHECK(sv);
    sv->running_status = 1;
    sv->run(sv);
}

/*
 * Stop sv: clear running_status, terminate sockets and threads, then free
 * the server through sv->clean(). The pointer passed in must not be used
 * by the caller afterwards.
 */
void sv_stop(SERVER *sv)
{
    SERVER_CHECK(sv);
    sv->running_status = 0;
    DEBUG_LOG(sv->log, "Terminating Server Now");
    sv->terminate(sv);
    sv->clean(&sv);
}

/*
 * Terminate sv: close the listening sockets and stop every worker thread.
 * Does not free anything; see sv_clean().
 */
void sv_terminate(SERVER *sv)
{
    int i = 0;
    THREAD *pth = NULL;

    SERVER_CHECK(sv);
    sv->running_status = 0;

    /* Close server TCP socket */
    if (sv->tcpfd > 0) {
        shutdown(sv->tcpfd, SHUT_RDWR);
        close(sv->tcpfd);
        /* Reset the TCP descriptor; zeroing udpfd here would leave the UDP
         * socket open. */
        sv->tcpfd = 0;
    }

    /* Close server UDP socket */
    if (sv->udpfd > 0) {
        shutdown(sv->udpfd, SHUT_RDWR);
        close(sv->udpfd);
        sv->udpfd = 0;
    }

    /* Close threads */
    if (sv->threads == NULL) {
        return;     /* sv_init() failed or was never called */
    }
    for (i = 0; i < sv->max_threads; i++) {
        pth = sv->threads[i];
        if (pth == NULL) {
            continue;
        }
        pth->terminate(pth);
        if (pthread_join(pth->thread_id, NULL) == 0) {
            DEBUG_LOG(sv->log, "Terminated thread[%u]", pth->thread_id);
        }
        sv->running_threads--;
    }
}

/*
 * Clean sv: release the thread pool, event base, timer, event object and
 * the SERVER itself, then set *sv to NULL. Safe to call with NULL or with a
 * partially initialized server.
 */
void sv_clean(SERVER **sv)
{
    SERVER *server = NULL;
    THREAD *pth = NULL;
    int i = 0;

    if (sv == NULL || *sv == NULL) {
        return;
    }
    server = *sv;

    /* Clean threads */
    if (server->threads) {
        for (i = 0; i < server->max_threads; i++) {
            pth = server->threads[i];
            if (pth != NULL) {
                pth->clean(&pth);
                server->threads[i] = NULL;
            }
        }
        free(server->threads);
        server->threads = NULL;
    }

    /* Clean event base */
    if (server->eventbase) {
        server->eventbase->clean(&(server->eventbase));
    }

    /* Clean timer (timer_init() may have failed) */
    if (server->timer) {
        server->timer->clean(&(server->timer));
    }

    /* Clean event (ev_init() may have failed) */
    if (server->sv_event) {
        server->sv_event->clean(&(server->sv_event));
    }

    /* Clean self */
    free(server);
    *sv = NULL;
}
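/*
 * Code-viewer keyboard shortcuts (not part of the program):
 *   Copy code            Ctrl + C
 *   Search code          Ctrl + F
 *   Full-screen mode     F11
 *   Toggle theme         Ctrl + Shift + D
 *   Show shortcuts       ?
 *   Increase font size   Ctrl + =
 *   Decrease font size   Ctrl + -
 */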