⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cmppdata.c

📁 Unix/Linux下的cmpp实现源程序
💻 C
📖 第 1 页 / 共 2 页
字号:

/*******************************************************
    NAME:      cmppdata.c
    PURPOSE:   China Mobile Peer to Peer Protocol 2.0
               Permanent connection implementation to
               submit data to Internet Short Message Gateway
               or deliver data from ISMG to upper layer.

               Implements data management: storing PDUs into and
               fetching them from the queues.

    VERSION:   0.0.1
    AUTHOR:    Ke Heng Zhong
    DATE:      2002/06/17 12:52
    MODIFIED:  2000/06/17 21:12
 ********************************************************/

#include "kevopsall.ext"
#include "cmpp.h"
#include "cmppif.h"



/*
 * Report how many items are currently held in the `submits` list of the
 * CmppEntity reached through getEntity(vtask)->var.
 *
 * The list is read while holding submitCS.
 * Returns 0 when vtask is NULL.
 */
int if_get_submit_num (VTASK * vtask)
{
    VTASK      * owner;
    CmppEntity * ce;
    int          count;

    if (vtask == NULL)
        return 0;

    owner = (VTASK *) getEntity (vtask);
    ce    = (CmppEntity *) owner->var;

    EnterCriticalSection (&ce->submitCS);
    count = lt_num(ce->submits);
    LeaveCriticalSection (&ce->submitCS);

    return count;
}



/*
 * Report how many items are currently held in the `submitresps` list of
 * the CmppEntity reached through getEntity(vtask)->var.
 *
 * The list is read while holding respCS.
 * Returns 0 when vtask is NULL.
 */
int if_get_submitresp_num (VTASK * vtask)
{
    VTASK      * owner;
    CmppEntity * ce;
    int          count;

    if (vtask == NULL)
        return 0;

    owner = (VTASK *) getEntity (vtask);
    ce    = (CmppEntity *) owner->var;

    EnterCriticalSection (&ce->respCS);
    count = lt_num(ce->submitresps);
    LeaveCriticalSection (&ce->respCS);

    return count;
}



/*
 * Report how many items are currently held in the `delivers` list of the
 * CmppEntity reached through getEntity(vtask)->var.
 *
 * The list is read while holding deliverCS.
 * Returns 0 when vtask is NULL.
 */
int if_get_deliver_num (VTASK * vtask)
{
    VTASK      * owner;
    CmppEntity * ce;
    int          count;

    if (vtask == NULL)
        return 0;

    owner = (VTASK *) getEntity (vtask);
    ce    = (CmppEntity *) owner->var;

    EnterCriticalSection (&ce->deliverCS);
    count = lt_num(ce->delivers);
    LeaveCriticalSection (&ce->deliverCS);

    return count;
}



/*
 * Total number of submit PDUs waiting to be sent: the size of the
 * `unsucc_submits` list plus the size of the `submits` list.
 *
 * Only the `submits` list is read under submitCS; `unsucc_submits` is
 * read without taking any lock.
 * Returns 0 when vtask is NULL.
 */
int get_submit_num (VTASK * vtask)
{
    VTASK      * owner;
    CmppEntity * ce;
    int          pending;

    if (vtask == NULL)
        return 0;

    owner = (VTASK *) getEntity (vtask);
    ce    = (CmppEntity *) owner->var;

    /* previously failed submits first (no lock taken for this list) */
    pending = lt_num(ce->unsucc_submits);

    /* then the regular submit queue, under its critical section */
    EnterCriticalSection (&ce->submitCS);
    pending += lt_num(ce->submits);
    LeaveCriticalSection (&ce->submitCS);

    return pending;
}


/*
 * Remove and return the next submit PDU to send.
 *
 * The head of `unsucc_submits` is preferred when that list is non-empty;
 * otherwise (or if that removal yields NULL) the head of `submits` is
 * removed while holding submitCS.
 *
 * Returns NULL when vtask is NULL or when no PDU could be taken from
 * either list.
 */
CmppPdu * fetch_submit (VTASK * vtask)
{
    VTASK      * owner;
    CmppEntity * ce;
    CmppPdu    * pdu = NULL;

    if (vtask == NULL)
        return NULL;

    owner = (VTASK *) getEntity (vtask);
    ce    = (CmppEntity *) owner->var;

    /* try the unsuccessful-submit list first (accessed without a lock) */
    if (lt_num(ce->unsucc_submits) != 0)
        pdu = (CmppPdu *) lt_rm_head(ce->unsucc_submits);

    /* nothing obtained there: fall back to the regular submit queue */
    if (pdu == NULL) {
        EnterCriticalSection (&ce->submitCS);
        pdu = (CmppPdu *) lt_rm_head(ce->submits);
        LeaveCriticalSection (&ce->submitCS);
    }

    return pdu;
}


/*
 * Append a submit PDU to the tail of the `submits` list of the
 * CmppEntity reached through getEntity(vtask)->var, holding submitCS
 * for the duration of the append.
 *
 * In _DEBUG builds a log line is written each time the list length is a
 * multiple of 500.
 *
 * Returns 0 on success, -1 when vtask or submit is NULL.
 */
int if_store_submit (VTASK * vtask, CmppPdu * submit)
{
    VTASK      * entity = NULL;
    CmppEntity * cment = NULL;
#ifdef _DEBUG
    int          total = 0;
#endif

    if (!vtask || !submit) return -1;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->submitCS);
    lt_append(cment->submits, submit);
#ifdef _DEBUG
    /* sample the length while still holding submitCS, so the list is not
     * read unprotected after the lock is released */
    total = lt_num(cment->submits);
#endif
    LeaveCriticalSection (&cment->submitCS);

#ifdef _DEBUG
    if (total % 500 == 0)
        /* pid_t / pthread_t are cast explicitly to match the %ld
         * conversions (assumes tolog is printf-like and pthread_t is a
         * scalar type on this platform) */
        tolog("if_store_submits, total submits %d, pid=%ld, threadid=%ld.\n",
              total, (long) getpid(), (long) pthread_self());
#endif

    return 0;
}



/*
 * Append a submit PDU to the tail of the `unsucc_submits` list of the
 * CmppEntity reached through getEntity(vtask)->var.
 *
 * No critical section is taken around this list here.
 * NOTE(review): presumably the list is only touched from one thread;
 * confirm against the callers.
 *
 * In _DEBUG builds a log line is written each time the list length is a
 * multiple of 500.
 *
 * Returns 0 on success, -1 when vtask or submit is NULL.
 */
int store_unsucc_submit (VTASK * vtask, CmppPdu * submit)
{
    VTASK      * entity = NULL;
    CmppEntity * cment = NULL;
#ifdef _DEBUG
    int          total = 0;
#endif

    if (!vtask || !submit) return -1;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    lt_append(cment->unsucc_submits, submit);

#ifdef _DEBUG
    /* read the length once so the test and the logged value agree */
    total = lt_num(cment->unsucc_submits);
    if (total % 500 == 0)
        /* pid_t / pthread_t are cast explicitly to match the %ld
         * conversions (assumes tolog is printf-like and pthread_t is a
         * scalar type on this platform) */
        tolog("submit unsucc list total number %d, pid=%ld, threadid=%ld\n",
              total, (long) getpid(), (long) pthread_self());
#endif

    return 0;
}




/*
 * Hand a submit PDU back for reuse by appending it to the
 * `recycle_submits` list of the CmppEntity reached through
 * getEntity(vtask)->var, holding recycleCS for the append.
 *
 * In _DEBUG builds a log line is written each time the list length is a
 * multiple of 500.
 *
 * Returns 0 on success, -1 when vtask or pdu is NULL.
 */
int recycle_submit_pdu (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppEntity * cment = NULL;
#ifdef _DEBUG
    int          total = 0;
#endif

    if (!vtask || !pdu) return -1;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->recycleCS);
    lt_append(cment->recycle_submits, pdu);
#ifdef _DEBUG
    /* sample the length while still holding recycleCS, so the list is
     * not read unprotected after the lock is released */
    total = lt_num(cment->recycle_submits);
#endif
    LeaveCriticalSection (&cment->recycleCS);

#ifdef _DEBUG
    if (total % 500 == 0)
        /* pid_t / pthread_t are cast explicitly to match the %ld
         * conversions (assumes tolog is printf-like and pthread_t is a
         * scalar type on this platform) */
        tolog("submit recycle list total number %d, pid=%ld, threadid=%ld\n",
              total, (long) getpid(), (long) pthread_self());
#endif

    return 0;
}

/*
 * Take one previously recycled submit PDU from the head of the
 * `recycle_submits` list, holding recycleCS for the removal.
 *
 * Returns the PDU, or NULL when vtask is NULL or the removal yields
 * nothing. This function does not allocate; per the debug message the
 * caller is presumably expected to allocate a fresh PDU on NULL.
 *
 * In _DEBUG builds a NULL result is recorded in "kepanic.txt" together
 * with the current length of the `submits` list.
 */
CmppPdu * get_recycle_submit_pdu (VTASK * vtask)
{
    VTASK * entity = NULL;
    CmppEntity * cment = NULL;
    CmppPdu * pdu = NULL;

    if (!vtask) return NULL;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->recycleCS);
    pdu = (CmppPdu *) lt_rm_head (cment->recycle_submits);
    LeaveCriticalSection (&cment->recycleCS);

#ifdef _DEBUG
    if (!pdu) {
        FILE * fp = fopen("kepanic.txt", "a+");
        /* fopen may fail (permissions, descriptor limit, ...); skip the
         * diagnostic rather than pass NULL to fprintf/fclose */
        if (fp) {
            /* NOTE(review): `submits` is read here without submitCS, so
             * the figure is only approximate */
            fprintf(fp, "recycle submit pdu number is 0, this result in the allocation of one submit.\n"
                        "using submit pdu number is %d.\n",
                        lt_num(cment->submits));
            fclose(fp);
        }
    }
#endif

    return pdu;
}



/*
 * Hand a submit-response PDU back for reuse by appending it to the
 * `recycle_submitresps` list of the CmppEntity reached through
 * getEntity(vtask)->var, holding recyclerespCS for the append.
 *
 * In _DEBUG builds a log line is written each time the list length is a
 * multiple of 500.
 *
 * Returns 0 on success, -1 when vtask or pdu is NULL.
 */
int recycle_submitresp_pdu (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppEntity * cment = NULL;
#ifdef _DEBUG
    int          total = 0;
#endif

    if (!vtask || !pdu) return -1;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->recyclerespCS);
    lt_append(cment->recycle_submitresps, pdu);
#ifdef _DEBUG
    /* sample the length while still holding recyclerespCS, so the list
     * is not read unprotected after the lock is released */
    total = lt_num(cment->recycle_submitresps);
#endif
    LeaveCriticalSection (&cment->recyclerespCS);

#ifdef _DEBUG
    if (total % 500 == 0)
        /* pid_t / pthread_t are cast explicitly to match the %ld
         * conversions (assumes tolog is printf-like and pthread_t is a
         * scalar type on this platform) */
        tolog("submit_response recycle list total number %d, pid=%ld, threadid=%ld\n",
              total, (long) getpid(), (long) pthread_self());
#endif

    return 0;
}

/*
 * Take one previously recycled submit-response PDU from the head of the
 * `recycle_submitresps` list, holding recyclerespCS for the removal.
 *
 * Returns the PDU, or NULL when vtask is NULL or the removal yields
 * nothing. This function does not allocate; per the debug message the
 * caller is presumably expected to allocate a fresh PDU on NULL.
 *
 * In _DEBUG builds a NULL result is recorded in "kepanic.txt" together
 * with the current length of the `submitresps` list.
 */
CmppPdu * get_recycle_submitresp_pdu (VTASK * vtask)
{
    VTASK * entity = NULL;
    CmppEntity * cment = NULL;
    CmppPdu * pdu = NULL;

    if (!vtask) return NULL;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->recyclerespCS);
    pdu = (CmppPdu *) lt_rm_head (cment->recycle_submitresps);
    LeaveCriticalSection (&cment->recyclerespCS);

#ifdef _DEBUG
    if (!pdu) {
        FILE * fp = fopen("kepanic.txt", "a+");
        /* fopen may fail (permissions, descriptor limit, ...); skip the
         * diagnostic rather than pass NULL to fprintf/fclose */
        if (fp) {
            /* NOTE(review): `submitresps` is read here without respCS,
             * so the figure is only approximate */
            fprintf(fp, "recycle submitresp pdu number is 0, this result in the allocation of one submitresp.\n"
                        "using submitresp pdu number is %d.\n",
                        lt_num(cment->submitresps));
            fclose(fp);
        }
    }
#endif

    return pdu;
}


int distribute_submit (VTASK * vtask, void * userinfo)
{
    VTASK         * entity = NULL,
                  * vtcon = NULL;
    ENTITY        * mentity = NULL;
    CmppEntity    * cment = NULL;
    CmppCon       * cmcon = NULL;
    int             i, num, j, k;
    SubmitPending * item = NULL;
    CmppPdu       * cmpdu = NULL;
    int             sendNum = 0, vtsent = 0;

    struct tm     * curtime;
    time_t          calendt;
    uint32          ufmt = 0;

    if (!vtask) return 0;

#ifdef _DEBUG
info("distribute_submit: vtask %s begin.\n", vtask->name);
#endif

    mentity = getEntity (vtask);
    entity = (VTASK *)mentity;
    cment = (CmppEntity *)entity->var;

    if (cment->total_avail - cment->consumedwin <= 0)
        return 0;

    if (get_submit_num(entity) == 0)
        return 0;

    num = sk_num (mentity->busyConnections);
    for (i=0; i<num; i++) {
        vtcon = (VTASK *)sk_value(mentity->busyConnections, i);

        if (vtcon->state != cmpp_ready)
            continue;

        cmcon = (CmppCon *)vtcon->var;
#ifdef _DEBUG
info("distribute via %s, availwin=%d, is_mt=%s, totalavail=%d consumedwin=%d\n",
	       	vtcon->name, cmcon->availwin, cmcon->is_mo?"no":"yes", 
		cment->total_avail, cment->consumedwin);
#endif

        if (cmcon->availwin <= 0) 
            continue;

        if (cmcon->is_mo)
            continue;

        for (j=0, vtsent=0; j<cmcon->sliding_len; j++) {
            item = &cmcon->subpdu[j];
            if (item->status != SUBMIT_AVAIL)
                continue;

            item->submit = cmpdu = fetch_submit (vtask);
            if (!cmpdu) {
                /*cment->total_avail -= sendNum;*/
                cment->consumedwin += sendNum;
                return 0;
            }

            /* some default parameters not filled by upper layer, 
             * these paras are fetched from configuration file. 
             * now fill them here. */
            memcpy(cmpdu->Submit.msg_src, cmcon->sp_id, 
                       sizeof(cmpdu->Submit.msg_src));

            time(&calendt);
            curtime = localtime(&calendt);

            /* construct time format of MSG_ID */
            ufmt = 0;
            ufmt = ufmt | ((curtime->tm_sec & 0x3f) << 7);
            ufmt = ufmt | ((curtime->tm_min & 0x1f) << 13);
            ufmt = ufmt | ((curtime->tm_hour & 0x1f) << 18);
            ufmt = ufmt | ((curtime->tm_mday & 0x1f) << 23);
            ufmt = ufmt | ((curtime->tm_mon & 0x0f) << 28);
            for (k=3; k>=0; k--) {
                cmpdu->Submit.msg_id[k] = (uint8)(ufmt & 0xff);
                ufmt = ufmt >> 8;
            }
            /*memcpy(&cmpdu->Submit.msg_id[0], (uint8 *)&ufmt, 4);*/

            ufmt = cmpdu->sequence_id & 0x0000ffff;
            for (k=7; k>=4; k--) {
                cmpdu->Submit.msg_id[k] = (uint8)(ufmt & 0xff);
                ufmt = ufmt >> 8;
            }
            /*memcpy(&cmpdu->Submit.msg_id[4], (uint8 *)&ufmt, 4);*/

            emptyFrame(item->submitframe);
            cmpdu_encode(cmpdu, &item->submitframe);
            item->retrynum = 0;
            item->status = SUBMIT_PRESEND;
            
#ifdef _DEBUG
info("distribute procedure: %d bytes submit data to be sent:\n", 
frameLength(item->submitframe));
/*printFrame(INFO, item->submitframe, 3);*/
#endif

            if (sendTcpCon (cmcon->fd, item->submitframe) < 0) {
                error("distribute_submit: CMPP connection %s to "
                      "ISMG %s:%d crashed while reading length.\n",
                      vtcon->name, cment->remote_host, cment->remote_port);
                vtcon->state = cmpp_null;
                checkIdleConnection(vtcon);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -