📄 session.c
字号:
#include "session.h"
#include "sbase.h"
#include "buffer.h"
#include "queue.h"

/* sendqueue setting */
//sess->push_message(sess, MESSAGE_OUTPUT);

/*
 * Guard for handlers that return a value.
 * Logs and returns `ret` when `sess` is NULL; returns `ret` silently when the
 * session is already in CLOSED_STATE.
 * The expansion is deliberately kept as a plain brace block so that every
 * existing call site keeps compiling exactly as before.
 */
#define SESSION_CHECK_RET(sess, ret)                        \
{                                                           \
    if ((sess) == NULL)                                     \
    {                                                       \
        _ERROR_LOG("ERROR:SESSION is NULL");                \
        return ret;                                         \
    }                                                       \
    if ((sess)->transaction_state == CLOSED_STATE)          \
        return ret;                                         \
}

/*
 * Guard for handlers that return void.
 * Same checks as SESSION_CHECK_RET, but with a bare `return`.
 */
#define SESSION_CHECK(sess)                                 \
{                                                           \
    if ((sess) == NULL)                                     \
    {                                                       \
        _ERROR_LOG("ERROR:SESSION is NULL");                \
        return ;                                            \
    }                                                       \
    if ((sess)->transaction_state == CLOSED_STATE)          \
        return ;                                            \
}

/*
 * Initialize SESSION struct.
 *
 * Allocates a zero-filled SESSION, installs the sess_* functions as its
 * method pointers and creates its buffers, chunk, queues, timer and event.
 *
 * Returns the new session, or NULL when the SESSION itself cannot be
 * allocated.
 *
 * NOTE(review): the results of buffer_init(), chunk_init(), queue_init(),
 * timer_init() and ev_init() are stored without being checked; a failure
 * there would surface later as a NULL dereference. The matching cleanup
 * routine is not visible here, so this is left for a follow-up.
 */
SESSION *session_init()
{
    SESSION *sess = calloc(1, sizeof(*sess));

    if (sess == NULL)
    {
        _ERROR_LOG("ERROR:calloc new SESSION failed, %s", strerror(errno));
        return NULL;
    }

    /* Method table */
    sess->set               = sess_set;
    sess->event_handler     = sess_event_handler;
    sess->read_handler      = sess_read_handler;
    sess->write_handler     = sess_write_handler;
    sess->chunk_reader      = sess_chunk_reader;
    sess->packet_reader     = sess_packet_reader;
    sess->packet_handler    = sess_packet_handler;
    sess->data_handler      = sess_data_handler;
    sess->push_message      = sess_push_message;
    sess->push_cache        = sess_push_cache;
    sess->push_chunk        = sess_push_chunk;
    sess->push_file         = sess_push_file;
    sess->oob_handler       = sess_oob_handler;
    sess->state_handler     = sess_state_handler;
    sess->terminate         = sess_terminate;
    sess->clean             = sess_clean;

    /* Owned sub-objects */
    sess->buffer            = buffer_init();
    sess->cache             = buffer_init();
    sess->oob               = buffer_init();
    sess->chunk             = chunk_init();
    sess->send_queue        = queue_init();
    sess->joblist           = queue_init();
    sess->timer             = timer_init();
    sess->s_event           = ev_init();

    return sess;
}

/*
 * Initialize session: bind it to a file descriptor.
 *
 * Copies the packet settings from sess->pth->sv, switches the descriptor to
 * non-blocking mode, enables TCP keepalive and registers a persistent read
 * event with sess->pth->eventbase.
 *
 * sess : session to configure; sess->pth must already be set by the caller.
 * fd   : descriptor to attach.
 *
 * Returns 0 on success, -1 when the session is NULL/closed, has no
 * pth/sv attached, or the descriptor cannot be made non-blocking.
 */
int sess_set(SESSION *sess, int fd)
{
    int keep_alive      = 1;    /* enable keepalive */
    int keep_idle       = 1;    /* TCP idle time before the first keepalive probe */
    int keep_interval   = 1;    /* interval between two keepalive probes */
    int keep_count      = 3;    /* number of probes before the peer is declared gone */
    int fl              = 0;

    SESSION_CHECK_RET(sess, -1);

    /* Everything below reads through sess->pth->sv; fail cleanly instead of crashing. */
    if (sess->pth == NULL || sess->pth->sv == NULL)
    {
        _ERROR_LOG("ERROR:SESSION has no pth/sv attached");
        return -1;
    }

    sess->packet_t          = sess->pth->sv->packet_t;
    sess->packet_len        = sess->pth->sv->packet_len;
    sess->delimiter         = sess->pth->sv->delimiter;
    sess->delimiter_len     = sess->pth->sv->delimiter_len;
    sess->fd                = fd;
    sess->transaction_id    = 0;
    sess->transaction_state = 0;
    sess->log               = sess->pth->log;
    sess->buf_size          = (sess->pth->sv->buf_size) ? sess->pth->sv->buf_size : BUF_SIZE;

    /* Get peer name (disabled) */
    //getpeername(sess->fd, (struct sockaddr *)&(sess->sa), &sa_len);
    //strcpy(sess->ip, inet_ntoa(sess->sa.sin_addr));
    //sess->port = ntohs(sess->sa.sin_port);
    /* NOTE(review): sess->ip / sess->port are used in log messages below but
     * are not filled in here while the code above stays disabled; presumably
     * the caller sets them - confirm. */

    /* Set Non-block: OR the flag in so the other file status flags survive. */
    fl = fcntl(sess->fd, F_GETFL, 0);
    if (fl == -1 || fcntl(sess->fd, F_SETFL, fl | O_NONBLOCK) == -1)
    {
        ERROR_LOG(sess->log, "Setting O_NONBLOCK on fd[%d] failed, %s",
                sess->fd, strerror(errno));
        return -1;
    }

    /* Set keepalive. Best effort: a failure here does not abort the session. */
    (void)setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&keep_alive, sizeof(keep_alive));
    (void)setsockopt(fd, SOL_TCP, TCP_KEEPIDLE, (void *)&keep_idle, sizeof(keep_idle));
    (void)setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, (void *)&keep_interval, sizeof(keep_interval));
    (void)setsockopt(fd, SOL_TCP, TCP_KEEPCNT, (void *)&keep_count, sizeof(keep_count));

    /* Initialize event */
    sess->s_event->set(sess->s_event, sess->fd, E_READ | E_PERSIST,
            (void *)sess, sess->event_handler);
    sess->pth->eventbase->add(sess->pth->eventbase, sess->s_event);

    return 0;
}

/*
 * SESSION event handler.
 *
 * event_fd : descriptor the event fired on; must equal sess->fd.
 * event    : bit mask of E_READ / E_WRITE.
 * arg      : the SESSION registered in sess_set().
 *
 * Dispatches to read_handler, then write_handler, and stops at the first
 * one that reports failure. Any bit left over afterwards is treated as an
 * unknown event and MESSAGE_QUIT is pushed to the session.
 */
void sess_event_handler(int event_fd, short event, void *arg)
{
    SESSION *sess = (SESSION *)arg;
    short flags = event;

    if (sess == NULL)
    {
        return ;
    }

    if (event_fd != sess->fd)
    {
        ERROR_LOG(sess->log, "event file descriptor [%d] do not match session fd[%d]",
                event_fd, sess->fd);
        return ;
    }
    DEBUG_LOG(sess->log, "EV_HANDLER:%d", event);

    if (flags & E_READ)
    {
        DEBUG_LOG(sess->log, "E_READ:%d", E_READ);
        if (sess->read_handler(sess) != 0)
        {
            return ;
        }
        flags &= ~E_READ;
    }

    if (flags & E_WRITE)
    {
        DEBUG_LOG(sess->log, "E_WRITE:%d", E_WRITE);
        if (sess->write_handler(sess) != 0)
        {
            return ;
        }
        flags &= ~E_WRITE;
    }

    if (flags != 0)
    {
        ERROR_LOG(sess->log, "UNKOWN EV:%d", flags);
        //sess->pth->terminate_session(sess->pth, sess);
        sess->push_message(sess, MESSAGE_QUIT);
    }
    return ;
}

/*
 * Read data from fd.
 *
 * First tries to receive out-of-band data; when some arrives it is pushed to
 * sess->oob and handed to oob_handler. Otherwise normal data is read, pushed
 * to sess->buffer and passed to chunk_reader (READ_CHUNK_STATE) or
 * packet_reader (any other state).
 *
 * Returns 0 on success or when the read would merely block / was
 * interrupted; returns -1 after pushing MESSAGE_QUIT when the session is
 * unusable, the scratch buffer cannot be allocated, the peer closed the
 * connection, or read() failed.
 */
int sess_read_handler(SESSION *sess)
{
    int len = 0;
    int rc = -1;
    void *tmp = NULL;

    SESSION_CHECK_RET(sess, -1);

    tmp = calloc(1, sess->buf_size);
    if (tmp == NULL)
    {
        ERROR_LOG(sess->log, "Allocating read buffer failed, %s", strerror(errno));
        goto out;
    }

    /* Reading OOB data.
     * The assignment is parenthesized so that len holds the byte count
     * rather than the result of the comparison. */
    if ((len = (int)recv(sess->fd, tmp, sess->buf_size, MSG_OOB)) > 0)
    {
        DEBUG_LOG(sess->log, "Read %d byte(s) OOB data", len);
        sess->oob_recv_total += len;
        /* Push data to buffer */
        sess->oob->push(sess->oob, tmp, len);
        /* Update timer */
        sess->timer->sample(sess->timer);
        sess->oob_handler(sess);
        rc = 0;
        goto out;
    }

    /* Reading normal data */
    len = (int)read(sess->fd, tmp, sess->buf_size);
    if (len < 0 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
    {
        /* Nothing to read right now on a non-blocking fd: not a failure. */
        rc = 0;
        goto out;
    }
    if (len <= 0)
    {
        /* read() == 0 means end of stream; errno is not meaningful then. */
        ERROR_LOG(sess->log, "Reading from %s:%d failed, %s", sess->ip, sess->port,
                (len == 0) ? "connection closed by peer" : strerror(errno));
        goto out;
    }

    sess->recv_total += len;
    /* Push data to buffer */
    sess->buffer->push(sess->buffer, tmp, len);
    DEBUG_LOG(sess->log, "Read %d byte(s) From %s:%d via fd(%d) BUFFER total:%u",
            len, sess->ip, sess->port, sess->fd, sess->buffer->size);

    if (sess->transaction_state == READ_CHUNK_STATE)
    {
        /* Handling incoming buffer WITH CHUNK */
        sess->chunk_reader(sess);
    }
    else
    {
        /* Reading packet */
        sess->packet_reader(sess);
    }
    rc = 0;

out:
    free(tmp);
    if (rc != 0)
    {
        //sess->pth->terminate_session(sess->pth, sess);
        sess->push_message(sess, MESSAGE_QUIT);
    }
    return rc;
}

/*
 * Write data to fd.
 *
 * Sends (part of) the chunk at the head of sess->send_queue. A chunk whose
 * remaining length reaches zero is popped and cleaned. When the queue is
 * empty the E_WRITE interest is removed from the session event.
 *
 * Returns 0 on success (including "nothing queued"); returns -1 after
 * pushing MESSAGE_QUIT when the session is unusable or the chunk's send()
 * reports nothing sent.
 */
int sess_write_handler(SESSION *sess)
{
    int sent = 0;
    uint64_t len = 0llu;
    CHUNK *cp = NULL;

    SESSION_CHECK_RET(sess, -1);

    /* send CHUNK */
    cp = (CHUNK *)(sess->send_queue->head(sess->send_queue));
    if (cp != NULL)
    {
        len = cp->len;
        //CHUNK_VIEW(cp);
        sent = cp->send(cp, sess->fd, sess->buf_size);
        if (sent <= 0)
        {
            ERROR_LOG(sess->log, "Sending chunk to %s:%d failed, %s",
                    sess->ip, sess->port, strerror(errno));
            /* ERROR , QUIT */
            //sess->pth->terminate_session(sess->pth, sess);
            sess->push_message(sess, MESSAGE_QUIT);
            return -1;
        }

        sess->send_total += sent;
        DEBUG_LOG(sess->log, "Sent %u of %llu byte(s) to %s:%d via fd[%d]",
                (unsigned int)sent, (unsigned long long)len,
                sess->ip, sess->port, sess->fd);

        /* Chunk fully sent: drop it from the queue and release it. */
        if (cp->len == 0)
        {
            cp = (CHUNK *)sess->send_queue->pop(sess->send_queue);
            DEBUG_LOG(sess->log, "Clean CHUNK[%p]", (void *)cp);
            if (cp)
            {
                cp->clean(&cp);
            }
        }
        /* Update timer */
        sess->timer->sample(sess->timer);
    }

    if (sess->send_queue->total == 0)
    {
        sess->s_event->del(sess->s_event, E_WRITE);
    }
    return 0;
}

/*
 * Packet handler for hook.
 *
 * Releases the previously kept packet, pops the next packet from
 * sess->joblist and, when the service defines one, passes it to the
 * user-supplied sess->pth->sv->packet_handler.
 *
 * If the session is in READ_CHUNK_STATE or DATA_HANDLING_STATE after the
 * callback, the packet is kept in sess->packet; in every other case it is
 * cleaned before returning.
 */
void sess_packet_handler(SESSION *sess)
{
    BUFFER *packet = NULL;

    SESSION_CHECK(sess);

    /* Clean Last packet */
    if (sess->packet)
    {
        sess->packet->clean(&(sess->packet));
    }
    DEBUG_LOG(sess->log, "sess->packet:%p", (void *)sess->packet);

    /* Get joblist header pointer */
    if (sess->joblist)
    {
        packet = (BUFFER *)sess->joblist->pop(sess->joblist);
    }
    if (packet == NULL)
    {
        return ;
    }
    DEBUG_LOG(sess->log, "Poped packet[%p] from joblist leave %d ",
            (void *)packet, sess->joblist->total);

    /* User Customize handling */
    if (sess->pth->sv->packet_handler)
    {
        /* NOTE(review): a function pointer is passed for 0x%08X here; the
         * specifier does not match the argument type - left as is. */
        DEBUG_LOG(sess->log, "Handling packet using customize function[0x%08X]",
                sess->pth->sv->packet_handler);
        sess->pth->sv->packet_handler((const HANDLER *)sess, (const BUFFER *)packet);
        DEBUG_LOG(sess->log, "Handled packet:%p size:%d", (void *)packet, packet->size);

        switch (sess->transaction_state)
        {
            case READ_CHUNK_STATE:
            case DATA_HANDLING_STATE:
                /* The packet is still needed by the next stage: keep it. */
                sess->packet = packet;
                return ;
            default:
                break;
        }
    }

    packet->clean(&packet);
    return ;
}

/* data handler */
/* NOTE(review): the remainder of this function lies outside the reviewed
 * excerpt; the visible part below is reproduced unchanged. */
void sess_data_handler(SESSION *sess)
{
    SESSION_CHECK(sess);
    switch(sess->transaction_state)
    {
        /* Reading for CHUNK */
        case READ_CHUNK_STATE:
            sess->chunk_reader(sess);
            break;
        /* Handling packet and CHUNK */
        case DATA_HANDLING_STATE:
            if(sess->pth->sv->data_handler)
            {
                DEBUG_LOG(sess->log, "Handling packet data using customize function[0x%08X]", sess->pth->sv->data_handler);
                sess->pth->sv->data_handler((const HANDLER *)sess, (const BUFFER *)(sess->packet), (const CHUNK *)(sess->chunk) , (const BUFFER *)(sess->cache) );
                DEBUG_LOG(sess->log, "Handled packet:%08x size:%d", sess->packet, sess->packet->size);
                if(sess->packet) sess->packet->clean(&(sess->packet));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -