⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 session.c

📁 这是基于C语言开发的分布式搜索源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
				if(sess->cache)sess->cache->reset(sess->cache);				if(sess->chunk)sess->chunk->reset(sess->chunk);			}			break;		default:			break;	}	return ;}/* packet read :: read packet from buffer and put to joblist */void sess_packet_reader(SESSION *sess){	int i = 0;	char *p = NULL, *end = NULL;	BUFFER *packet = NULL;	int len = 0;	SESSION_CHECK(sess);	/* Check packet type */	if((sess->packet_t & PACKET_T) == 0 )	{		ERROR_LOG(sess->log, "Unkown packet type[0x%08x]", sess->packet_t);		return ;	}	/* support customize APIS for reading packet from bufffer */	if(sess->packet_t == PACKET_T_NULL && sess->pth->sv->packet_reader)	{		if( (len = sess->pth->sv->packet_reader((const HANDLER *)sess,			 (const BUFFER *)(sess->buffer)) ) > 0 )		{			DEBUG_LOG(sess->log, "Read packet for PACKET_T_NULL length:%d", len);			goto end;		}		else		{			ERROR_LOG(sess->log, "No packet is read for PACKET_T_NULL");		}		return ;	}		/* handling certain length packet */	if(sess->packet_t == PACKET_T_B )	{		if(sess->buffer->size  > sess->packet_len)		{			len = sess->packet_len;			DEBUG_LOG(sess->log, "Read packet for PACKET_T_B length:%d", len);			goto end;		}	}	/* split packet with ceintain delimiter */	if(sess->packet_t == PACKET_T_C)	{		p   = (char *)sess->buffer->data;		end = (char *)sess->buffer->end;		i = 0;		while(p < end)		{			if(*p++ == ((char *)sess->delimiter)[i]) i++;			else i= 0;			if(i == sess->delimiter_len) break;		}		if((len = (p - (char *)(sess->buffer->data)) ) > 0  )		{			goto end;		}	}	return ;	end:	{		if(len > 0 && (packet = buffer_init()))		{			packet->push(packet, sess->buffer->data, len);			if(sess->buffer) sess->buffer->del(sess->buffer, (size_t)len);			if(sess->joblist) sess->joblist->push(sess->joblist, packet);			sess->push_message(sess, MESSAGE_PACKET);		}	}	return ;}/* push MESSAGE to pthread joblist queue */void sess_push_message(SESSION *sess, int msg_id){        MESSAGE *msg = NULL ;	SESSION_CHECK(sess);		if(msg_id == MESSAGE_QUIT) sess->transaction_state = CLOSED_STATE;	if((msg = message_init()) == NULL)	{		ERROR_LOG(sess->log, "Initialize MESSAGE failed, %s", strerror(errno));		return ;	}	else	{		DEBUG_LOG(sess->log, "Initialize NEW MESSAGE[%d] for SESSION[%d]", msg_id, sess->fd);	}        msg->msg_id 	= msg_id;        msg->fd 	= sess->fd;        msg->handler 	= sess;	sess->pth->message_queue->push(sess->pth->message_queue, (void *)msg);	return ;}/* read chunk from BUFFER */int sess_chunk_reader(SESSION *sess){	int 	n = 0;	CHUNK *cp = NULL;		SESSION_CHECK_RET(sess, -1);		if(sess->buffer->size <= 0 ) return -1;	if((cp = sess->chunk) == NULL)	{		ERROR_LOG(sess->log, "CHUNK is NULL");		return -1;	}	if(cp->len <= 0llu ) goto chunk_finish;	/* Fill CHUNK with BUFFER */	if((n = cp->fill(cp , sess->buffer->data, sess->buffer->size)) < 0 )	{		ERROR_LOG(sess->log, "Filling to CHUNK failed");		return -1;	}	else	{		DEBUG_LOG(sess->log, "Filled %d byte(s) to CHUNK ", n);		sess->buffer->del(sess->buffer, n);	}	//CHUNK_VIEW(cp);	if(cp->len > 0llu )	{		sess->push_message(sess, MESSAGE_DATA);	 	return 0;	}chunk_finish :	{		sess->transaction_state = DATA_HANDLING_STATE;		sess->push_message(sess, MESSAGE_DATA);	}	return 0;	}/* add cache to sess->cache  */int sess_push_cache(SESSION *sess, void *data, size_t size){	SESSION_CHECK_RET(sess, -1);	sess->cache->reset(sess->cache);	sess->cache->push(sess->cache, data, size);	return 0;}/* add MEM_CHUNK to send queue */int sess_push_chunk(SESSION *sess, void *data, size_t len ){	CHUNK *cp = NULL;	SESSION_CHECK_RET(sess, -1);	cp = (CHUNK *)(sess->send_queue->tail(sess->send_queue));	if(cp != NULL && cp->type == MEM_CHUNK)	{		cp->append(cp, data, len);	}	else	{		cp = chunk_init();		cp->set(cp, sess->transaction_id, MEM_CHUNK, NULL, 0llu, 0llu);		cp->append(cp, data, len);		sess->send_queue->push(sess->send_queue, (void *)cp);		//CHUNK_VIEW(cp);		//QUEUE_VIEW(sess->send_queue);	}	sess->s_event->add(sess->s_event, E_WRITE);	//QUEUE_VIEW(sess->send_queue);	return 0;}/* add FILE_CHUNK to send queue */int sess_push_file(SESSION *sess, char *filename, uint64_t offset, uint64_t len){	CHUNK *cp = NULL;	SESSION_CHECK_RET(sess, -1);	cp = chunk_init();        cp->set(cp, sess->transaction_id, FILE_CHUNK, filename, offset, len);        sess->send_queue->push(sess->send_queue, (void *)cp);	sess->s_event->add(sess->s_event, E_WRITE);	return 0;}/* set SESSION transaction state */int sess_set_transaction_state(SESSION *sess, uint32_t state){	if(sess && (state & TRANSACTION_STATES))	{		sess->transaction_state = state;			return 0;	}	return -1;}/* set SESSION transaction state */int sess_set_transaction_id(SESSION *sess, uint32_t transaction_id){        if(sess && transaction_id > 0 )        {                sess->transaction_id = transaction_id;                return 0;        }        return -1;}/* read out of band data and handling it */int sess_oob_handler(SESSION *sess){	int buf_size = 1024;	char buf[buf_size];	if(sess && sess->pth->sv->oob_handler)	{		sess->pth->sv->oob_handler((const HANDLER *)sess, (const BUFFER *)(sess->oob));		return 0;	}	return -1;}/* check connection state send oob data ensure connection is connected */int sess_state_handler(SESSION *sess){	if(sess)	{		/*		if(send(sess->fd, (void *)"0", 1, MSG_OOB) < 0 )		{			ERROR_LOG(sess->log, "Sending OOB data failed, %s", strerror(errno));				goto terminate_session;		}		*/		if( (time(NULL) - sess->timer->last_sec) >= sess->pth->sv->conn_timeout )		{			ERROR_LOG(sess->log, "Connection TIMEOUT %d seconds",				sess->pth->sv->conn_timeout);				goto terminate_session;		}				return 0;		terminate_session:		{			sess->push_message(sess, MESSAGE_QUIT);			return -1;		}	}}/* terminate session */int sess_terminate(SESSION *sess){		if(sess)	{		sess->transaction_state = CLOSED_STATE;		DEBUG_LOG(sess->log, "Terminating connection[%d] %s:%d", sess->fd, sess->ip, sess->port);		sess->s_event->destroy(sess->s_event);		DEBUG_LOG(sess->log, "Closing EV");		shutdown(sess->fd, SHUT_RDWR);		close(sess->fd);		sess->pth->sv->running_connections--;	}        return 0;}/* Clean session */void sess_clean(SESSION **sess){	CHUNK *chunk = NULL;	BUFFER *buf = NULL;	if((*sess))	{ 		DEBUG_LOG((*sess)->log, "Cleaning connection[%d] %s:%d", (*sess)->fd,			(*sess)->ip, (*sess)->port);		/* Clean joblist QUEUE */		if((*sess)->joblist)		{				while((*sess)->joblist->total > 0 )                        {                               buf = (BUFFER *)(*sess)->joblist->pop((*sess)->joblist);                               if(buf)buf->clean(&buf);                        }				(*sess)->joblist->clean(&((*sess)->joblist));		}		/* Clean send QUEUE */		if((*sess)->send_queue)		{			while((*sess)->send_queue->total > 0 )			{				chunk = (CHUNK *)(*sess)->send_queue->pop((*sess)->send_queue);				if(chunk)chunk->clean(&chunk);			}			(*sess)->send_queue->clean(&((*sess)->send_queue));		}		/* Clean CHUNK  */		if((*sess)->chunk) (*sess)->chunk->clean(&((*sess)->chunk));		/* Clean BUFFER	 */		if((*sess)->buffer) (*sess)->buffer->clean(&((*sess)->buffer));		/* Clean packet */		if((*sess)->packet) (*sess)->packet->clean(&((*sess)->packet));		/* Clean cache */		if((*sess)->cache) (*sess)->cache->clean(&((*sess)->cache));		/* Clean oob */		if((*sess)->oob) (*sess)->oob->clean(&((*sess)->oob));		/* Clean TIMER */		if((*sess)->timer) (*sess)->timer->clean(&((*sess)->timer));		/* Clean event */		if((*sess)->s_event) (*sess)->s_event->clean(&((*sess)->s_event));		free((*sess));		(*sess) = NULL;	}	return ;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -