📄 thread.c
字号:
#include "thread.h"
#include "session.h"
#include "buffer.h"
#include "queue.h"
#include "timer.h"

/*
 * Guard macros: log and leave the enclosing function when the THREAD
 * pointer is NULL. Wrapped in do { } while (0) so they behave as a single
 * statement (safe inside unbraced if/else), with arguments parenthesized.
 */
#define THREAD_CHECK_RET(pth, ret)                      \
    do {                                                \
        if ((pth) == NULL) {                            \
            _ERROR_LOG("FATAL:Thread is NULL");         \
            return (ret);                               \
        }                                               \
    } while (0)

#define THREAD_CHECK(pth)                               \
    do {                                                \
        if ((pth) == NULL) {                            \
            _ERROR_LOG("FATAL:Thread is NULL");         \
            return;                                     \
        }                                               \
    } while (0)

/*
 * Bounds-checked lookup in the per-thread session table.
 * Returns the SESSION stored at index `fd`, or NULL when the table has not
 * been allocated yet, `fd` is outside [0, max_connections), or the slot is
 * empty.
 */
static SESSION *pth_session_at(const THREAD *pth, int fd)
{
    if (pth == NULL || pth->sessions == NULL)
        return NULL;
    if (fd < 0 || fd >= pth->sv->max_connections)
        return NULL;
    return pth->sessions[fd];
}

/*
 * Dispatch one queued MESSAGE on the worker thread.
 * The caller keeps ownership of `msg` and cleans it afterwards.
 */
static void pth_handle_message(THREAD *pth, MESSAGE *msg)
{
    SESSION *sess = NULL;

    /* NEW connection: here the handler carries a heap-allocated
     * struct sockaddr_in (see pth_addconn), not a SESSION. */
    if (msg->msg_id == MESSAGE_NEW_SESSION) {
        if (msg->handler) {
            pth->add_session(pth, msg->fd,
                             *(struct sockaddr_in *)msg->handler);
            free(msg->handler);
            msg->handler = NULL;
        }
        return;
    }

    /* For every other message the handler is the target SESSION. Drop the
     * message if that session is no longer the one registered for this fd. */
    sess = (SESSION *)msg->handler;
    if (sess && sess != pth_session_at(pth, msg->fd))
        return;

    switch (msg->msg_id) {
    /* Close connection */
    case MESSAGE_QUIT:
        if (pth_session_at(pth, msg->fd))
            pth->terminate_session(pth, sess);
        break;
    case MESSAGE_INPUT:
        break;
    case MESSAGE_OUTPUT:
        if (sess)
            sess->write_handler(sess);
        break;
    case MESSAGE_PACKET:
        if (sess)
            sess->packet_handler(sess);
        break;
    case MESSAGE_DATA:
        if (sess)
            sess->data_handler(sess);
        break;
    default:
        break;
    }
}

/*
 * Allocate and initialize a THREAD struct.
 * Wires up the method table, marks the thread as running and creates the
 * message queue, event base, timer and mutex.
 * Returns the new THREAD, or NULL on failure. The message queue and event
 * base are mandatory because pth_run() dereferences them unconditionally;
 * the timer is optional (pth_run() checks it before use).
 */
THREAD *thread_init()
{
    int rc = 0;
    THREAD *pth = calloc(1, sizeof *pth);

    if (pth == NULL) {
        _ERROR_LOG("ERROR:Calloc new Thread failed, %s", strerror(errno));
        return NULL;
    }

    pth->event_handler     = pth_event_handler;
    pth->run               = pth_run;
    pth->addconn           = pth_addconn;
    pth->add_session       = pth_add_session;
    pth->terminate_session = pth_terminate_session;
    pth->state_conns       = pth_state_conns;
    pth->terminate         = pth_terminate;
    pth->clean             = pth_clean;
    pth->running_status    = 1;

    pth->message_queue = queue_init();
    pth->eventbase     = evbase_init();
    pth->timer         = timer_init();
    if (pth->message_queue == NULL || pth->eventbase == NULL) {
        _ERROR_LOG("ERROR:Initialize Thread queue/eventbase failed");
        pth_clean(&pth);    /* releases whichever children were created */
        free(pth);
        return NULL;
    }

    /* pthread_mutex_init reports failure via its return value, not errno */
    rc = pthread_mutex_init(&pth->mutex, NULL);
    if (rc != 0) {
        _ERROR_LOG("ERROR:Initialize Thread mutex failed, %s", strerror(rc));
        pth_clean(&pth);
        free(pth);
        return NULL;
    }
    return pth;
}

/*
 * THREAD event handler.
 * event_fd: descriptor the event fired on; used as index into the
 *           session table.
 * event:    bitmask of E_READ / E_WRITE.
 * arg:      the owning THREAD.
 * Runs the session's read and/or write handler; stops as soon as a handler
 * returns non-zero. Any remaining unknown event bit terminates the session.
 */
void pth_event_handler(int event_fd, short event, void *arg)
{
    THREAD *pth = (THREAD *)arg;
    SESSION *sess = NULL;
    short flags = event;

    /* also rejects descriptors outside the session table */
    sess = pth_session_at(pth, event_fd);
    if (sess == NULL)
        return;

    if (event_fd != sess->fd) {
        ERROR_LOG(pth->log,
                  "event file descriptor [%d] do not match session fd[%d]",
                  event_fd, sess->fd);
        return;
    }
    DEBUG_LOG(sess->log, "EV_HANDLER:%d", event);

    if (flags & E_READ) {
        DEBUG_LOG(pth->log, "E_READ:%d", E_READ);
        if (sess->read_handler(sess) != 0)
            return;
        flags = (short)(flags & ~E_READ);
    }
    if (flags & E_WRITE) {
        DEBUG_LOG(pth->log, "E_WRITE:%d", E_WRITE);
        if (sess->write_handler(sess) != 0)
            return;
        flags = (short)(flags & ~E_WRITE);
    }
    if (flags != 0) {
        ERROR_LOG(pth->log, "UNKOWN EV:%d", flags);
        pth->terminate_session(pth, sess);
    }
}

/*
 * Thread main loop.
 * arg: the THREAD to run. Returns NULL (via pthread_exit).
 * Each iteration: periodic connection-state check, one pass of the event
 * loop, a sleep, at most one message from the queue, then another sleep.
 * Loops until running_status becomes 0.
 */
void *pth_run(void *arg)
{
    THREAD *pth = (THREAD *)arg;
    MESSAGE *msg = NULL;
    uint64_t n = 0;

    THREAD_CHECK_RET(pth, NULL);

    /* Running */
    pth->thread_id = pthread_self();
    while (pth->running_status) {
        /* Check connection state once conn_timeout seconds have elapsed
         * since the timer's last sample */
        if (pth->timer
            && (time(NULL) - pth->timer->last_sec) >= pth->sv->conn_timeout) {
            /* incremented outside the log call so the counter does not
             * depend on how the logging macro evaluates its arguments */
            n++;
            /* NOTE(review): pthread_t is opaque; the cast assumes an
             * integer or pointer representation — confirm for the
             * target platform. */
            DEBUG_LOG(pth->log, "Thread[%08lx] Heartbeat %llu",
                      (unsigned long)pth->thread_id, (unsigned long long)n);
            pth->state_conns(pth);
            pth->timer->sample(pth->timer);
        }

        /* Event Loop */
        pth->eventbase->loop(pth->eventbase, 0, NULL);
        usleep(pth->sv->sleep_usec);

        /* Message Queue */
        msg = (MESSAGE *)(pth->message_queue->pop(pth->message_queue));
        if (msg) {
            DEBUG_LOG(pth->log, "Handling message[%p] ID[%d]",
                      (void *)msg, msg->msg_id);
            pth_handle_message(pth, msg);
            msg->clean(&msg);
        }
        usleep(pth->sv->sleep_usec);
    }
    pthread_exit(NULL);
    return NULL;
}

/*
 * Add a new connection to the thread.
 * Queues a MESSAGE_NEW_SESSION message carrying a heap copy of `sa`; the
 * copy is freed by the message handling in pth_run().
 * Returns 0 when the message was queued, -1 on NULL thread or allocation
 * failure.
 */
int pth_addconn(THREAD *pth, int fd, struct sockaddr_in sa)
{
    MESSAGE *msg = NULL;

    THREAD_CHECK_RET(pth, -1);

    msg = message_init();
    if (msg == NULL)
        return -1;

    DEBUG_LOG(pth->log, "Initialize message[MESSAGE_NEW_SESSION]");
    msg->msg_id = MESSAGE_NEW_SESSION;
    msg->fd = fd;
    msg->handler = calloc(1, sizeof sa);
    if (msg->handler == NULL) {
        ERROR_LOG(pth->log, "Calloc address for new session failed, %s",
                  strerror(errno));
        msg->clean(&msg);
        return -1;
    }
    memcpy(msg->handler, &sa, sizeof sa);
    pth->message_queue->push(pth->message_queue, (void *)msg);
    return 0;
}

/*
 * Check connection state: call state_handler on every registered session.
 */
void pth_state_conns(THREAD *pth)
{
    SESSION *sess = NULL;
    int i = 0;

    THREAD_CHECK(pth);

    if (pth->sessions == NULL)
        return;

    DEBUG_LOG(pth->log, "Checking connections state");
    for (i = 0; i < pth->sv->max_connections; i++) {
        sess = pth->sessions[i];
        if (sess != NULL)
            sess->state_handler(sess);
    }
}

/*
 * Add a new session to the thread.
 * fd: connection descriptor, also the index into the session table; must
 *     be in [0, max_connections).
 * sa: peer address, copied into the session.
 * The session table is allocated on first use. A session already
 * registered for `fd` is terminated first.
 * Returns 0 on success, -1 on any failure.
 */
int pth_add_session(THREAD *pth, int fd, struct sockaddr_in sa)
{
    SESSION *sess = NULL;

    THREAD_CHECK_RET(pth, -1);

    if (fd < 0 || fd >= pth->sv->max_connections) {
        ERROR_LOG(pth->log, "SESSION fd[%d] is out of range", fd);
        return -1;
    }

    /* Check sessions and Initialize */
    if (pth->sessions == NULL) {
        pth->sessions = calloc((size_t)pth->sv->max_connections,
                               sizeof *pth->sessions);
        if (pth->sessions == NULL) {
            ERROR_LOG(pth->log, "Initialize sessions failed, %s",
                      strerror(errno));
            return -1;
        }
    }

    /* Replace any session still registered for this descriptor */
    if (pth->sessions[fd]) {
        DEBUG_LOG(pth->log, "SESSION[%d] is exists", fd);
        pth->terminate_session(pth, pth->sessions[fd]);
    }

    /* Initialize new session */
    DEBUG_LOG(pth->log, "adding NEW SESSION[%d]", fd);
    sess = session_init();
    if (sess == NULL) {
        ERROR_LOG(pth->log, "Initialize new session failed, %s",
                  strerror(errno));
        return -1;
    }

    pth->sessions[fd] = sess;
    memcpy(&(sess->sa), &sa, sizeof(struct sockaddr_in));
    /* NOTE(review): sess->ip's capacity is not visible here; inet_ntoa
     * yields at most 15 characters plus the terminator — confirm the
     * buffer is at least 16 bytes. */
    strcpy(sess->ip, inet_ntoa(sa.sin_addr));
    sess->port = ntohs(sa.sin_port);
    /* base setting */
    sess->pth = pth;
    if (sess->set(sess, fd) != 0) {
        FATAL_LOG(pth->log, "Initialize new session[%d] failed", fd);
        /* Drop our slot before the session is destroyed so the table never
         * keeps a pointer to freed memory, whatever sess->fd ended up as. */
        pth->sessions[fd] = NULL;
        pth->terminate_session(pth, sess);
        return -1;
    }

    DEBUG_LOG(pth->log, "added NEW SESSION[%d]", fd);
    return 0;
}

/*
 * Terminate a session: unregister it from the session table, then call
 * its terminate and clean methods. A NULL session is a no-op.
 * The table slot is cleared only when it actually refers to `sess`, so a
 * session with an unset or out-of-range fd cannot clobber another entry
 * or index outside the table.
 */
void pth_terminate_session(THREAD *pth, SESSION *sess)
{
    THREAD_CHECK(pth);

    if (sess == NULL)
        return;

    if (pth_session_at(pth, sess->fd) == sess)
        pth->sessions[sess->fd] = NULL;
    sess->terminate(sess);
    sess->clean(&sess);
}

/*
 * Terminate the thread: clear running_status so pth_run() leaves its loop,
 * and call terminate on every registered session. Sessions are not cleaned
 * or unregistered here; pth_clean() does that.
 */
void pth_terminate(THREAD *pth)
{
    int i = 0;

    THREAD_CHECK(pth);

    pth->running_status = 0;

    /* The session table is allocated lazily and may not exist yet */
    if (pth->sessions == NULL)
        return;

    /* Terminate sessions */
    for (i = 0; i < pth->sv->max_connections; i++) {
        if (pth->sessions[i] != NULL)
            pth->sessions[i]->terminate(pth->sessions[i]);
    }
}

/*
 * Clean the THREAD's child structures: sessions and the session table,
 * queued messages and the queue, the event base and the timer.
 * NOTE(review): the THREAD struct itself and its mutex are not released
 * here and *pth is left unchanged — confirm the caller frees them.
 */
void pth_clean(THREAD **pth)
{
    THREAD *th = NULL;
    MESSAGE *msg = NULL;
    int i = 0;

    if (pth == NULL) {
        _ERROR_LOG("FATAL:Thread is NULL");
        return;
    }
    th = *pth;
    THREAD_CHECK(th);

    /* Clean sessions */
    if (th->sessions) {
        for (i = 0; i < th->sv->max_connections; i++) {
            if (th->sessions[i] != NULL)
                th->sessions[i]->clean(&(th->sessions[i]));
        }
        free(th->sessions);
        th->sessions = NULL;    /* no dangling table pointer */
    }

    /* Clean message_queue */
    if (th->message_queue) {
        while (th->message_queue->total > 0) {
            msg = (MESSAGE *)th->message_queue->pop(th->message_queue);
            if (msg)
                msg->clean(&msg);
        }
        th->message_queue->clean(&(th->message_queue));
    }

    /* Clean event base */
    if (th->eventbase)
        th->eventbase->clean(&(th->eventbase));

    /* Clean Timer */
    if (th->timer)
        th->timer->clean(&(th->timer));
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -