📄 px_ls_module_dbes.c
字号:
/* must been first include begin */
#include "..\ProjectX_Common\ProjectX_Copyright.h"
#include "..\ProjectX_Common\ProjectX_Common.h"
/* must been first include end */
/* std and common include */
#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <winsock2.h>
//program specify include
#include "..\ProjectX_Common\ProjectX_Shared.h"
#include "..\ProjectX_Common\ProjectX_Utils.h"
#include "..\ProjectX_Common\ProjectX_MessageCode.h"
#include "px_ls_common.h"
#include "px_ls_config.h"
#include "px_ls_state.h"
#include "px_ls_statistic.h"
#include "px_ls_time_service.h"
#include "px_ls_ui.h"
#include "px_ls_module_dbes.h"
#include "px_ls_account.h"
HANDLE h_event_dbes = NULL; /* 连接到dbes的线程信号 */
HANDLE h_iocp_module_dbes = NULL;
SOCKET sck_module_dbes = NULL;
SOCKADDR_IN addr_module_dbes;
PX_LS_DBES * module_dbes = NULL;
PX_LS_DBES_PER_DATA * module_dbes_per_data = NULL;
PX_LS_DBES_IO_DATA * module_dbes_io_recv = NULL;
PX_LS_DBES_IO_DATA * module_dbes_io_send = NULL;
extern PX_LS_CONFIG * g_config; /* LS服务器配置信息 */
extern PX_LS_IPBAN * g_ip_ban_head; /* ip访问控制 */
extern PX_LS_STATISTIC * g_stat; /* 统计信息 */
extern PX_LS_TIME_SERVICE * g_time_service; /* 服务器时间服务 */
extern BOOL bln_run_workthread; /* 是否继续运行工作者线程 */
extern HANDLE h_winsock2; /* winsock2 句柄 */
char base_path[FILENAME_MAX]; /* 程序基文件地址 */
char ** pcfg_buf; /* 配置缓存 */
extern PX_LS_STATE * g_state; /* 服务器全局状态 */
extern HASH_TABLE_RS * g_hash_table_rs; //指向rs模块用来快速搜索hash表
extern HASH_TABLE_DBES * g_hash_table_dbes; //指向dbes模块用来快速搜索hash
extern ACCOUNT_KEY * account_key_head; /* 帐号 */
extern int pxmc[PROJECTX_MESSAGECODE_FINAL];
DWORD WINAPI module_dbes_connect_thread(PVOID paparm){
int n = 2;
char buf[2];
int iret;
int trys = 3;
WORD mc = MCLS_DBES_REQUEST_CONNECT;
BOOL bln_conn = FALSE;
PX_LS_DBES * pdbes;
DWORD bytes_io;
DWORD flags;
WaitForSingleObject(h_event_dbes,INFINITE);
if (!valid_ls_state(STATE_MAIN_RUN)) {
return 0;
}
while (trys > 0) {
iret = connect(sck_module_dbes,(SOCKADDR *)&addr_module_dbes,sizeof(addr_module_dbes));
if (iret == SOCKET_ERROR) {
trys--;
Sleep(1000);
}else{
bln_conn = TRUE;
break;
}
}
if (!bln_conn) {
set_module_dbes_state(STATE_MODULE_ERROR_FOUND);
return 0;
}else{
set_module_dbes_state(STATE_MODULE_RUN);
}
//prepare per_data
module_dbes_per_data = (LPPX_LS_DBES_PER_DATA)ms_malloc(sizeof(PX_LS_DBES_PER_DATA));
module_dbes_per_data->pmodule_dbes = (LPPX_LS_DBES)ms_malloc(sizeof(PX_LS_DBES));
pdbes = module_dbes_per_data->pmodule_dbes;
memset(pdbes,0,sizeof(PX_LS_DBES));
pdbes->cts_state = ms_malloc(sizeof(CRITICAL_SECTION));
memset(pdbes->cts_state,0,sizeof(CRITICAL_SECTION));
InitializeCriticalSectionAndSpinCount(pdbes->cts_state,0x4000);
pdbes->cts_recv = ms_malloc(sizeof(CRITICAL_SECTION));
memset(pdbes->cts_recv,0,sizeof(CRITICAL_SECTION));
InitializeCriticalSectionAndSpinCount(pdbes->cts_recv,0x4000);
pdbes->cts_send = ms_malloc(sizeof(CRITICAL_SECTION));
memset(pdbes->cts_send,0,sizeof(CRITICAL_SECTION));
InitializeCriticalSectionAndSpinCount(pdbes->cts_send,0x4000);
pdbes->send_sign = IOCP_IO_TYPE_WAIT_SEND_BUF;
pdbes->last_handled_frame = g_time_service->curr_frame_theory;
pdbes->recv_buf = (char *)ms_malloc(MODULE_DBES_RECV_BUF_LEN*sizeof(char));
pdbes->precv_append = NULL;
pdbes->precv_end = pdbes->recv_buf;
pdbes->precv_start = pdbes->recv_buf;
pdbes->send_buf = (char *)ms_malloc(MODULE_DBES_SEND_BUF_LEN*sizeof(char));
pdbes->psend_append = NULL;
pdbes->psend_start = pdbes->send_buf;
pdbes->psend_end = pdbes->send_buf;
//prepare io_recv
module_dbes_io_recv = (LPPX_LS_DBES_IO_DATA)ms_malloc(sizeof(PX_LS_DBES_IO_DATA));
memset(module_dbes_io_recv,0,sizeof(OVERLAPPED));
module_dbes_io_recv->buf = (char *)ms_malloc(MODULE_DBES_IOCP_BUF_LEN*sizeof(char));
module_dbes_io_recv->size = MODULE_DBES_IOCP_BUF_LEN;
module_dbes_io_recv->io_type = IOCP_IO_TYPE_RECV;
module_dbes_io_recv->wsabuf.buf = module_dbes_io_recv->buf;
module_dbes_io_recv->wsabuf.len = MODULE_DBES_IOCP_BUF_LEN;
//prepare io_send
module_dbes_io_send = (LPPX_LS_DBES_IO_DATA)ms_malloc(sizeof(PX_LS_DBES_IO_DATA));
memset(module_dbes_io_send,0,sizeof(OVERLAPPED));
module_dbes_io_send->buf = (char *)ms_malloc(MODULE_DBES_IOCP_BUF_LEN*sizeof(char));
module_dbes_io_send->size = MODULE_DBES_IOCP_BUF_LEN;
module_dbes_io_send->io_type = IOCP_IO_TYPE_WAIT_SEND_BUF;
module_dbes_io_send->wsabuf.buf = module_dbes_io_send->buf;
module_dbes_io_send->wsabuf.len = MODULE_DBES_IOCP_BUF_LEN;
//transfer first WSASend with connect request use mc = MCLS_DBES_REQUEST_CONNECT
memcpy(buf,&mc,sizeof(WORD));
add_buf_to_module_dbes_send_buf(n,buf);
iret = get_module_dbes_send_buf(module_dbes_io_send->buf);
flags = 0;
iret = WSASend(sck_module_dbes,&(module_dbes_io_send->wsabuf),1,&bytes_io,&flags,&(module_dbes_io_send->overlapped),NULL);
if ((iret == 0)||((iret == SOCKET_ERROR)&&(WSAGetLastError() == WSA_IO_PENDING))){
//succ
}else{
set_module_dbes_state(STATE_MODULE_ERROR_FOUND);
return 0;
}
//transfer first WSARecv
flags = 0;
iret = WSARecv(sck_module_dbes,&(module_dbes_io_recv->wsabuf),1,&bytes_io,&flags,&(module_dbes_io_recv->overlapped),NULL);
if ((iret == 0)||((iret == SOCKET_ERROR)&&(WSAGetLastError() == WSA_IO_PENDING))){
//succ
}else{
set_module_dbes_state(STATE_MODULE_ERROR_FOUND);
return 0;
}
return 0;
}
DWORD WINAPI module_dbes_iocp_thread(PVOID paparm){
HANDLE iocp = (HANDLE)paparm;
DWORD bytes_trans;
LPOVERLAPPED overlapped;
LPPX_LS_DBES_PER_DATA per_data;
LPPX_LS_DBES_IO_DATA io_data;
DWORD send_bytes;
DWORD recv_bytes;
DWORD flags;
int iret;
char cache[FILENAME_MAX] = "";
BOOL succ = FALSE;
while (TRUE) {
iret = GetQueuedCompletionStatus(iocp,&bytes_trans,(LPDWORD)&per_data,(LPOVERLAPPED *)&io_data,INFINITE);
overlapped = &(io_data->overlapped);
if ((iret == 0)&&(overlapped == NULL)) {
x_debug_info("module dbes got fault error,need close now");
g_stat->num_thread_dbes--;
if (g_stat->num_thread_dbes <= 0) {
g_stat->num_thread_dbes = 0;
set_module_dbes_state(STATE_MODULE_ERROR_FOUND);
}
return 0;
}else if ((iret == 0)&&(overlapped != NULL)) {
close_module_dbes();
return 0;
}else if ((iret != 0)&&(overlapped != NULL)) {
if (bytes_trans == 0) {
close_module_dbes();
return 0;
}
switch(io_data->io_type) {
case IOCP_IO_TYPE_RECV:
add_buf_to_module_dbes_recv_buf(bytes_trans,io_data->buf);
memset(io_data,0,sizeof(OVERLAPPED));
io_data->io_type = IOCP_IO_TYPE_RECV;
io_data->size = MODULE_DBES_IOCP_BUF_LEN;
io_data->wsabuf.buf = io_data->buf;
io_data->wsabuf.len = MODULE_DBES_IOCP_BUF_LEN;
flags = 0;
iret = WSARecv(sck_module_dbes,&(io_data->wsabuf),1,&recv_bytes,&flags,&(io_data->overlapped),NULL);
if ((iret == 0)||((iret == SOCKET_ERROR)&&(WSAGetLastError() == WSA_IO_PENDING))){
//succ
}else{
set_module_dbes_state(STATE_MODULE_ERROR_FOUND);
return 0;
}
break;
case IOCP_IO_TYPE_SEND:
update_module_dbes_send_buf(bytes_trans);
memset(io_data,0,sizeof(OVERLAPPED));
iret = get_module_dbes_send_buf(io_data->buf);
if (iret == 0) {
continue;
}else{
io_data->size = iret;
io_data->wsabuf.buf = io_data->buf;
io_data->wsabuf.len = iret;
io_data->io_type = IOCP_IO_TYPE_SEND;
flags = 0;
iret = WSASend(sck_module_dbes,&(io_data->wsabuf),1,&send_bytes,&flags,&(io_data->overlapped),NULL);
if ((iret == 0)||((iret == SOCKET_ERROR)&&(WSAGetLastError() == WSA_IO_PENDING))){
//succ
}else{
set_module_dbes_state(STATE_MODULE_ERROR_FOUND);
return 0;
}
}
break;
case IOCP_IO_TYPE_POST_SPACE_SEND:
memset(io_data,0,sizeof(OVERLAPPED));
iret = get_module_dbes_send_buf(io_data->buf);
if (iret == 0) {
continue;
}else{
io_data->size = iret;
io_data->wsabuf.buf = io_data->buf;
io_data->wsabuf.len = iret;
io_data->io_type = IOCP_IO_TYPE_SEND;
flags = 0;
iret = WSASend(sck_module_dbes,&(io_data->wsabuf),1,&send_bytes,&flags,&(io_data->overlapped),NULL);
if ((iret == 0)||((iret == SOCKET_ERROR)&&(WSAGetLastError() == WSA_IO_PENDING))){
//succ
}else{
set_module_dbes_state(STATE_MODULE_ERROR_FOUND);
return 0;
}
}
break;
case IOCP_IO_TYPE_POST_QUIT_THREAD:
g_stat->num_thread_dbes--;
if (g_stat->num_thread_dbes > 0) {
memset(io_data,0,sizeof(OVERLAPPED));
io_data->io_type = IOCP_IO_TYPE_POST_QUIT_THREAD;
bytes_trans = 1;
PostQueuedCompletionStatus(iocp,&bytes_trans,per_data,&(io_data->overlapped));
}else{
g_stat->num_thread_dbes = 0;
ms_free(per_data);
per_data = NULL;
ms_free(io_data);
io_data = NULL;
return 0;
}
break;
default:
break;
}
}
}
return 0;
}
void init_module_dbes(){
if (!valid_ls_state(STATE_MAIN_REQUEST_INIT)) {
return;
}
sck_module_dbes = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
memset(&addr_module_dbes,0,sizeof(addr_module_dbes));
addr_module_dbes.sin_family = AF_INET;
addr_module_dbes.sin_addr.s_addr = inet_addr(g_config->ip_to_dbes);
addr_module_dbes.sin_port = htons(g_config->port_to_dbes);
h_event_dbes = CreateEvent(NULL,TRUE,FALSE,NULL);
h_iocp_module_dbes = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,2);
_beginthread(module_dbes_connect_thread,0,h_iocp_module_dbes);
set_module_dbes_state(STATE_MODULE_INIT_DONE);
return;
}
void set_module_dbes_state(byte state){
EnterCriticalSection(g_state->cts_state);
g_state->module_dbes_state = state;
g_state->frame_module_dbes = g_time_service->curr_frame_theory;
LeaveCriticalSection(g_state->cts_state);
return;
}
BOOL valid_module_dbes_state(byte state){
BOOL ret = FALSE;
EnterCriticalSection(g_state->cts_state);
if (g_state->module_dbes_state == state) {
ret = TRUE;
}
LeaveCriticalSection(g_state->cts_state);
return ret;
}
void set_module_dbes_send_state(byte state){
EnterCriticalSection(module_dbes->cts_send);
module_dbes->send_sign = state;
LeaveCriticalSection(module_dbes->cts_send);
return;
}
void close_module_dbes(){
PX_LS_DBES_PER_DATA * per_data;
PX_LS_DBES_IO_DATA * io_data;
DWORD bytes_trans = 1;
SetEvent(h_event_dbes);
if (sck_module_dbes != NULL) {
shutdown(sck_module_dbes,SD_BOTH);
closesocket(sck_module_dbes);
sck_module_dbes = NULL;
}
if (g_stat->num_thread_dbes > 0) {
per_data = (LPPX_LS_DBES_PER_DATA)ms_malloc(sizeof(PX_LS_DBES_PER_DATA));
per_data->pmodule_dbes = NULL;
io_data = (LPPX_LS_DBES_IO_DATA)ms_malloc(sizeof(PX_LS_DBES_IO_DATA));
memset(io_data,0,sizeof(OVERLAPPED));
io_data->io_type = IOCP_IO_TYPE_POST_QUIT_THREAD;
PostQueuedCompletionStatus(h_iocp_module_dbes,&bytes_trans,(DWORD)per_data,&(io_data->overlapped));
}
return;
}
void free_module_dbes_resource(){
PX_LS_DBES * pdbes;
if (sck_module_dbes != NULL) {
shutdown(sck_module_dbes,SD_BOTH);
closesocket(sck_module_dbes);
sck_module_dbes = NULL;
}
memset(&addr_module_dbes,0,sizeof(addr_module_dbes));
h_iocp_module_dbes = NULL;
if (h_event_dbes != NULL) {
CloseHandle(h_event_dbes);
h_event_dbes = NULL;
}
if (module_dbes_io_recv != NULL) {
if (module_dbes_io_recv->buf != NULL) {
ms_free(module_dbes_io_recv->buf);
module_dbes_io_recv->buf = NULL;
}
ms_free(module_dbes_io_recv);
module_dbes_io_recv = NULL;
}
if (module_dbes_io_send != NULL) {
if (module_dbes_io_send->buf != NULL) {
ms_free(module_dbes_io_send->buf);
module_dbes_io_send->buf = NULL;
}
ms_free(module_dbes_io_send);
module_dbes_io_send = NULL;
}
if (module_dbes_per_data != NULL) {
pdbes = module_dbes_per_data->pmodule_dbes;
DeleteCriticalSection(pdbes->cts_state);
DeleteCriticalSection(pdbes->cts_recv);
DeleteCriticalSection(pdbes->cts_send);
ms_free(pdbes->cts_state);
pdbes->cts_state = NULL;
ms_free(pdbes->cts_recv);
pdbes->cts_recv = NULL;
ms_free(pdbes->cts_send);
pdbes->cts_send = NULL;
if (pdbes->recv_buf != NULL) {
ms_free(pdbes->recv_buf);
pdbes->recv_buf = NULL;
}
if (pdbes->send_buf != NULL) {
ms_free(pdbes->send_buf);
pdbes->send_buf = NULL;
}
if (pdbes->precv_append != NULL) {
ms_free(pdbes->precv_append);
pdbes->precv_append = NULL;
}
if (pdbes->psend_append != NULL) {
ms_free(pdbes->psend_append);
pdbes->psend_append = NULL;
}
ms_free(pdbes);
pdbes = NULL;
ms_free(module_dbes_per_data);
module_dbes_per_data = NULL;
}
g_stat->num_thread_dbes = 0; // FIXME!
return;
}
void add_buf_to_module_dbes_recv_buf(int size,char * packet){
/*
* 拷贝数据至dbes的接收缓冲,若recv_buf能容纳,数据拷入,
* 若不能容纳,不能容纳不分拷入recv_append,同时更新size_recv_append
*/
return;
}
void add_buf_to_module_dbes_send_buf(int size,char * packet){
/*
* 往dbes发送缓冲拷入数据,若recv_buf能容纳,数据存入recv_buf,
* 若不能容纳,不能容纳部分数据拷贝至precv_append,同时更新其size_send_append值
*/
return;
}
void update_module_dbes_send_buf(int size){
/*
* 置dbes的发送缓冲psend_start 为 psend_start + size
*/
return;
}
int get_module_dbes_send_buf(char * packet){
/*
* 将dbes的发送缓冲内数据拷贝至packet,其最大可拷贝n = MODULE_DBES_IOCP_BUF_LEN
* 若当前可拷贝字符n == 0,则module_dbes->send_sigh = IOCP_IO_TYPE_WAIT_SEND_BUF
*/
return 0;
}
void tidy_module_dbes_recv_buf(){
/*
* 用来整理dbes的接收缓冲,仅在其recv_buf无法容纳时,才准予使用precv_append
*/
return;
}
void tidy_module_dbes_send_buf(){
/*
* 用来整理dbes的发送缓冲,即尽量使其附加发送缓冲置于send_buf中,仅当其send_buf无法容纳时,才准予使用
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -