⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cmppdata.c

📁 Unix/Linux下的cmpp实现源程序
💻 C
📖 第 1 页 / 共 2 页
字号:
                startTimer(entity, t_build, TBUILD);
                return get_submit_num(entity);
            }

            item->status = SUBMIT_WAITRESP;
            cmcon->availwin -= 1;

            sendNum++;
            vtsent++;

            item->retrynum++;
            item->submtimer = startDynTimer (vtcon, t_resubmit, 
                                             TRESUBMIT, item);
        }
        if (vtsent > 0) {
            /* start connection test message timer */
            startTimer (vtcon, t_acttest, TACTTEST);
        }
    }

    cment->consumedwin += sendNum;

#ifdef _DEBUG
info("distribute ended.\n");
#endif

    return get_submit_num(entity);
}





int store_submit_resp (VTASK * vtask, CmppPdu * pdu)
{
    VTASK * entity = NULL;
    CmppEntity * cment = NULL;

    if (!vtask || !pdu) return -1;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->respCS);
    lt_append(cment->submitresps, pdu);
    LeaveCriticalSection (&cment->respCS);

#ifdef _DEBUG
if (lt_num(cment->submitresps) % 500 == 0)    
tolog("submit_resp list total number %d, pid=%ld, threadid=%ld\n", 
		lt_num(cment->submitresps), getpid(), pthread_self());
#endif

    /*sem_post (&cment->sem_submit);*/
    /*sem_post (&cment->sem_submitresp);*/
    SetEvent(cment->hRespEvent, 1);

    return 0;
}


int handle_submit_resp (VTASK * vtask, CmppPdu * pdu)
{
    VTASK         * entity = NULL;
    CmppCon       * cmcon = NULL;
    CmppEntity    * cment = NULL;
    int             i;
    SubmitPending * item = NULL;

    if (!vtask || !pdu) 
        return -1;

#ifdef _DEBUG
info("handle_submit_resp invoked %s for PDU 0x%08x\n", 
     vtask->name, pdu->command_id);
#endif

    cmcon  = (CmppCon *)vtask->var;
    entity = (VTASK *)getEntity(vtask);
    cment  = (CmppEntity *)entity->var;

    for (i=0; i<cmcon->sliding_len; i++) {
        item = &cmcon->subpdu[i];
        if (item->status != SUBMIT_WAITRESP &&
            item->status != SUBMIT_REWAITRESP) 
        {
            continue;
        }
        if (pdu->sequence_id == item->submit->sequence_id) {
            store_submit_resp (vtask, pdu);
            item->status = SUBMIT_AVAIL;
            item->retrynum = 0;

            recycle_submit_pdu (vtask, item->submit);
            item->submit = NULL;
            emptyFrame(item->submitframe);

            stopDynTimer(item->submtimer);
            item->submtimer = NULL;

            cmcon->availwin += 1;
            cment->consumedwin -= 1;

            /* signify the upper layer threads that check if the data buffer
             * is available */
            SetEvent(cment->hAvailEvent, 2);

#ifdef _DEBUG
info("handle_submit_resp: sequence %d is matched, recycle the Submit.\n", pdu->sequence_id);
#endif

            if (get_submit_num(entity) > 0)
                startHook(cment->hDistribute);

            return 0;
        }
    }

    return 0;
}








int store_deliver (VTASK * vtask, CmppPdu * pdu)
{
    VTASK * entity = NULL;
    CmppEntity * cment = NULL;

    if (!vtask || !pdu) return -1;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->deliverCS);
    lt_append(cment->delivers, pdu);
    LeaveCriticalSection (&cment->deliverCS);

#ifdef _DEBUG
if ( lt_num(cment->delivers) % 500 == 0)    
tolog("store deliver %d, pid=%ld, threadid=%ld\n", 
		lt_num(cment->delivers), getpid(), pthread_self());
#endif

    return 0;
}




int recycle_deliver_pdu (VTASK * vtask, CmppPdu * pdu)
{
    VTASK * entity = NULL;
    CmppEntity * cment = NULL;

    if (!vtask || !pdu) return -1;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->recycledeliverCS);
    lt_append(cment->recycle_delivers, pdu);
    LeaveCriticalSection (&cment->recycledeliverCS);

#ifdef _DEBUG
if (lt_num(cment->recycle_delivers) % 500 == 0)    
tolog("deliver recycle list total number %d, pid=%ld, threadid=%ld\n", 
		lt_num(cment->recycle_delivers), getpid(), pthread_self());
#endif

    return 0;
}

CmppPdu * get_recycle_deliver_pdu (VTASK * vtask)
{
    VTASK * entity = NULL;
    CmppEntity * cment = NULL;
    CmppPdu * pdu = NULL;

    if (!vtask) return NULL;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->recycledeliverCS);
    pdu = lt_rm_head (cment->recycle_delivers);
    LeaveCriticalSection (&cment->recycledeliverCS);

#ifdef _DEBUG
if (!pdu) {
    FILE * fp = fopen("kepanic.txt", "a+");
    fprintf(fp, "recycle deliver pdu number is 0, this result in the allocation of one deliver.\n"
	                           "using deliver pdu number is %d.\n",
		                   lt_num(cment->delivers));
    fclose(fp);
}
#endif

    return pdu;
}



int handle_deliver (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppCon    * cmcon = NULL;
    CmppEntity * cment = NULL;

    if (!vtask || !pdu)
        return -1;

    cmcon  = (CmppCon *)vtask->var;
    entity = (VTASK *)getEntity(vtask);
    cment  = (CmppEntity *)entity->var;

    if (!cmcon->deliverresp)
        cmcon->deliverresp = cmpdu_alloc(CMPP_DELIVER_RESP);

    if (!cmcon->deliverresp) return -1;

    cmcon->deliverresp->sequence_id = pdu->sequence_id;
    memcpy (cmcon->deliverresp->DeliverResp.msg_id, pdu->Deliver.msg_id, 
              sizeof(cmcon->deliverresp->DeliverResp.msg_id));
    cmcon->deliverresp->DeliverResp.result = 0;

    cmpdu_encode(cmcon->deliverresp, &cmcon->drespdu);

    if (sendTcpCon (cmcon->fd, cmcon->drespdu) < 0) {
        error("handle_deliver: CMPP connection %s to "
              "ISMG %s:%d crashed while reading length.\n",
              vtask->name, cment->remote_host, cment->remote_port);
        vtask->state = cmpp_null;
        checkIdleConnection(vtask);

        startTimer(entity, t_build, TBUILD);
        return -1;
    }

    store_deliver (vtask, pdu);

    /*sem_post (&cment->sem_deliver);*/
    SetEvent (cment->hDeliverEvent, 0);
    
    return 0;
}



int issue_active_test_resp (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppCon    * cmcon = NULL;
    CmppEntity * cment = NULL;
    CmppPdu    * respdu = NULL;

    if (!vtask || !pdu)
        return -1;

    cmcon  = (CmppCon *)vtask->var;
    entity = (VTASK *)getEntity(vtask);
    cment  = (CmppEntity *)entity->var;

    respdu = cmpdu_alloc(CMPP_ACTIVE_TEST_RESP);
    if (!respdu) {
        error("cmpdu_alloc failed for ACTIVE_TEST_RESP\n");
        return -1;
    }

    respdu->sequence_id =  pdu->sequence_id;

    cmpdu_encode(respdu, &cmcon->actrespdu);
    cmpdu_free(respdu);

    if (sendTcpCon (cmcon->fd, cmcon->actrespdu) < 0) {
        error("issue_active_test_resp: CMPP connection %s to "
              "ISMG %s:%d crashed while reading length.\n",
              vtask->name, cment->remote_host, cment->remote_port);
        vtask->state = cmpp_null;
        checkIdleConnection(vtask);

        startTimer(entity, t_build, TBUILD);
        return -1;
    }

    return 0;
}


int issue_active_test (VTASK * vtask)
{
    VTASK      * entity = NULL;
    CmppCon    * cmcon = NULL;
    CmppEntity * cment = NULL;

    if (!vtask)
        return -1;

    cmcon  = (CmppCon *)vtask->var;
    entity = (VTASK *)getEntity(vtask);
    cment  = (CmppEntity *)entity->var;

    if (!cmcon->acttest)
        cmcon->acttest = cmpdu_alloc(CMPP_ACTIVE_TEST);

    cmcon->acttest->sequence_id = CMPPGetSeqID();
    cmpdu_encode (cmcon->acttest, &cmcon->actpdu);

    if (sendTcpCon (cmcon->fd, cmcon->actpdu) < 0) {
        error("issue_active_test: CMPP connection %s to "
              "ISMG %s:%d crashed while reading length.\n",
              vtask->name, cment->remote_host, cment->remote_port);
        vtask->state = cmpp_null;
        checkIdleConnection(vtask); 

        startTimer(entity, t_build, TBUILD);
        return -1;
    }

    startTimer (vtask, t_reactive, TREACTIVE);
    ++cmcon->actretry;

    return 0;
}


int handle_active_test_resp (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppCon    * cmcon = NULL;
    CmppEntity * cment = NULL;

    if (!vtask || !pdu)
        return -1;

    cmcon  = (CmppCon *)vtask->var;
    entity = (VTASK *)getEntity(vtask);
    cment  = (CmppEntity *)entity->var;

    if (!cmcon->acttest || cmcon->acttest->sequence_id != pdu->sequence_id)
        return -1;

    stopTimer(vtask, t_reactive);

    cmcon->actretry = 0;
    emptyFrame(cmcon->actpdu);

    return 0;
}




int issue_terminate_resp (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppCon    * cmcon = NULL;
    CmppEntity * cment = NULL;
    CmppPdu    * respdu = NULL;
    FRAME_PTR    frame = NULL;

    if (!vtask || !pdu)
        return -1;

    cmcon  = (CmppCon *)vtask->var;
    entity = (VTASK *)getEntity(vtask);
    cment  = (CmppEntity *)entity->var;

    respdu = cmpdu_alloc(CMPP_TERMINATE_RESP);
    if (!respdu) return -1;

    respdu->sequence_id =  pdu->sequence_id;

    cmpdu_encode(respdu, &frame);
    cmpdu_free(respdu);

    if (sendTcpCon (cmcon->fd, frame) < 0) {
        error("issue_terminate_resp: CMPP connection %s to "
              "ISMG %s:%d crashed while reading length.\n",
              vtask->name, cment->remote_host, cment->remote_port);
        return -1;
    }

    return 0;
}



⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -