⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 thread.c

📁 这是基于C语言开发的分布式搜索源代码
💻 C
字号:
#include "thread.h"#include "session.h"#include "buffer.h"#include "queue.h"#include "timer.h"#define THREAD_CHECK_RET(pth, ret)\{ \	if(pth == NULL) \	{ \		_ERROR_LOG("FATAL:Thread is NULL"); \		return ret; \	} \}#define THREAD_CHECK(pth)\{ \        if(pth == NULL)\        { \                _ERROR_LOG("FATAL:Thread is NULL");\                return;\        }\}/* initialize THREAD struct */THREAD *thread_init(){	THREAD *pth = (THREAD *)calloc(1, sizeof(THREAD));	if(pth == NULL)        {                       _ERROR_LOG("ERROR:Calloc new Thread failed, %s", strerror(errno));                  return NULL;        }		pth->event_handler	= pth_event_handler;        pth->run                = pth_run;        pth->addconn            = pth_addconn;        pth->add_session        = pth_add_session;        pth->terminate_session	= pth_terminate_session;        pth->state_conns        = pth_state_conns;        pth->terminate       	= pth_terminate;        pth->clean       	= pth_clean;	pth->running_status     = 1;        pth->message_queue      = queue_init();        //pth->eventbase          = (struct event_base *)ev_init();        pth->eventbase          = evbase_init();        pth->timer              = timer_init();        pthread_mutex_init(&pth->mutex, NULL);	return pth;}/* THREAD event handler */void pth_event_handler(int event_fd, short event, void *arg){	SESSION *sess = NULL;	short  flags = event;	THREAD *pth = (THREAD *)arg;	if(pth && pth->sessions && (sess = pth->sessions[event_fd]) )	{				if(event_fd != sess->fd)		{			ERROR_LOG(pth->log, "event file descriptor [%d] do not match session fd[%d]",				event_fd, sess->fd);			 return ;		}		else		{			DEBUG_LOG(sess->log, "EV_HANDLER:%d", event);		}		if( flags & E_READ)		{				DEBUG_LOG(pth->log, "E_READ:%d", E_READ);				if(sess->read_handler(sess) != 0) return ;				flags ^= E_READ;		}		if( flags & E_WRITE)		{				DEBUG_LOG(pth->log, "E_WRITE:%d", E_WRITE);				if(sess->write_handler(sess) != 0 ) return ;				flags ^= E_WRITE;		}		if(flags != 0 )		{				ERROR_LOG(pth->log, "UNKOWN EV:%d", flags);				pth->terminate_session(pth, sess);			}	}	return ;}/* Running thread */void	*pth_run(void *arg){	MESSAGE *msg;	THREAD *pth   = (THREAD *)arg;	SESSION *sess = NULL;	uint64_t n = 0;	THREAD_CHECK_RET(pth, NULL);	/* Running */	pth->thread_id = pthread_self();	while(pth->running_status)	{		/* Check connection state */		if(pth->timer)		{			if((time(NULL) -  pth->timer->last_sec) >=  pth->sv->conn_timeout )			{				DEBUG_LOG(pth->log, "Thread[%08x] Heartbeat %d", pth->thread_id, ++n);				pth->state_conns(pth);				pth->timer->sample(pth->timer);			}		}		/* Event Loop */		//event_base_loop(pth->eventbase, EVLOOP_ONCE | EVLOOP_NONBLOCK);		pth->eventbase->loop(pth->eventbase, 0, NULL);		usleep(pth->sv->sleep_usec);		/* Message Queue */		msg = (MESSAGE *)(pth->message_queue->pop(pth->message_queue));		if(msg)		{			DEBUG_LOG(pth->log, "Handling message[%08x] ID[%d]", msg, msg->msg_id);			sess = (SESSION *)msg->handler;			if(sess && msg->msg_id != MESSAGE_NEW_SESSION 				&& msg->handler != pth->sessions[msg->fd]) goto next;			switch(msg->msg_id)			{				/* NEW connection */				case MESSAGE_NEW_SESSION :					if(msg->handler)					{						pth->add_session(pth, msg->fd,							(*(struct sockaddr_in *)msg->handler));							free(msg->handler);					}					break;					/* Close connection */				case MESSAGE_QUIT :					if(pth->sessions[msg->fd])						pth->terminate_session(pth, sess);					break;				case MESSAGE_INPUT :					break;				case MESSAGE_OUTPUT :					if(sess)sess->write_handler(sess);					break;				case MESSAGE_PACKET :					if(sess)sess->packet_handler(sess);					break;				case MESSAGE_DATA :					if(sess)sess->data_handler(sess);					break;				default:					break;			}			next:			msg->clean(&msg);		}		usleep(pth->sv->sleep_usec);	}	pthread_exit(NULL);	return NULL;}/* add new connection to thread */int     pth_addconn(THREAD *pth, int fd, struct sockaddr_in sa){	MESSAGE *msg = NULL;	THREAD_CHECK_RET(pth, -1);	msg = message_init();	if(msg)	{		DEBUG_LOG(pth->log, "Initialize message[MESSAGE_NEW_SESSION]");		msg->msg_id      = MESSAGE_NEW_SESSION;		msg->fd          = fd;		msg->handler	 = calloc(1, sizeof(struct sockaddr_in));		memcpy(msg->handler, &sa, sizeof(struct sockaddr_in));		pth->message_queue->push(pth->message_queue, (void *)msg);		return 0;	}	return -1;}/* check connection stats */void pth_state_conns(THREAD *pth){	SESSION *sess = NULL;	int i = 0;		THREAD_CHECK(pth);        /* stop and free sessions */	if(pth->sessions)	{		DEBUG_LOG(pth->log, "Checking connections state");			for(i = 0; i < pth->sv->max_connections; i++)		{			if((sess = pth->sessions[i]) != NULL)				sess->state_handler(sess);		}	}}/* add new session to thread */int     pth_add_session(THREAD *pth, int fd, struct sockaddr_in sa){	SESSION *sess = NULL;	THREAD_CHECK_RET(pth, -1);	/* Check sessions and Initialize */	if(pth->sessions == NULL)	{		pth->sessions  = (SESSION **)calloc(pth->sv->max_connections, sizeof(SESSION *));	}	/* Initialize new session */	if(pth->sessions)	{		if(pth->sessions[fd])		{			DEBUG_LOG(pth->log, "SESSION[%d] is exists", fd);			pth->terminate_session(pth, pth->sessions[fd]);		}		DEBUG_LOG(pth->log, "adding NEW SESSION[%d]", fd);		if( (sess = session_init()) != NULL )		{			pth->sessions[fd] = sess;			memcpy(&(sess->sa), &sa, sizeof(struct sockaddr_in));			strcpy(sess->ip, inet_ntoa(sa.sin_addr));			sess->port = ntohs(sa.sin_port);			/* base setting */			sess->pth = pth;			if(sess->set(sess, fd) != 0 )			{				FATAL_LOG(pth->log, "Initialize new session[%d] failed");					pth->terminate_session(pth, sess);				return -1;			}		}		else		{			ERROR_LOG(pth->log, "Initialize new session failed, %s", strerror(errno));			return -1;		}	}		else	{		ERROR_LOG(pth->log, "Initialize sessions failed, %s", strerror(errno));	}	DEBUG_LOG(pth->log, "added NEW SESSION[%d]", fd);	return 0;}/* Terminate session */void pth_terminate_session(THREAD *pth, SESSION *sess){	THREAD_CHECK(pth);		if(sess && sess->fd < pth->sv->max_connections )	{		pth->sessions[sess->fd]	= NULL;		sess->terminate(sess);		sess->clean(&sess);	}}/* Terminate  threads */void  pth_terminate(THREAD *pth){	int i = 0;	THREAD_CHECK(pth);	pth->running_status = 0;	/* Terminate sessions */        for(i = 0; i < pth->sv->max_connections; i++)        {                if(pth->sessions[i] != NULL)			pth->sessions[i]->terminate(pth->sessions[i]);        }}/* clean THREAD child STRUCT */void pth_clean(THREAD **pth){	MESSAGE *msg;	int i = 0;	THREAD_CHECK((*pth));	/* Clean sessions */	if((*pth)->sessions)	{		for(i = 0; i < (*pth)->sv->max_connections; i++)		{			if((*pth)->sessions[i] != NULL)				(*pth)->sessions[i]->clean(&((*pth)->sessions[i]));		}		free((*pth)->sessions);	}	/* Clean message_queue */	if((*pth)->message_queue)	{		while((*pth)->message_queue->total > 0 )			{			msg = (MESSAGE *)(*pth)->message_queue->pop((*pth)->message_queue);			if(msg) msg->clean(&msg);		}		(*pth)->message_queue->clean(&((*pth)->message_queue));	}	/* Clean event base */	if((*pth)->eventbase) (*pth)->eventbase->clean(&((*pth)->eventbase));	/* Clean Timer */	if((*pth)->timer) (*pth)->timer->clean(&((*pth)->timer));	return ;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -