⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 session.c

📁 这是基于C语言开发的分布式搜索源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
#include "session.h"
#include "sbase.h"
#include "buffer.h"
#include "queue.h"

/* sendqueue setting */
//sess->push_message(sess, MESSAGE_OUTPUT);

/*
 * Guard used at the top of SESSION methods that return a value: logs and
 * returns `ret` when `sess` is NULL, and silently returns `ret` when the
 * session is already in CLOSED_STATE.
 *
 * The expansion is deliberately kept as a bare brace block (not
 * do { } while (0)) so that existing call sites keep compiling whether or
 * not they are followed by a semicolon.
 */
#define SESSION_CHECK_RET(sess, ret) \
{ \
	if((sess) == NULL) \
	{ \
		_ERROR_LOG("ERROR:SESSION is NULL"); \
		return ret; \
	} \
	if((sess)->transaction_state == CLOSED_STATE) return ret; \
}

/* Same guard as SESSION_CHECK_RET, for SESSION methods returning void. */
#define SESSION_CHECK(sess) \
{ \
	if((sess) == NULL) \
	{ \
		_ERROR_LOG("ERROR:SESSION is NULL"); \
		return ; \
	} \
	if((sess)->transaction_state == CLOSED_STATE) return ; \
}

/*
 * Allocate a zeroed SESSION, wire up its method table and create its
 * buffers, chunk, queues, timer and event object.
 *
 * Returns the new session, or NULL when the SESSION itself cannot be
 * allocated.
 *
 * NOTE(review): the results of buffer_init()/chunk_init()/queue_init()/
 * timer_init()/ev_init() are not checked here; confirm whether those
 * constructors can return NULL.
 */
SESSION *session_init()
{
	SESSION *sess = calloc(1, sizeof(*sess));

	if(sess == NULL)
	{
		_ERROR_LOG("ERROR:calloc new SESSION failed, %s", strerror(errno));
		return NULL;
	}

	/* Method table */
	sess->set		= sess_set;
	sess->event_handler	= sess_event_handler;
	sess->read_handler	= sess_read_handler;
	sess->write_handler	= sess_write_handler;
	sess->chunk_reader	= sess_chunk_reader;
	sess->packet_reader	= sess_packet_reader;
	sess->packet_handler	= sess_packet_handler;
	sess->data_handler	= sess_data_handler;
	sess->push_message	= sess_push_message;
	sess->push_cache	= sess_push_cache;
	sess->push_chunk	= sess_push_chunk;
	sess->push_file		= sess_push_file;
	sess->oob_handler	= sess_oob_handler;
	sess->state_handler	= sess_state_handler;
	sess->terminate		= sess_terminate;
	sess->clean		= sess_clean;

	/* Owned sub-objects */
	sess->buffer		= buffer_init();
	sess->cache		= buffer_init();
	sess->oob		= buffer_init();
	sess->chunk		= chunk_init();
	sess->send_queue	= queue_init();
	sess->joblist		= queue_init();
	sess->timer		= timer_init();
	sess->s_event		= ev_init();

	return sess;
}

/*
 * Bind a session to a connected socket.
 *
 * Copies the packet framing settings, log handle and buffer size from the
 * owning thread/service (sess->pth, sess->pth->sv), resets the transaction
 * state, switches `fd` to non-blocking mode, enables TCP keepalive and
 * registers a persistent read event for the session on the owner's event
 * base.
 *
 * sess: session to configure; sess->pth must already be set by the caller.
 * fd:   connected socket descriptor.
 *
 * Returns 0 on success, -1 when `sess` is NULL or closed, or when it has no
 * owning thread/service. Failures of fcntl()/setsockopt() are best-effort:
 * they do not change the return value.
 */
int sess_set(SESSION *sess, int fd)
{
	int keep_alive = 1;	/* enable SO_KEEPALIVE */
	int keep_idle = 1;	/* TCP idle time before the first keepalive probe */
	int keep_interval = 1;	/* interval between two keepalive probes */
	int keep_count = 3;	/* unanswered probes before the peer is considered gone */
	int fl = 0;

	SESSION_CHECK_RET(sess, -1);
	if(sess->pth == NULL || sess->pth->sv == NULL)
	{
		_ERROR_LOG("ERROR:SESSION has no owner thread or service");
		return -1;
	}

	sess->packet_t		= sess->pth->sv->packet_t;
	sess->packet_len	= sess->pth->sv->packet_len;
	sess->delimiter		= sess->pth->sv->delimiter;
	sess->delimiter_len	= sess->pth->sv->delimiter_len;
	sess->fd		= fd;
	sess->transaction_id	= 0;
	sess->transaction_state	= 0;
	sess->log		= sess->pth->log;
	sess->buf_size		= (sess->pth->sv->buf_size) ? sess->pth->sv->buf_size : BUF_SIZE;

	/*
	 * Peer name lookup is currently disabled. If it is re-enabled, the
	 * length argument must be a socklen_t initialised to sizeof(sess->sa)
	 * before the call:
	 *
	 *   getpeername(sess->fd, (struct sockaddr *)&(sess->sa), &sa_len);
	 *   strcpy(sess->ip, inet_ntoa(sess->sa.sin_addr));
	 *   sess->port = ntohs(sess->sa.sin_port);
	 */

	/* Set non-blocking, preserving any status flags already on the fd */
	fl = fcntl(sess->fd, F_GETFL, 0);
	if(fl < 0) fl = 0;
	if(fcntl(sess->fd, F_SETFL, fl | O_NONBLOCK) < 0)
	{
		ERROR_LOG(sess->log, "Setting O_NONBLOCK on fd[%d] failed, %s",
			sess->fd, strerror(errno));
	}

	/* Set keepalive (best effort, results intentionally ignored) */
	setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, sizeof(keep_alive));
	setsockopt(fd, SOL_TCP, TCP_KEEPIDLE, &keep_idle, sizeof(keep_idle));
	setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, &keep_interval, sizeof(keep_interval));
	setsockopt(fd, SOL_TCP, TCP_KEEPCNT, &keep_count, sizeof(keep_count));

	/* Initialize event: persistent read interest, dispatched to event_handler */
	sess->s_event->set(sess->s_event, sess->fd, E_READ | E_PERSIST,
		(void *)sess, sess->event_handler);
	sess->pth->eventbase->add(sess->pth->eventbase, sess->s_event);

	return 0;
}

/*
 * SESSION event handler: callback registered by sess_set().
 *
 * event_fd: descriptor the event fired on; must equal sess->fd.
 * event:    bit mask of E_READ / E_WRITE (any other bit is treated as unknown).
 * arg:      the SESSION passed at registration time.
 *
 * Dispatches to read_handler and then write_handler. Processing stops as
 * soon as one of them returns non-zero. Any bit left over after both have
 * been handled is logged and a MESSAGE_QUIT is pushed to the session.
 */
void sess_event_handler(int event_fd, short event, void *arg)
{
	SESSION *sess = (SESSION *)arg;
	short flags = event;

	if(sess == NULL) return ;

	if(event_fd != sess->fd)
	{
		ERROR_LOG(sess->log, "event file descriptor [%d] do not match session fd[%d]",
			event_fd, sess->fd);
		return ;
	}
	DEBUG_LOG(sess->log, "EV_HANDLER:%d", event);

	if(flags & E_READ)
	{
		DEBUG_LOG(sess->log, "E_READ:%d", E_READ);
		if(sess->read_handler(sess) != 0) return ;
		flags ^= E_READ;
	}
	if(flags & E_WRITE)
	{
		DEBUG_LOG(sess->log, "E_WRITE:%d", E_WRITE);
		if(sess->write_handler(sess) != 0) return ;
		flags ^= E_WRITE;
	}
	if(flags != 0)
	{
		ERROR_LOG(sess->log, "UNKOWN EV:%d", flags);
		//sess->pth->terminate_session(sess->pth, sess);
		sess->push_message(sess, MESSAGE_QUIT);
	}
	return ;
}

/*
 * Read data from the session fd.
 *
 * First tries to receive out-of-band data; if any arrives it is pushed to
 * sess->oob and oob_handler is invoked. Otherwise normal data is read,
 * appended to sess->buffer and handed to chunk_reader (when the session is
 * in READ_CHUNK_STATE) or to packet_reader.
 *
 * Returns 0 when data was consumed or when the non-blocking fd simply has
 * nothing to deliver right now (EAGAIN/EWOULDBLOCK/EINTR).
 * Returns -1 when the session is NULL/closed, or after pushing MESSAGE_QUIT
 * because the peer closed the connection, the read failed, or the temporary
 * buffer could not be allocated.
 */
int sess_read_handler(SESSION *sess)
{
	int len = 0;
	void *tmp = NULL;

	SESSION_CHECK_RET(sess, -1);

	tmp = malloc(sess->buf_size);
	if(tmp == NULL)
	{
		ERROR_LOG(sess->log, "Allocating read buffer for fd[%d] failed, %s",
			sess->fd, strerror(errno));
		goto err_end;
	}

	/* Reading OOB data. The assignment is parenthesised so that len holds
	 * the byte count rather than the result of the comparison. */
	if((len = recv(sess->fd, tmp, sess->buf_size, MSG_OOB)) > 0)
	{
		DEBUG_LOG(sess->log, "Read %d byte(s) OOB data", len);
		sess->oob_recv_total += len;
		/* Push data to buffer */
		sess->oob->push(sess->oob, tmp, len);
		/* Update timer */
		sess->timer->sample(sess->timer);
		sess->oob_handler(sess);
		goto end;
	}

	/* Reading normal data */
	len = read(sess->fd, tmp, sess->buf_size);
	if(len < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
	{
		/* Not an error on a non-blocking fd: wait for the next read event */
		goto end;
	}
	if(len <= 0)
	{
		/* len == 0 is end-of-file; errno is meaningless in that case */
		ERROR_LOG(sess->log, "Reading from %s:%d failed, %s",
			sess->ip, sess->port,
			(len == 0) ? "connection closed by peer" : strerror(errno));
		goto err_end;
	}
	sess->recv_total += len;

	/* Push data to buffer */
	sess->buffer->push(sess->buffer, tmp, len);
	DEBUG_LOG(sess->log, "Read %d byte(s) From %s:%d via fd(%d) BUFFER total:%u",
		len, sess->ip, sess->port, sess->fd, sess->buffer->size);

	if(sess->transaction_state == READ_CHUNK_STATE)
	{
		/* Handling incoming buffer WITH CHUNK */
		sess->chunk_reader(sess);
	}
	else
	{
		/* Reading packet */
		sess->packet_reader(sess);
	}

end:
	free(tmp);
	return 0;

err_end:
	free(tmp);
	//sess->pth->terminate_session(sess->pth, sess);
	sess->push_message(sess, MESSAGE_QUIT);
	return -1;
}

/*
 * Write data to the session fd.
 *
 * Sends (part of) the CHUNK at the head of sess->send_queue. When the chunk
 * has been sent completely (cp->len reaches 0) it is popped and cleaned.
 * Once the queue is empty, write interest (E_WRITE) is removed from the
 * session event.
 *
 * Returns 0 on success or when there was nothing to send; -1 when the
 * session is NULL/closed or when cp->send() returned <= 0, in which case
 * MESSAGE_QUIT is pushed to the session.
 *
 * NOTE(review): cp->send() is opaque from here; if it can report
 * EAGAIN/EINTR for the non-blocking fd by returning <= 0, that case is
 * currently treated as fatal -- confirm against the CHUNK implementation.
 */
int sess_write_handler(SESSION *sess)
{
	int sent = 0;
	uint64_t len = 0llu;
	CHUNK *cp = NULL;

	SESSION_CHECK_RET(sess, -1);

	/* send CHUNK */
	cp = (CHUNK *)(sess->send_queue->head(sess->send_queue));
	if(cp != NULL)
	{
		len = cp->len;
		//CHUNK_VIEW(cp);
		sent = cp->send(cp, sess->fd, sess->buf_size);
		if(sent <= 0)
		{
			ERROR_LOG(sess->log, "Sending chunk to %s:%d failed, %s",
				sess->ip, sess->port, strerror(errno));
			/* ERROR , QUIT */
			//sess->pth->terminate_session(sess->pth, sess);
			sess->push_message(sess, MESSAGE_QUIT);
			return -1;
		}

		sess->send_total += sent;
		DEBUG_LOG(sess->log, "Sent %d of %llu byte(s) to %s:%d via fd[%d]",
			sent, (unsigned long long)len, sess->ip, sess->port, sess->fd);
		if(cp->len == 0)
		{
			/* Chunk fully sent: drop it from the queue and release it */
			cp = (CHUNK *)sess->send_queue->pop(sess->send_queue);
			DEBUG_LOG(sess->log, "Clean CHUNK[%p]", (void *)cp);
			if(cp) cp->clean(&cp);
		}
		/* Update timer */
		sess->timer->sample(sess->timer);
	}

	/* Nothing left to send: stop asking for write events */
	if(sess->send_queue->total == 0)
		sess->s_event->del(sess->s_event, E_WRITE);

	return 0;
}

/*
 * Packet handler for hook.
 *
 * Cleans the previously kept packet (sess->packet), pops the next packet
 * from sess->joblist and, when the service installed a packet_handler,
 * passes the packet to it. If the session is in READ_CHUNK_STATE or
 * DATA_HANDLING_STATE after that call, the packet is kept in sess->packet
 * for later stages; in every other case the packet is cleaned here.
 */
void sess_packet_handler(SESSION *sess)
{
	BUFFER *packet = NULL;

	SESSION_CHECK(sess);

	/* Clean last packet */
	if(sess->packet) sess->packet->clean(&(sess->packet));
	DEBUG_LOG(sess->log, "sess->packet:%p", (void *)sess->packet);

	/* Get joblist header pointer */
	if(sess->joblist) packet = (BUFFER *)sess->joblist->pop(sess->joblist);
	if(packet == NULL) return ;
	DEBUG_LOG(sess->log, "Poped packet[%p] from joblist leave %d ",
		(void *)packet, sess->joblist->total);

	/* User customized handling */
	if(sess->pth->sv->packet_handler)
	{
		DEBUG_LOG(sess->log, "Handling packet using customize function[%p]",
			(void *)sess->pth->sv->packet_handler);
		sess->pth->sv->packet_handler((const HANDLER *)sess, (const BUFFER *)packet);
		DEBUG_LOG(sess->log, "Handled packet:%p size:%d", (void *)packet, packet->size);

		if(sess->transaction_state == READ_CHUNK_STATE
			|| sess->transaction_state == DATA_HANDLING_STATE)
		{
			/* Later stages still need this packet: hand it to the session */
			sess->packet = packet;
			return ;
		}
	}

	packet->clean(&packet);
	return ;
}

/* data handler */
void sess_data_handler(SESSION *sess)
{
	SESSION_CHECK(sess);
	switch(sess->transaction_state)
	{
		/* Reading for CHUNK */
		case READ_CHUNK_STATE:
			sess->chunk_reader(sess);
			break;
		/* Handling packet and CHUNK */
		case DATA_HANDLING_STATE:
			if(sess->pth->sv->data_handler)
			{
				DEBUG_LOG(sess->log,
					"Handling packet data using customize function[0x%08X]",
					sess->pth->sv->data_handler);
				sess->pth->sv->data_handler((const HANDLER *)sess,
					(const BUFFER *)(sess->packet), (const CHUNK *)(sess->chunk) ,
					(const BUFFER *)(sess->cache) );
				DEBUG_LOG(sess->log, "Handled packet:%08x size:%d",
					 sess->packet, sess->packet->size);
				if(sess->packet) sess->packet->clean(&(sess->packet));

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -