cmppapi.c
来自「用c/c++实现的一个CMPP API」· C语言 代码 · 共 2,167 行 · 第 1/5 页
C
2,167 行
return API_OK; }/* *重新连接MTBS服务器 */static void vReConnect( CONNFD FD ){ int nRet; recCmppConnect rCmppConnect; recCmppConnectResp rCmppConnectResp; memset( &rCmppConnect, 0, sizeof( recCmppConnect ) ); memset( &rCmppConnectResp, 0, sizeof( recCmppConnectResp ) ); g_HConnDesc[FD].eLoginStatus = ReLogining; g_HConnDesc[FD].eSocketStatus = ConnectClose; /* * close */ nTcpClose_Api( g_HConnDesc[FD].nConnFd ); nRet = nTcpConnect_Api( g_HConnDesc[FD].rCfgPara.sHostAddr, g_HConnDesc[FD].rCfgPara.nHostPort ); if( nRet == TCPLIB_FAIL ) { vSetErrorDetail("Error when ReConnect Server, IP[%s] PORT[%d]\n", g_HConnDesc[FD].rCfgPara.sHostAddr, g_HConnDesc[FD].rCfgPara.nHostPort ); g_HConnDesc[FD].eLoginStatus = ReLoginFail; return ; } else { g_HConnDesc[FD].nConnFd = nRet; g_HConnDesc[FD].eSocketStatus = ConnectOpen; } /* *通过配置得到connect的包内容 */ nGetConnectDataByCfg( &g_HConnDesc[FD].rCfgPara, &rCmppConnect ); /* *登陆MTBS服务器 */ nRet = nLogin( FD, &rCmppConnect, &rCmppConnectResp, TRUE ); if( nRet != API_OK ) { vSetErrorDetail("Error when Login Server\n"); Trace( "Error when Login Server\n" ); g_HConnDesc[FD].eLoginStatus = ReLoginFail; return ; } if( rCmppConnectResp.unStatus!= 0 ) { vSetErrorDetail("Connect failed, Response Status is [%d]\n", rCmppConnectResp.unStatus ); Trace( "Connect failed, Response Status is [%d]\n", rCmppConnectResp.unStatus ); g_HConnDesc[FD].eLoginStatus = ReLoginFail; return ; } /* *reconnect ok */ Trace( "Reconnect OK\n" ); g_HConnDesc[FD].eLoginStatus = Logined; return ;}/* * 系统内部发送线程 * (1) 接收SENDBUFFER内容 * (2) 发送到SOCKET,如果SOCKET出错误, 重新建立连接 * (3) 如果超时没有消息包,主动发送ACTIVETEST包 */#define ACTIVE_TIMEVAL 30 // 60秒主动发送ACTIVETEST void* vSendThread( void* arg ){ CONNFD connDesc; recCmppPck rCmppPck; int nRet; int nActiveTest; int nNum = 0; // 用于统计多少次没有包 memcpy( &connDesc, arg, sizeof( int ) ); while( 1 ) { if( g_HConnDesc[ connDesc ].bThreadExit == TRUE ) // 被通知退出 { Trace( "SendThread: Exit Send Thread for notify\n" ); return NULL; } if( g_HConnDesc[connDesc].eLoginStatus == ReLoginRequest ) { Trace( "SendThread: Send Thread Reconnect\n" ); vReConnect( connDesc ); //modify lijian 2003-12-21 优化重新连接流程 if( g_HConnDesc[connDesc].eSocketStatus != ConnectOpen ) { Trace("SendThread: socket cannot open\n"); sleep(1); continue; } } nRet = nLockReadQueue( &g_HConnDesc[ connDesc ].rSendBufFd, &rCmppPck, sizeof( recCmppPck ), 2 ); if( nRet == QUEUE_FAIL ) { Trace( "SendThread: ReadQueue Failed\n" ); continue; } if( nRet == QUEUE_EMPTY ) { nNum++ ; if( nNum < g_HConnDesc[ connDesc ].rCfgPara.nActiveTimeval) { //Trace( "SendThread: TimeOut for [%d] times no cell\n", nNum ); } else { nNum = 0; //主动发送ACTIVETEST //收到OK的RESP后继续流程 Trace( "SendThread: TimeOut, Send a Active " ); nActiveTest = nSendActive( connDesc, TRUE ); //如果失败发起重新连接 if( nActiveTest != API_OK && g_HConnDesc[ connDesc ].bThreadExit != TRUE ) { g_HConnDesc[connDesc].eLoginStatus = ReLoginRequest; Trace("SendThread: Send Active Failed , ReLoginRequest\n" ); continue; } } usleep(100); continue; } if( nRet == QUEUE_OK ) { nNum = 0; } /* *发送到SOCKET */ nRet = nCmppSendSocket( g_HConnDesc[ connDesc ].nConnFd, (void*)&rCmppPck ); //如果失败发起重新连接 if( nRet != API_OK && nRet == API_ERR_NETWORK && g_HConnDesc[ connDesc ].bThreadExit != TRUE ) { g_HConnDesc[connDesc].eLoginStatus = ReLoginRequest; Trace("SendThread: nCmppSendSocket Failed, ReLoginRequest\n" ); continue; } if( nRet != API_OK ) { Trace("Error: nCmppSendSocket failed, lose one package\n" ); } } //end of while(1) return ( NULL );}/* * 从socket接收单个包 * Para: * nSock : socket句柄 * pBuf : 接收缓冲,调用者负责分配空间 * nTimeout: 超时 * Return: 接收包的大小, <0表示失败 */static int nMSgRecv( int nSock, char* pBuf, int nTimeout ){ int nRet; int nTotalLen; int nTmp; if ( pBuf == NULL ) { return -1; } nRet = nTcpReadn_Api( nSock, pBuf, MacIntLen, nTimeout ); if( nRet < 0 ) { Trace ("nTcpRecv Msg error"); return -1; } if( nRet == 0 ) { return 0; } memcpy(&nTmp, pBuf, MacIntLen); nTotalLen = ntohl(nTmp); if( nTotalLen < MacHeadLen ) { Trace ("Receive Length not correct len = %d\n", nTotalLen ); return -1; } nRet = nTcpReadn_Api(nSock, pBuf+MacIntLen, nTotalLen - MacIntLen , nTimeout); if(nRet < 0) { Trace("nTcpRecv Msg Body error"); return -1; } if(nRet == 0) { Trace("nTcpRecv Msg Body timeout"); return -1; } nRet = nTotalLen; return nRet; }#define RECV_TIMEOUT 60 /* 接收包超时时长 *//* * 系统内部接收线程 * (1) 阻塞的设置超时(60秒)的接收SOCKET数据。直到接收到一个数据包 * (2) 调用协议解析函数,得到消息内容 * (3) 根据不同消息包做对应处理( 目前 有SUBMITRESP,DELIVER和 * ACTIVETEST, ACTIVETESTRESP) * (4) ACTIVETEST 回给RESP 放入到SENDBUFFER, 将链路设置为OK * (5) SUBMITRESP将sequence放入到匹配SEUQNCE的HASH, * 事件触发用户发送线程, * (用户发送线程触发后,匹配到SEQUENCE后,取出resp数据, * 验证数据,做处理 ,如果没有匹配到SEQUENCE,再等待事件) */ void* vRecvThread( void* arg ){ int nRet, nLen; char RecvBuff[1024]; recCmppPck rCmppPackage; recCmppDeliverResp rCmppDeliverResp; recSeqCell rSeqCell; CONNFD FD; recConnDesc *precConn ; memcpy( &FD, arg, sizeof( int ) ); FD = *( (int*)arg ); precConn = (recConnDesc *)(&g_HConnDesc[ FD ]); while( 1 ) { if( precConn->bThreadExit == TRUE ) // 被通知退出 { Trace( "RecvThread: Exit Recv Thread for notify\n" ); return NULL; } if ( precConn->eSocketStatus != ConnectOpen ) { Trace( "RecvThread: Socket not Open, sleep 1 sec and try again\n" ); //Trace( "RecvThread: bThreadExit = %d\n", precConn->bThreadExit ); sleep(1); continue; } memset(RecvBuff, 0, sizeof(RecvBuff)); nRet = nMSgRecv( precConn->nConnFd, RecvBuff, RECV_TIMEOUT ); if ( nRet == 0 ) { Trace("RecvThread: nRet = 0"); continue; } else if ( nRet < 0 && precConn->bThreadExit != TRUE ) { /* *错误处理 *触发发送线程重新连接 */ precConn->eLoginStatus = ReLoginRequest; rCmppPackage.rHead.unCommandId = MacC_ActiveRqst; rCmppPackage.rHead.unSequenceId = 99999999; rCmppPackage.rHead.unTotalLength = MacHeadLen + MacActiveTestLen ; nLockWriteQueue( &precConn->rSendBufFd, &rCmppPackage, sizeof( recCmppPck ) ); Trace("RecvThread: nMsgRecv nRet <0, ReLoginRqeuset And Send A Active"); continue; } nLen = nRet; nRet = nCmppDecode( RecvBuff, nLen, &rCmppPackage ); if ( nRet < 0 ) { Trace( "RecvThread: nCmppDecode Failed error=%d\n", nRet); /*错误处理*/ continue; } Trace( "RecvThread: Recv a package From outSide, " "commandId = %x, sequence = %d\n", rCmppPackage.rHead.unCommandId, rCmppPackage.rHead.unSequenceId ); switch ( rCmppPackage.rHead.unCommandId ) { case MacC_ActiveRep: case MacC_SubmitRep: case MacC_ConnectRep: { //Trace( " bef search Sequence packet in HT, nSeq = %d\n", // rCmppPackage.rHead.unSequenceId ); nRet = nLockSerACell( &precConn->rSeqBufFd, (void*)&rCmppPackage.rHead.unSequenceId, sizeof(int), (void*) &rSeqCell, sizeof(recSeqCell) ); if ( nRet == SHMHT_OK ) { Trace( "RecvThread: success search Sequence packet in HT, nSeq = %d\n", rCmppPackage.rHead.unSequenceId ); memcpy( &rSeqCell.rCmppPck, &rCmppPackage, sizeof(recCmppPck) ); nRet = nLockUpdateACell( &precConn->rSeqBufFd, (void*)&rCmppPackage.rHead.unSequenceId, sizeof(int), (void*) &rSeqCell, sizeof(recSeqCell) ); Trace( "RecvThread: update Sequence packet in HT, nSeq = %d, nRet = %d\n", rCmppPackage.rHead.unSequenceId, nRet ); //Trace( " bef set All event, rSeqCell.prEvent = %x, bSended = %d, nSeq=%d", // rSeqCell.prEvent, *rSeqCell.prEvent->pbSended, // rCmppPackage.rHead.unSequenceId ); nSetEvent( rSeqCell.prEvent); //Trace( " aft set All event ,rSeqCell.prEvent = %x, bSended = %d", // rSeqCell.prEvent, *rSeqCell.prEvent->pbSended ); } else { Trace( "RecvThread: no search Sequence packet in HT, seq=[%d], nRet = %d\n", rCmppPackage.rHead.unSequenceId, nRet ); } break; } case MacC_DeliverRqst: { nRet = nLockWriteQueue( &precConn->rDeliverBufFd, (void*)&rCmppPackage, sizeof(recCmppPck) ); if ( nRet == QUEUE_FULL ) { Trace("RecvThread: delivery flow control error=%d\n", nRet); memcpy( rCmppDeliverResp.uchMsgId, rCmppPackage.body.rCmppDeliver.uchMsgId, MacMsgId+1 ); rCmppDeliverResp.unResult = 8; nRet = nSendDeliverResponse( FD, rCmppPackage.rHead.unSequenceId , &rCmppDeliverResp ); break; } else if ( nRet == QUEUE_FAIL ) { Trace( "RecvThread: Write queue fail=%d\n", nRet); break; } /** Added by YXJ (2003-12-18) ***/ vPrintCellNums( (void *) ((precConn->rDeliverBufFd).pvBufPtr), 1); /*** End by YXJ ****/ if ( precConn->rCfgPara.bAutoDeliveryResponse == True ) { Trace( "RecvThread: bAutoDeliveryResponse is true\n"); memcpy( rCmppDeliverResp.uchMsgId, rCmppPackage.body.rCmppDeliver.uchMsgId, MacMsgId+1 ); rCmppDeliverResp.unResult = 0; nRet = nSendDeliverResponse( FD, rCmppPackage.rHead.unSequenceId , &rCmppDeliverResp ); } break; } case MacC_ActiveRqst: { /* ACTIVETEST 回给RESP 放入到SENDBUFFER */ rCmppPackage.rHead.unCommandId = MacC_ActiveRep; rCmppPackage.rHead.unTotalLength = MacHeadLen + MacActiveTestResLen; nRet = nLockWriteQueue( &precConn->rSendBufFd, (void*)&rCmppPackage, sizeof(recCmppPck) ); if ( nRet != QUEUE_OK ) { /* 不处理写队列错误 */ Trace( "RecvThread: nLockWriteQueue error = %d\n", nRet ); } break; } default : { Trace("RecvThread: invalid massage ID=%d\n",rCmppPackage.rHead.unCommandId); break; } } } return ( NULL );
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?