cmppapi.c

来自「用c/c++实现的一个CMPP API」· C语言 代码 · 共 2,167 行 · 第 1/5 页

C
2,167
字号
        return API_OK;        }/* *重新连接MTBS服务器 */static  void    vReConnect( CONNFD FD ){    int nRet;    recCmppConnect rCmppConnect;    recCmppConnectResp rCmppConnectResp;        memset( &rCmppConnect, 0, sizeof( recCmppConnect ) );    memset( &rCmppConnectResp, 0, sizeof( recCmppConnectResp ) );        g_HConnDesc[FD].eLoginStatus  = ReLogining;    g_HConnDesc[FD].eSocketStatus = ConnectClose;        /*    * close     */    nTcpClose_Api( g_HConnDesc[FD].nConnFd );            nRet = nTcpConnect_Api( g_HConnDesc[FD].rCfgPara.sHostAddr,                         g_HConnDesc[FD].rCfgPara.nHostPort                         );    if( nRet == TCPLIB_FAIL )    {        vSetErrorDetail("Error when ReConnect Server, IP[%s] PORT[%d]\n",                        g_HConnDesc[FD].rCfgPara.sHostAddr,                         g_HConnDesc[FD].rCfgPara.nHostPort                         );        g_HConnDesc[FD].eLoginStatus = ReLoginFail;        return ;             }    else    {        g_HConnDesc[FD].nConnFd = nRet;        g_HConnDesc[FD].eSocketStatus = ConnectOpen;    }            /*    *通过配置得到connect的包内容    */    nGetConnectDataByCfg( &g_HConnDesc[FD].rCfgPara,                          &rCmppConnect  );                              /*    *登陆MTBS服务器    */    nRet = nLogin( FD, &rCmppConnect, &rCmppConnectResp, TRUE );    if( nRet != API_OK  )    {        vSetErrorDetail("Error when Login Server\n");        Trace( "Error when Login Server\n" );        g_HConnDesc[FD].eLoginStatus = ReLoginFail;        return ;            }    if( rCmppConnectResp.unStatus!= 0 )    {        vSetErrorDetail("Connect  failed, Response Status is [%d]\n",                             rCmppConnectResp.unStatus );        Trace( "Connect  failed, Response Status is [%d]\n",                             rCmppConnectResp.unStatus );                                    g_HConnDesc[FD].eLoginStatus = ReLoginFail;        return ;          }        /*    *reconnect ok    */    Trace( "Reconnect OK\n" );    g_HConnDesc[FD].eLoginStatus = Logined;           return ;}/* * 系统内部发送线程 * (1) 接收SENDBUFFER内容 * (2) 发送到SOCKET,如果SOCKET出错误, 重新建立连接 *  (3)  如果超时没有消息包,主动发送ACTIVETEST包 */#define ACTIVE_TIMEVAL  30  // 60秒主动发送ACTIVETEST void*   vSendThread( void* arg ){    CONNFD connDesc;    recCmppPck  rCmppPck;    int nRet;    int nActiveTest;    int nNum = 0;       // 用于统计多少次没有包        memcpy( &connDesc, arg, sizeof( int ) );    while( 1 )    {        if(  g_HConnDesc[ connDesc ].bThreadExit == TRUE )    // 被通知退出        {            Trace( "SendThread: Exit Send Thread for notify\n" );            return NULL;        }        if( g_HConnDesc[connDesc].eLoginStatus == ReLoginRequest )        {             Trace( "SendThread: Send Thread Reconnect\n" );             vReConnect( connDesc );             //modify lijian 2003-12-21 优化重新连接流程             if( g_HConnDesc[connDesc].eSocketStatus != ConnectOpen )             {				Trace("SendThread: socket cannot open\n");            	sleep(1);            	continue;             }        }                nRet = nLockReadQueue( &g_HConnDesc[ connDesc ].rSendBufFd,                           &rCmppPck, sizeof( recCmppPck ), 2 );                if( nRet == QUEUE_FAIL )        {            Trace( "SendThread: ReadQueue Failed\n" );            continue;        }        if( nRet == QUEUE_EMPTY )        {              nNum++ ;              if( nNum < g_HConnDesc[ connDesc ].rCfgPara.nActiveTimeval)              {                //Trace( "SendThread: TimeOut for [%d] times no cell\n", nNum );              }              else              {                 nNum = 0;                 //主动发送ACTIVETEST                 //收到OK的RESP后继续流程                 Trace( "SendThread: TimeOut, Send a Active " );                 nActiveTest = nSendActive( connDesc, TRUE );                 //如果失败发起重新连接                 if( nActiveTest != API_OK && g_HConnDesc[ connDesc ].bThreadExit != TRUE )                  {                                         g_HConnDesc[connDesc].eLoginStatus = ReLoginRequest;                     Trace("SendThread: Send Active Failed , ReLoginRequest\n" );                     continue;                 }              }              usleep(100);              continue;        }                if( nRet == QUEUE_OK )        {            nNum = 0;        }        /*        *发送到SOCKET        */        nRet = nCmppSendSocket( g_HConnDesc[ connDesc ].nConnFd,                                 (void*)&rCmppPck );          //如果失败发起重新连接        if( nRet != API_OK &&             nRet == API_ERR_NETWORK &&            g_HConnDesc[ connDesc ].bThreadExit != TRUE )        {            g_HConnDesc[connDesc].eLoginStatus = ReLoginRequest;            Trace("SendThread: nCmppSendSocket Failed, ReLoginRequest\n" );            continue;        }        if( nRet != API_OK )        {            Trace("Error: nCmppSendSocket  failed, lose one package\n" );        }            } //end of while(1)        return ( NULL );}/* *   从socket接收单个包 *   Para: *       nSock   : socket句柄 *       pBuf    : 接收缓冲,调用者负责分配空间 *       nTimeout: 超时 *   Return: 接收包的大小, <0表示失败 */static int nMSgRecv( int nSock, char* pBuf, int nTimeout ){    int nRet;    int nTotalLen;    int nTmp;    if ( pBuf == NULL )    {        return -1;    }    nRet =  nTcpReadn_Api( nSock, pBuf, MacIntLen, nTimeout );    if( nRet < 0 )    {        Trace ("nTcpRecv Msg error");        return -1;    }    if( nRet == 0 )    {        return 0;    }        memcpy(&nTmp, pBuf, MacIntLen);    nTotalLen = ntohl(nTmp);    if( nTotalLen < MacHeadLen )    {        Trace ("Receive Length not correct len = %d\n", nTotalLen );        return -1;    }        nRet = nTcpReadn_Api(nSock, pBuf+MacIntLen, nTotalLen - MacIntLen , nTimeout);    if(nRet < 0)    {        Trace("nTcpRecv Msg Body error");        return -1;    }    if(nRet == 0)    {        Trace("nTcpRecv Msg Body timeout");        return -1;        }    nRet = nTotalLen;    return nRet;    }#define  RECV_TIMEOUT     60      /* 接收包超时时长 *//* *   系统内部接收线程 *  (1) 阻塞的设置超时(60秒)的接收SOCKET数据。直到接收到一个数据包 *  (2) 调用协议解析函数,得到消息内容  *  (3) 根据不同消息包做对应处理( 目前 有SUBMITRESP,DELIVER和 *           ACTIVETEST, ACTIVETESTRESP) *  (4) ACTIVETEST 回给RESP 放入到SENDBUFFER, 将链路设置为OK *  (5) SUBMITRESP将sequence放入到匹配SEUQNCE的HASH,  *      事件触发用户发送线程, *      (用户发送线程触发后,匹配到SEQUENCE后,取出resp数据, *        验证数据,做处理 ,如果没有匹配到SEQUENCE,再等待事件) */       void*   vRecvThread( void* arg ){    int nRet, nLen;    char RecvBuff[1024];    recCmppPck rCmppPackage;    recCmppDeliverResp rCmppDeliverResp;    recSeqCell rSeqCell;    CONNFD  FD;    recConnDesc *precConn ;        memcpy( &FD, arg, sizeof( int ) );        FD = *( (int*)arg );        precConn = (recConnDesc *)(&g_HConnDesc[ FD ]);    while( 1 )    {        if(  precConn->bThreadExit == TRUE )    // 被通知退出        {            Trace( "RecvThread: Exit Recv Thread for notify\n" );            return NULL;        }        if ( precConn->eSocketStatus != ConnectOpen )        {            Trace( "RecvThread: Socket not Open, sleep 1 sec and try again\n" );            //Trace( "RecvThread: bThreadExit = %d\n", precConn->bThreadExit );            sleep(1);            continue;        }                    memset(RecvBuff, 0, sizeof(RecvBuff));                nRet = nMSgRecv( precConn->nConnFd, RecvBuff, RECV_TIMEOUT );        if ( nRet == 0 )        {            Trace("RecvThread: nRet = 0");            continue;        }        else if ( nRet < 0 && precConn->bThreadExit != TRUE )        {            /*            *错误处理            *触发发送线程重新连接            */            precConn->eLoginStatus = ReLoginRequest;            rCmppPackage.rHead.unCommandId = MacC_ActiveRqst;            rCmppPackage.rHead.unSequenceId = 99999999;            rCmppPackage.rHead.unTotalLength = MacHeadLen + MacActiveTestLen ;            nLockWriteQueue( &precConn->rSendBufFd, &rCmppPackage,                             sizeof( recCmppPck ) );                        Trace("RecvThread: nMsgRecv nRet <0, ReLoginRqeuset And Send A Active");            continue;        }        nLen = nRet;        nRet = nCmppDecode( RecvBuff, nLen, &rCmppPackage );        if ( nRet < 0 )        {            Trace( "RecvThread: nCmppDecode Failed error=%d\n", nRet);            /*错误处理*/            continue;        }                Trace( "RecvThread: Recv a package From outSide, "               "commandId = %x, sequence = %d\n",                 rCmppPackage.rHead.unCommandId,                 rCmppPackage.rHead.unSequenceId );                switch ( rCmppPackage.rHead.unCommandId )        {            case MacC_ActiveRep:            case MacC_SubmitRep:            case MacC_ConnectRep:                            {                //Trace( " bef search Sequence packet in HT, nSeq = %d\n",                //            rCmppPackage.rHead.unSequenceId );                nRet = nLockSerACell( &precConn->rSeqBufFd,                                       (void*)&rCmppPackage.rHead.unSequenceId,                                       sizeof(int),                                                                            (void*) &rSeqCell,                                       sizeof(recSeqCell) );                if ( nRet == SHMHT_OK )                {                      Trace( "RecvThread: success search Sequence packet in HT, nSeq = %d\n",                             rCmppPackage.rHead.unSequenceId );                      memcpy( &rSeqCell.rCmppPck, &rCmppPackage,                                                  sizeof(recCmppPck) );                      nRet = nLockUpdateACell( &precConn->rSeqBufFd,                                   (void*)&rCmppPackage.rHead.unSequenceId,                                   sizeof(int), (void*) &rSeqCell,                                   sizeof(recSeqCell) );                      Trace( "RecvThread: update Sequence packet in HT, nSeq = %d, nRet = %d\n",    	                             rCmppPackage.rHead.unSequenceId, nRet );                      //Trace( " bef set All event, rSeqCell.prEvent = %x, bSended = %d, nSeq=%d",                      //        rSeqCell.prEvent, *rSeqCell.prEvent->pbSended,                       //        rCmppPackage.rHead.unSequenceId );                      nSetEvent( rSeqCell.prEvent);                      //Trace( " aft set All event  ,rSeqCell.prEvent = %x, bSended = %d",                      //        rSeqCell.prEvent, *rSeqCell.prEvent->pbSended );                }                else                {                    Trace( "RecvThread: no search Sequence packet in HT, seq=[%d], nRet = %d\n",							rCmppPackage.rHead.unSequenceId,                            nRet );                }                break;                            }            case MacC_DeliverRqst:            {                             nRet = nLockWriteQueue(  &precConn->rDeliverBufFd,                                         (void*)&rCmppPackage, sizeof(recCmppPck) );                if ( nRet == QUEUE_FULL )                {                                        Trace("RecvThread: delivery flow control error=%d\n", nRet);                        memcpy( rCmppDeliverResp.uchMsgId,                             rCmppPackage.body.rCmppDeliver.uchMsgId, MacMsgId+1 );                       rCmppDeliverResp.unResult = 8;                     nRet = nSendDeliverResponse( FD, rCmppPackage.rHead.unSequenceId ,                                            &rCmppDeliverResp );                    break;                                                     }                else if ( nRet == QUEUE_FAIL )                {                    Trace( "RecvThread: Write queue fail=%d\n", nRet);                    break;                }                                /** Added by YXJ (2003-12-18) ***/               	vPrintCellNums( (void *) ((precConn->rDeliverBufFd).pvBufPtr), 1);                /*** End by YXJ ****/                                if ( precConn->rCfgPara.bAutoDeliveryResponse == True )                {                	Trace( "RecvThread: bAutoDeliveryResponse is true\n");                    memcpy( rCmppDeliverResp.uchMsgId,                             rCmppPackage.body.rCmppDeliver.uchMsgId, MacMsgId+1 );                       rCmppDeliverResp.unResult = 0;                     nRet = nSendDeliverResponse( FD, rCmppPackage.rHead.unSequenceId ,                                            &rCmppDeliverResp );                }                break;                            }            case MacC_ActiveRqst:            {                /*  ACTIVETEST 回给RESP 放入到SENDBUFFER    */                rCmppPackage.rHead.unCommandId   = MacC_ActiveRep;                rCmppPackage.rHead.unTotalLength = MacHeadLen + MacActiveTestResLen;                                nRet = nLockWriteQueue(  &precConn->rSendBufFd,                             (void*)&rCmppPackage, sizeof(recCmppPck) );                if ( nRet != QUEUE_OK )                {                    /* 不处理写队列错误 */                                    Trace( "RecvThread: nLockWriteQueue error = %d\n", nRet );                }                                                break;                            }            default :            {                Trace("RecvThread: invalid massage ID=%d\n",rCmppPackage.rHead.unCommandId);                break;            }        }    }    return ( NULL );

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?