📄 session.c
字号:
if(sess->cache)sess->cache->reset(sess->cache); if(sess->chunk)sess->chunk->reset(sess->chunk); } break; default: break; } return ;}/* packet read :: read packet from buffer and put to joblist */void sess_packet_reader(SESSION *sess){ int i = 0; char *p = NULL, *end = NULL; BUFFER *packet = NULL; int len = 0; SESSION_CHECK(sess); /* Check packet type */ if((sess->packet_t & PACKET_T) == 0 ) { ERROR_LOG(sess->log, "Unkown packet type[0x%08x]", sess->packet_t); return ; } /* support customize APIS for reading packet from bufffer */ if(sess->packet_t == PACKET_T_NULL && sess->pth->sv->packet_reader) { if( (len = sess->pth->sv->packet_reader((const HANDLER *)sess, (const BUFFER *)(sess->buffer)) ) > 0 ) { DEBUG_LOG(sess->log, "Read packet for PACKET_T_NULL length:%d", len); goto end; } else { ERROR_LOG(sess->log, "No packet is read for PACKET_T_NULL"); } return ; } /* handling certain length packet */ if(sess->packet_t == PACKET_T_B ) { if(sess->buffer->size > sess->packet_len) { len = sess->packet_len; DEBUG_LOG(sess->log, "Read packet for PACKET_T_B length:%d", len); goto end; } } /* split packet with ceintain delimiter */ if(sess->packet_t == PACKET_T_C) { p = (char *)sess->buffer->data; end = (char *)sess->buffer->end; i = 0; while(p < end) { if(*p++ == ((char *)sess->delimiter)[i]) i++; else i= 0; if(i == sess->delimiter_len) break; } if((len = (p - (char *)(sess->buffer->data)) ) > 0 ) { goto end; } } return ; end: { if(len > 0 && (packet = buffer_init())) { packet->push(packet, sess->buffer->data, len); if(sess->buffer) sess->buffer->del(sess->buffer, (size_t)len); if(sess->joblist) sess->joblist->push(sess->joblist, packet); sess->push_message(sess, MESSAGE_PACKET); } } return ;}/* push MESSAGE to pthread joblist queue */void sess_push_message(SESSION *sess, int msg_id){ MESSAGE *msg = NULL ; SESSION_CHECK(sess); if(msg_id == MESSAGE_QUIT) sess->transaction_state = CLOSED_STATE; if((msg = message_init()) == NULL) { ERROR_LOG(sess->log, "Initialize MESSAGE failed, %s", strerror(errno)); return ; } else { DEBUG_LOG(sess->log, "Initialize NEW MESSAGE[%d] for SESSION[%d]", msg_id, sess->fd); } msg->msg_id = msg_id; msg->fd = sess->fd; msg->handler = sess; sess->pth->message_queue->push(sess->pth->message_queue, (void *)msg); return ;}/* read chunk from BUFFER */int sess_chunk_reader(SESSION *sess){ int n = 0; CHUNK *cp = NULL; SESSION_CHECK_RET(sess, -1); if(sess->buffer->size <= 0 ) return -1; if((cp = sess->chunk) == NULL) { ERROR_LOG(sess->log, "CHUNK is NULL"); return -1; } if(cp->len <= 0llu ) goto chunk_finish; /* Fill CHUNK with BUFFER */ if((n = cp->fill(cp , sess->buffer->data, sess->buffer->size)) < 0 ) { ERROR_LOG(sess->log, "Filling to CHUNK failed"); return -1; } else { DEBUG_LOG(sess->log, "Filled %d byte(s) to CHUNK ", n); sess->buffer->del(sess->buffer, n); } //CHUNK_VIEW(cp); if(cp->len > 0llu ) { sess->push_message(sess, MESSAGE_DATA); return 0; }chunk_finish : { sess->transaction_state = DATA_HANDLING_STATE; sess->push_message(sess, MESSAGE_DATA); } return 0; }/* add cache to sess->cache */int sess_push_cache(SESSION *sess, void *data, size_t size){ SESSION_CHECK_RET(sess, -1); sess->cache->reset(sess->cache); sess->cache->push(sess->cache, data, size); return 0;}/* add MEM_CHUNK to send queue */int sess_push_chunk(SESSION *sess, void *data, size_t len ){ CHUNK *cp = NULL; SESSION_CHECK_RET(sess, -1); cp = (CHUNK *)(sess->send_queue->tail(sess->send_queue)); if(cp != NULL && cp->type == MEM_CHUNK) { cp->append(cp, data, len); } else { cp = chunk_init(); cp->set(cp, sess->transaction_id, MEM_CHUNK, NULL, 0llu, 0llu); cp->append(cp, data, len); sess->send_queue->push(sess->send_queue, (void *)cp); //CHUNK_VIEW(cp); //QUEUE_VIEW(sess->send_queue); } sess->s_event->add(sess->s_event, E_WRITE); //QUEUE_VIEW(sess->send_queue); return 0;}/* add FILE_CHUNK to send queue */int sess_push_file(SESSION *sess, char *filename, uint64_t offset, uint64_t len){ CHUNK *cp = NULL; SESSION_CHECK_RET(sess, -1); cp = chunk_init(); cp->set(cp, sess->transaction_id, FILE_CHUNK, filename, offset, len); sess->send_queue->push(sess->send_queue, (void *)cp); sess->s_event->add(sess->s_event, E_WRITE); return 0;}/* set SESSION transaction state */int sess_set_transaction_state(SESSION *sess, uint32_t state){ if(sess && (state & TRANSACTION_STATES)) { sess->transaction_state = state; return 0; } return -1;}/* set SESSION transaction state */int sess_set_transaction_id(SESSION *sess, uint32_t transaction_id){ if(sess && transaction_id > 0 ) { sess->transaction_id = transaction_id; return 0; } return -1;}/* read out of band data and handling it */int sess_oob_handler(SESSION *sess){ int buf_size = 1024; char buf[buf_size]; if(sess && sess->pth->sv->oob_handler) { sess->pth->sv->oob_handler((const HANDLER *)sess, (const BUFFER *)(sess->oob)); return 0; } return -1;}/* check connection state send oob data ensure connection is connected */int sess_state_handler(SESSION *sess){ if(sess) { /* if(send(sess->fd, (void *)"0", 1, MSG_OOB) < 0 ) { ERROR_LOG(sess->log, "Sending OOB data failed, %s", strerror(errno)); goto terminate_session; } */ if( (time(NULL) - sess->timer->last_sec) >= sess->pth->sv->conn_timeout ) { ERROR_LOG(sess->log, "Connection TIMEOUT %d seconds", sess->pth->sv->conn_timeout); goto terminate_session; } return 0; terminate_session: { sess->push_message(sess, MESSAGE_QUIT); return -1; } }}/* terminate session */int sess_terminate(SESSION *sess){ if(sess) { sess->transaction_state = CLOSED_STATE; DEBUG_LOG(sess->log, "Terminating connection[%d] %s:%d", sess->fd, sess->ip, sess->port); sess->s_event->destroy(sess->s_event); DEBUG_LOG(sess->log, "Closing EV"); shutdown(sess->fd, SHUT_RDWR); close(sess->fd); sess->pth->sv->running_connections--; } return 0;}/* Clean session */void sess_clean(SESSION **sess){ CHUNK *chunk = NULL; BUFFER *buf = NULL; if((*sess)) { DEBUG_LOG((*sess)->log, "Cleaning connection[%d] %s:%d", (*sess)->fd, (*sess)->ip, (*sess)->port); /* Clean joblist QUEUE */ if((*sess)->joblist) { while((*sess)->joblist->total > 0 ) { buf = (BUFFER *)(*sess)->joblist->pop((*sess)->joblist); if(buf)buf->clean(&buf); } (*sess)->joblist->clean(&((*sess)->joblist)); } /* Clean send QUEUE */ if((*sess)->send_queue) { while((*sess)->send_queue->total > 0 ) { chunk = (CHUNK *)(*sess)->send_queue->pop((*sess)->send_queue); if(chunk)chunk->clean(&chunk); } (*sess)->send_queue->clean(&((*sess)->send_queue)); } /* Clean CHUNK */ if((*sess)->chunk) (*sess)->chunk->clean(&((*sess)->chunk)); /* Clean BUFFER */ if((*sess)->buffer) (*sess)->buffer->clean(&((*sess)->buffer)); /* Clean packet */ if((*sess)->packet) (*sess)->packet->clean(&((*sess)->packet)); /* Clean cache */ if((*sess)->cache) (*sess)->cache->clean(&((*sess)->cache)); /* Clean oob */ if((*sess)->oob) (*sess)->oob->clean(&((*sess)->oob)); /* Clean TIMER */ if((*sess)->timer) (*sess)->timer->clean(&((*sess)->timer)); /* Clean event */ if((*sess)->s_event) (*sess)->s_event->clean(&((*sess)->s_event)); free((*sess)); (*sess) = NULL; } return ;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -