⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 server.c

📁 这是基于C语言开发的分布式搜索源代码
💻 C
字号:
#include "server.h"
#include "core.h"
#include "thread.h"
#include "timer.h"
#include "buffer.h"
#include "queue.h"
#include "log.h"
#include "inet_base.h"

/*
 * Guard macros: log and bail out of the calling function when the SERVER
 * pointer is NULL. Wrapped in do { } while (0) so they behave as a single
 * statement wherever they are used.
 */
#define SERVER_CHECK_RET(sv, ret) \
do { \
	if ((sv) == NULL) \
	{ \
		_ERROR_LOG("FATAL:server pointer is null"); \
		return ret; \
	} \
} while (0)

#define SERVER_CHECK(sv) \
do { \
	if ((sv) == NULL) \
	{ \
		_ERROR_LOG("FATAL:server pointer is null"); \
		return ; \
	} \
} while (0)

/*
 * Allocate a zeroed SERVER, wire up its method table and create its timer
 * and event objects.
 *
 * Returns the new SERVER, or NULL when the allocation fails.
 * The results of timer_init() and ev_init() are stored as returned;
 * sv_init() rejects a NULL sv_event and sv_run()/sv_clean() tolerate a
 * NULL timer.
 */
SERVER *server_init()
{
	SERVER *sv = (SERVER *)calloc(1, sizeof(SERVER));

	if (sv == NULL)
	{
		_ERROR_LOG("FATAL:calloc NEW SERVER failed, %s", strerror(errno));
		return NULL;
	}

	sv->init                = sv_init;
	sv->run                 = sv_run;
	sv->start               = sv_start;
	sv->stop                = sv_stop;
	sv->event_handler       = sv_event_handler;
	sv->addconn             = sv_addconn;
	sv->terminate           = sv_terminate;
	sv->clean               = sv_clean;
	sv->timer               = timer_init();
	sv->sv_event            = ev_init();
	return sv;
}

/*
 * Event callback registered for the listening descriptors.
 *
 * event_fd  descriptor that became readable
 * event     event mask (unused here)
 * arg       the SERVER that registered the callback
 *
 * TCP: accepts the pending connection and hands it to sv->addconn().
 * UDP: reads one datagram to learn the peer address, creates a new socket
 * connected to that peer and hands the new socket to sv->addconn().
 * NOTE(review): the datagram read into buf is not forwarded to the new
 * connection - confirm that dropping it is intended.
 */
void sv_event_handler(int event_fd, short event, void *arg)
{
	struct sockaddr_in sa, rsa;
	int fd, newfd, ret = 0;
	char buf[DGRAM_SIZE];
	SERVER *sv = (SERVER *)arg;
	socklen_t sa_len = sizeof(struct sockaddr_in);
	socklen_t rsa_len = sizeof(struct sockaddr_in);

	(void)event;
	SERVER_CHECK(sv);

	if (sv->tcpfd == event_fd && (sv->sock_t & TCP_T))
	{
		if ((fd = accept(event_fd, (struct sockaddr *)&sa, &sa_len)) == -1)
		{
			ERROR_LOG(sv->log, "Accept new connetion failed, %s",
				strerror(errno));
			return ;
		}
		/* cast so the argument matches the %ld conversion */
		DEBUG_LOG(sv->log, "Accept new TCP connection(%ld)", (long)fd);
		sv->addconn(sv, fd, sa);
		return ;
	}

	if (sv->udpfd == event_fd && (sv->sock_t & UDP_T))
	{
		if (recvfrom(event_fd, buf, DGRAM_SIZE, 0,
				(struct sockaddr *)&rsa, &rsa_len) < 0)
		{
			ERROR_LOG(sv->log, "Accept new UDP connection failed, %s",
				strerror(errno));
			return ;
		}

		newfd = socket(sv->domain, UDP_T, 0);
		if (newfd < 0)
		{
			ERROR_LOG(sv->log, "Create new socket failed, %s",
				strerror(errno));
			/* nothing usable was created; do not continue with a bad fd */
			return ;
		}

		SOCK_CONN(newfd, rsa, ret);
		if (ret < 0)
		{
			/* the socket is still ours here; release it */
			close(newfd);
			return ;
		}

		SOCK_BIND(newfd, sv->lsa, ret);
		if (ret < 0)
		{
			close(newfd);
			return ;
		}

		sv->addconn(sv, newfd, rsa);
		DEBUG_LOG(sv->log, "Accept new UDP connection(%ld)", (long)newfd);
		return ;
	}
}

/*
 * Prepare a SERVER for running: create the event base, start the worker
 * threads, open the listening sockets selected by sv->sock_t and register
 * the server event with the event base.
 *
 * Returns 0 on success, -1 on failure. Resources created before a failure
 * are left in the SERVER; sv_terminate()/sv_clean() release them.
 */
int  sv_init(SERVER *sv)
{
	int i = 0;
	int ret = 0;
	pthread_t thread_id;

	SERVER_CHECK_RET(sv, -1);

	/* sv_event is dereferenced below; it comes from ev_init() unchecked */
	if (sv->sv_event == NULL)
	{
		ERROR_LOG(sv->log, "Initialize server event failed");
		return -1;
	}

	sv->eventbase 	= evbase_init();
	if (sv->eventbase == NULL)
	{
		ERROR_LOG(sv->log, "Initialize event base failed");
		return -1;
	}

	/* sv_addconn() computes sockfd % max_threads, so zero is not allowed */
	if (sv->max_threads <= 0)
	{
		ERROR_LOG(sv->log,  "Initialize thread pool failed");
		return -1;
	}

	sv->threads 	= (THREAD **)calloc(sv->max_threads, sizeof(THREAD *));
	if (sv->threads == NULL)
	{
		ERROR_LOG(sv->log,  "Initialize thread pool failed");
		return -1;
	}

	/* Initialize threads pool */
	for (i = 0; i < sv->max_threads; i++)
	{
		sv->threads[i] = thread_init();
		if (sv->threads[i] == NULL)
		{
			ERROR_LOG(sv->log, "Initialize thread failed, %s", strerror(errno));
			continue;
		}

		/* base setting */
		sv->threads[i]->sv = sv;
		sv->threads[i]->index = i;
		sv->threads[i]->log = sv->log;

		/* create pthread and run */
		/* NOTE(review): the id returned here is not stored in the THREAD;
		 * sv_terminate() joins threads[i]->thread_id, so pth_run presumably
		 * records it - confirm. */
		if (pthread_create(&thread_id, NULL,
				&pth_run, (void *)(sv->threads[i])) != 0)
		{
			sv->threads[i]->clean(&(sv->threads[i]));
			ERROR_LOG(sv->log, "Create thread[%d] failed ", i);
			continue;
		}

		DEBUG_LOG(sv->log, "Created thread[0x%08X] %d of %d",
			(unsigned int)thread_id, i, sv->max_threads);
	}

	/* Setting sa */
	SA_SET(sv->lsa, sv->domain, sv->ip, sv->port);

	/* setting TCP */
	if (sv->sock_t & TCP_T)
	{
		sv->tcpfd = socket(sv->domain, TCP_T, 0);
		if (sv->tcpfd < 0)
		{
			ERROR_LOG(sv->log, "Create new socket failed, %s",
				strerror(errno));
			return -1;
		}
		if ((ret = inet_init(sv->tcpfd, &(sv->lsa), sv->backlog,
				(S_SOCK_BIND | S_SOCK_LISTEN | S_SOCK_NONBLOCK))) != 0)
		{
			ERROR_LOG(sv->log, "Initialize TCP server failed");
			return -1;
		}
		DEBUG_LOG(sv->log, "Initialized tcpfd:%d", sv->tcpfd);
		sv->sv_event->set(sv->sv_event, sv->tcpfd,
			E_READ|E_PERSIST, (void *)sv, sv->event_handler);
	}

	/* Setting UDP */
	/* NOTE(review): the same sv_event object is set() again for the UDP
	 * descriptor; when both TCP and UDP are enabled, confirm that set()
	 * supports more than one descriptor per event. */
	if (sv->sock_t & UDP_T)
	{
		sv->udpfd = socket(sv->domain, UDP_T, 0);
		if (sv->udpfd < 0)
		{
			ERROR_LOG(sv->log, "Create new socket failed, %s",
				strerror(errno));
			return -1;
		}
		if ((ret = inet_init(sv->udpfd, &(sv->lsa),
				sv->backlog, (S_SOCK_BIND | S_SOCK_NONBLOCK))) != 0)
		{
			ERROR_LOG(sv->log, "Initialize UDP server failed");
			return -1;
		}
		DEBUG_LOG(sv->log, "Initialized udpfd:%d", sv->udpfd);
		sv->sv_event->set(sv->sv_event, sv->udpfd,
			E_READ|E_PERSIST, (void *)sv, sv->event_handler);
	}

	/* Setting event */
	sv->eventbase->add(sv->eventbase, sv->sv_event);
	return 0;
}

/*
 * Main loop: while sv->running_status is non-zero, run one pass of the
 * event base, let the timer check the heartbeat interval, then sleep for
 * sv->sleep_usec microseconds.
 *
 * When both a heartbeat handler and a timer exist, the handler is installed
 * as the timer callback before the loop starts.
 */
void sv_run(SERVER *sv )
{
	SERVER_CHECK(sv);

	if (sv->heartbeat_handler && sv->timer)
	{
		sv->timer->callback = sv->heartbeat_handler;
	}

	while (sv->running_status)
	{
		sv->eventbase->loop(sv->eventbase, 0, NULL);
		/* presumably heartbeat_interval is in seconds and check() takes
		 * microseconds - TODO confirm */
		if (sv->timer)
		{
			sv->timer->check(sv->timer, sv->heartbeat_interval * 1000000);
		}
		usleep(sv->sleep_usec);
	}
}

/*
 * Hand a new connection to a worker thread, chosen as
 * sockfd % sv->max_threads.
 *
 * Returns 0 when the thread accepted the connection, -1 otherwise.
 * The descriptor is closed here when the server is full or when no thread
 * is available to take it.
 */
int  sv_addconn(SERVER *sv, int sockfd, struct sockaddr_in sa)
{
	int index ;
	THREAD *pth;

	SERVER_CHECK_RET(sv, -1);

	if (sv->running_connections >= sv->max_connections)
	{
		ERROR_LOG(sv->log, "Connection is full");
		shutdown(sockfd, SHUT_RDWR);
		close(sockfd);
		return -1;
	}

	/* no pool, or a negative descriptor, would make the index below invalid */
	if (sv->threads == NULL || sv->max_threads <= 0 || sockfd < 0)
	{
		ERROR_LOG(sv->log, "Initialize thread pool failed");
		if (sockfd >= 0)
		{
			close(sockfd);
		}
		return -1;
	}

	index = sockfd % sv->max_threads;
	pth  = (THREAD *) sv->threads[index];
	if (pth == NULL)
	{
		ERROR_LOG(sv->log, "Thread[%u] is NULL", (unsigned int)index);
		/* nobody else owns this descriptor; close it as the
		 * "Connection is full" path does */
		shutdown(sockfd, SHUT_RDWR);
		close(sockfd);
		return -1;
	}

	if ((pth->addconn(pth, sockfd, sa)) == 0)
	{
		sv->running_connections++;
		DEBUG_LOG(sv->log, "Added new SESSION[%ld] total %ld ",
			(long)sockfd, (long)sv->running_connections);
		return 0;
	}

	/* NOTE(review): descriptor ownership after a failed pth->addconn() is
	 * not visible here, so it is left untouched - confirm it is closed. */
	return -1;
}

/*
 * Mark the server as running and enter the main loop via sv->run().
 * Returns when the loop in sv->run() ends.
 */
void sv_start(SERVER *sv )
{
	SERVER_CHECK(sv);
	sv->running_status = 1;
	sv->run(sv);
}

/*
 * Stop the server: clear the running flag, terminate sockets and threads,
 * then free the SERVER.
 *
 * sv->clean() receives the address of the local parameter, so only that
 * local copy is reset to NULL; the caller's pointer refers to freed memory
 * afterwards and must not be used.
 */
void sv_stop(SERVER *sv)
{
	SERVER_CHECK(sv);
	sv->running_status = 0;
	DEBUG_LOG(sv->log, "Terminating Server Now");
	sv->terminate(sv);
	sv->clean(&sv);
}

/*
 * Close the listening sockets and terminate and join every worker thread.
 * Memory is not released here; see sv_clean().
 */
void sv_terminate(SERVER * sv)
{
	int i = 0;
	THREAD *pth = NULL;

	SERVER_CHECK(sv);
	sv->running_status = 0;

	/* Close server TCP socket */
	if (sv->tcpfd > 0)
	{
		shutdown(sv->tcpfd, SHUT_RDWR);
		close(sv->tcpfd);
		/* reset the TCP descriptor; resetting udpfd here would skip the
		 * UDP close below and leak that socket */
		sv->tcpfd = 0;
	}

	/* Close server UDP socket */
	if (sv->udpfd > 0)
	{
		shutdown(sv->udpfd, SHUT_RDWR);
		close(sv->udpfd);
		sv->udpfd = 0;
	}

	/* Close threads */
	if (sv->threads == NULL)
	{
		return ;
	}
	for (i = 0; i < sv->max_threads; i++)
	{
		if ((pth = sv->threads[i]) != NULL)
		{
			pth->terminate(pth);
			if (pthread_join(pth->thread_id, NULL) == 0)
			{
				DEBUG_LOG(sv->log, "Terminated thread[%u]",
					(unsigned int)pth->thread_id);
			}
			sv->running_threads--;
		}
	}
}

/*
 * Release everything owned by *sv - thread objects, the thread table, the
 * event base, the timer and the server event - then free the SERVER itself
 * and set *sv to NULL. Does nothing when *sv is already NULL.
 */
void sv_clean(SERVER **sv)
{
	THREAD *pth = NULL;
	int i = 0;

	if (sv == NULL || (*sv) == NULL)
	{
		return ;
	}

	/* Clean threads */
	if ((*sv)->threads)
	{
		for (i = 0; i < (*sv)->max_threads; i++)
		{
			if ((pth = (*sv)->threads[i]) != NULL)
			{
				pth->clean(&pth);
				(*sv)->threads[i] = NULL;
			}
		}
		free((*sv)->threads);
		(*sv)->threads = NULL;
	}

	/* Clean event base */
	if ((*sv)->eventbase)
	{
		(*sv)->eventbase->clean(&((*sv)->eventbase));
	}

	/* Clean Timer (may be NULL when timer_init() failed) */
	if ((*sv)->timer)
	{
		(*sv)->timer->clean(&((*sv)->timer));
	}

	/* Clean event (may be NULL when ev_init() failed) */
	if ((*sv)->sv_event)
	{
		(*sv)->sv_event->clean(&((*sv)->sv_event));
	}

	/* Clean self */
	free((*sv));
	(*sv) = NULL;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -