⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cmppif.c

📁 Unix/Linux下的cmpp实现源程序
💻 C
字号:

#include <stdio.h>
#include <pthread.h>

#include "kevopsall.ext"
#include "cmpp.h"
#include "cmppif.h"


CmppArg garg = {PTHREAD_MUTEX_INITIALIZER, 0, 0, NULL, PTHREAD_MUTEX_INITIALIZER, NULL};




static SPCMPP * SPCMPPAlloc ()
{
    SPCMPP * spc = NULL;

    spc = (SPCMPP *)kzalloc(sizeof(*spc));
    if (!spc)
        return NULL;

    return spc;
}


static void SPCMPPFree (SPCMPP * spc)
{
    if (!spc)
        kfree(spc);
}







int CMPPSysInit ()
{
    if (!garg.fpSeq) {
        garg.gseqid = 0;
        garg.fpSeq = fopen("seqid.bin", "rb+");
        if (garg.fpSeq == NULL) {
            garg.fpSeq = fopen("seqid.bin", "wb+");
        } else {
            fseek(garg.fpSeq, 0, SEEK_SET);
            fread(&garg.gseqid, 1, 4, garg.fpSeq);            
            /*if (garg.gseqid != 0)
                garg.gseqid += 1;*/
        }
    }

    if (!garg.spcmpp_list)
        garg.spcmpp_list = lt_new(NULL);

    if (garg.initialized == 0) {
        pthread_mutex_init(&garg.spcmppCS, NULL);

        pthread_mutex_init(&garg.seqidCS, NULL);
        /*garg.gseqid = 0;
        fseek(garg.fpSeq, 0, SEEK_SET);
        fread(&garg.gseqid, 1, 4, garg.fpSeq);*/
    }

    garg.initialized = 1;

    return 0;
}

void CMPPSysCleanup ()
{
    SPCMPP * spc = NULL;
    int i, num;

    EnterCriticalSection(&garg.spcmppCS);
    num = lt_num(garg.spcmpp_list);
    for (i=0; i<num; i++) {
        spc = (SPCMPP *)lt_rm_head(garg.spcmpp_list);
        if (!spc) continue;
        CMPPCleanup(spc);
        SPCMPPFree(spc);
    }
    LeaveCriticalSection(&garg.spcmppCS);

    if (garg.fpSeq) {
        fclose(garg.fpSeq);
        garg.fpSeq = NULL;
    }

    pthread_mutex_destroy(&garg.spcmppCS);
    pthread_mutex_destroy(&garg.seqidCS);

    /*garg.spcmppCS = PTHREAD_MUTEX_INITIALIZER;
    garg.seqidCS = PTHREAD_MUTEX_INITIALIZER;*/

    garg.initialized = 0;
}


unsigned int CMPPGetSeqID ()
{
    static unsigned int tempid;

    if (!garg.fpSeq) 
        CMPPSysInit();

    EnterCriticalSection(&garg.seqidCS);
    tempid = garg.gseqid++;

    fseek(garg.fpSeq, 0, SEEK_SET);
    fwrite(&garg.gseqid, 1, 4, garg.fpSeq);
    LeaveCriticalSection(&garg.seqidCS);

    return tempid;
}







CMPPHandle CMPPInit (unsigned char * ismgaddr,
                     unsigned int    ismgport,
                     unsigned char * spid,
                     unsigned char * secret,
                     int             mtnum,
                     int             monum,
                     unsigned char   mtmajorver,
                     unsigned char   mtminorver,
                     unsigned char   momajorver,
                     unsigned char   mominorver,
                     int             slidingwin,
                     unsigned short  routeid)
{
    int retval = -1;
    SPCMPP * spc = NULL;

    if (!garg.initialized) 
        CMPPSysInit();

    spc = SPCMPPAlloc();
    if (!spc) return NULL;

    if (ismgaddr)
        strncpy(spc->ismgaddr, ismgaddr, sizeof(spc->ismgaddr));
    spc->ismgport = ismgport;
    if (spid)
        strncpy(spc->spid, spid, sizeof(spc->spid));
    if (secret)
        strncpy(spc->secret, secret, sizeof(spc->secret));
    spc->mtnum = mtnum;
    spc->monum = monum;
    spc->mtmajorver = mtmajorver;
    spc->mtminorver = mtminorver;
    spc->momajorver = momajorver;
    spc->mominorver = mominorver;
    spc->slidingwin = slidingwin;

    spc->routeid    = routeid;

    sem_init (&spc->init_end, 0, 0);
    
    retval = pthread_create (&spc->thread_id, NULL, cmppmain, spc);
    if (retval != 0) {
        SPCMPPFree(spc);
        return NULL;
    }

    sem_wait (&spc->init_end);
    sem_destroy (&spc->init_end);

    EnterCriticalSection(&garg.spcmppCS);
    lt_append(garg.spcmpp_list, spc);
    LeaveCriticalSection(&garg.spcmppCS);

    return spc;
}


int CMPPCleanup (CMPPHandle hcmpp)
{
    void * retval;
    SPCMPP * spc = NULL;

    spc = (SPCMPP *)hcmpp;
    if (!spc) return -1;

    spc->cvstate->quit = 1;
    stopIoBlock(spc->cvstate);

    pthread_join (spc->thread_id, &retval);

    EnterCriticalSection(&garg.spcmppCS);
    lt_delete_ptr(garg.spcmpp_list, spc);
    LeaveCriticalSection(&garg.spcmppCS);

    return 0;
}



int CMPPSubmit (
       CMPPHandle    hcmpp,
       unsigned int  seq_id,
       unsigned char msg_level,
       unsigned char *service_id,
       unsigned char *valid_time,
       unsigned char *at_time,
       unsigned char *fee_type,
       unsigned char *fee_code,
       unsigned char feeusertype,
       unsigned char *charge_mobile,
       unsigned char *src_mobile,
       unsigned char user_num,
       unsigned char *dest_mobiles,
       unsigned char msg_format,
       unsigned char msg_len,
       unsigned char *msg,
       unsigned char need_resp)
{
    CmppEntity * cment = NULL;
    MESSAGE * kmsg = NULL;
    CmppPdu * pdu = NULL;
    CmppBlock * block = NULL;
    TOKEN  tk;
    int len = 0;
    SPCMPP * spc = NULL;

    int minus = 0;

    spc = (SPCMPP *)hcmpp;
    if (!spc) return -1;

    cment = (CmppEntity *) spc->entity->var;

    /*while ((minus = cment->total_avail - if_get_submit_num(spc->entity)) <= 0) {*/
    while ((minus = cment->total_avail - cment->consumedwin) <= 0) {
#ifdef _DEBUG
	info("################################CMPPSubmit Minus = %d ####################\n", minus);
#endif
        if ((minus = TimedWaitEvent(cment->hAvailEvent, 5.0)) == -10)
            return -10;
#ifdef _DEBUG
	info("################################CMPPSubmit WaitEvent Return =%d ####################\n", minus);
#endif
    }
    
    pdu = get_recycle_submit_pdu (spc->entity);
    if (!pdu) {
        pdu = cmpdu_alloc (CMPP_SUBMIT);
    }
    cmpdu_init (pdu);

    pdu->sequence_id = seq_id;

    /*pdu->Submit.msg_id */

    pdu->Submit.pk_total = 1;
    pdu->Submit.pk_number = 1;
    pdu->Submit.reg_delivery = need_resp;
    pdu->Submit.msg_level = msg_level;

    len = strlen(service_id); 
    if(len > sizeof(pdu->Submit.service_id)) 
        len = sizeof(pdu->Submit.service_id);
    if (len > 0)
        memcpy(pdu->Submit.service_id, service_id, len);

    pdu->Submit.fee_usertype = feeusertype;

    len = strlen(charge_mobile);
    if (len > sizeof(pdu->Submit.fee_terminal_id))
        len = sizeof(pdu->Submit.fee_terminal_id);
    if (len > 0)
        memcpy(pdu->Submit.fee_terminal_id, charge_mobile, len);

    pdu->Submit.tp_pid = 0;
    pdu->Submit.tp_udhi = 0;
    pdu->Submit.msg_fmt = msg_format;

    /*pdu->Submit.msg_src = SP_ID*/

    len = strlen(fee_type);
    if (len > sizeof(pdu->Submit.feetype))
        len = sizeof(pdu->Submit.feetype);
    if (len > 0)
        memcpy(pdu->Submit.feetype, fee_type, len); 

    len = strlen(fee_code);
    if (len > sizeof(pdu->Submit.feecode))
        len = sizeof(pdu->Submit.feecode);
    if (len > 0)
        memcpy(pdu->Submit.feecode, fee_code, len);

    len = strlen(valid_time);
    if (len > sizeof(pdu->Submit.valid_time))
        len = sizeof(pdu->Submit.valid_time);
    if (len > 0)
        memcpy(pdu->Submit.valid_time, valid_time, len);

    len = strlen(at_time);
    if (len > sizeof(pdu->Submit.at_time))
        len = sizeof(pdu->Submit.at_time);
    if (len > 0)
        memcpy(pdu->Submit.at_time, at_time, len);

    len = strlen(src_mobile);
    if (len > sizeof(pdu->Submit.src_id))
        len = sizeof(pdu->Submit.src_id);
    if (len > 0)
        memcpy(pdu->Submit.src_id, src_mobile, len);

    pdu->Submit.destusr_tl = user_num;
    while (user_num--) {
        len = strlen(dest_mobiles);
        if (len > 21) len = 21;
        putnLastBytes(&pdu->Submit.dest_terminal_id, dest_mobiles, len);
        if (len < 21)
            appendnBytes(&pdu->Submit.dest_terminal_id, '\0', 21-len);
        dest_mobiles += 21;
    }

#ifdef _DEBUG
info("_______CMPPSubmit: dest mobile phone number field %d bytes:\n", 
       frameLength(pdu->Submit.dest_terminal_id));
printFrame(INFO, pdu->Submit.dest_terminal_id, 4);
#endif

    pdu->Submit.msg_length = msg_len;
    putnLastBytes(&pdu->Submit.msg_content, msg, msg_len);


    if_store_submit(spc->entity, pdu);

    block = (CmppBlock *)kzalloc(sizeof(*block));
    if (!block) {
        error("CMPPSubmit: memory allocation for parameter block error.\n");
        return -1;
    }

    tk.selector = UP;
    tk.item = cmpp_submit_req;

    kmsg = createMessage(NULL, spc->entity, tk, 0, 0, block);
    if (!kmsg) {
	kfree(block);
	return -5;
    }

    sendMessage(kmsg, EXTERNAL_PRIOR);
    stopIoBlock(spc->entity->cvopsState);
    return 0;
}


int CMPPSubmitResp (
        CMPPHandle      hcmpp,
        unsigned int  * seq_id,
        unsigned char   msg_id[8],
        unsigned char * result)
{
    CmppEntity * cment = NULL;
    CmppPdu * pdu = NULL;
    SPCMPP * spc = NULL;

    spc = (SPCMPP *)hcmpp;
    if (!spc) return -1;

    if (!seq_id || !result) return -1;

    cment = (CmppEntity *) spc->entity->var;

    while (if_get_submitresp_num(spc->entity) <= 0) {
        if(WaitEvent(cment->hRespEvent) == -10)
            return -10;
    }

    EnterCriticalSection(&cment->respCS);
    pdu = lt_rm_head(cment->submitresps);
    LeaveCriticalSection(&cment->respCS);

    if (!pdu) return -1;

    *seq_id = pdu->sequence_id;
    memcpy(msg_id, pdu->SubmitResp.msg_id, 8);
    *result = pdu->SubmitResp.result;

#ifdef _DEBUG
info("Response  :  Destination_id:\n");
printOctet(INFO, msg_id, 0, 8, 0);
info("Response  :  Destination_id:\n");
printOctet(INFO, pdu->SubmitResp.msg_id, 0, 8, 0);
#endif

    recycle_submitresp_pdu(spc->entity, pdu);

    return 0;
}



int CMPPDeliver (
        CMPPHandle      hcmpp,
        unsigned int  * seq_id,
        unsigned char * msg_id,
        unsigned char * dest_mobile,
        unsigned char * service_id,
        unsigned char * msg_fmt,
        unsigned char * src_mobile,
        unsigned char * status_report,
        unsigned char * msg_len,
        unsigned char * msg)
{
    CmppEntity * cment = NULL;
    CmppPdu * pdu = NULL;
    SPCMPP * spc = NULL;

    spc = (SPCMPP *)hcmpp;
    if (!spc) return -1;

    if (!seq_id || !msg_len) 
        return -1;

    cment = (CmppEntity *) spc->entity->var;

    while (if_get_deliver_num(spc->entity) <= 0) {
        if(WaitEvent(cment->hDeliverEvent) == -10)
            return -10;
    }

    EnterCriticalSection(&cment->deliverCS);
    pdu = lt_rm_head(cment->delivers);
    LeaveCriticalSection(&cment->deliverCS);

    if (!pdu) return -1;

#ifdef _DEBUG
    info("Msg_ID:\n");
    printOctet(INFO, pdu->Deliver.msg_id, 0, 8, 0);
    info("Destination_id:\n");
    printOctet(INFO, pdu->Deliver.dest_id, 0, 21, 0);
    info("Msg_Fmt:\n");
    printOctet(INFO, &pdu->Deliver.msg_fmt, 0, 1, 0);
    info("Source_Terminal_Id:\n");
    printOctet(INFO, pdu->Deliver.src_terminal_id, 0, 21, 0);
    info("Is_Status_Report:\n");
    printOctet(INFO, &pdu->Deliver.reg_delivery, 0, 1, 0);
    info("Msg_length:\n");
    printOctet(INFO, &pdu->Deliver.msg_length, 0, 1, 0);
#endif

    if (seq_id)
        *seq_id = pdu->sequence_id;

    if (msg_id)
        memcpy(msg_id, pdu->Deliver.msg_id, 8);

    if (dest_mobile)
        memcpy(dest_mobile, pdu->Deliver.dest_id, 
                sizeof(pdu->Deliver.dest_id));
    if (service_id)
        memcpy(service_id, pdu->Deliver.service_id, 
                sizeof(pdu->Deliver.service_id));
    if (msg_fmt)
        memcpy(msg_fmt, &pdu->Deliver.msg_fmt, 1);
    if (src_mobile)
        memcpy(src_mobile, pdu->Deliver.src_terminal_id,
                sizeof(pdu->Deliver.src_terminal_id));
    if (status_report)
        memcpy(status_report, &pdu->Deliver.reg_delivery, 1);
    if (msg_len)
        memcpy(msg_len, &pdu->Deliver.msg_length, 1);
    if (msg)
        memcpy(msg, bytePointer(pdu->Deliver.msg_content),
                 pdu->Deliver.msg_length);

    recycle_deliver_pdu(spc->entity, pdu);
    
#ifdef _DEBUG
    info("Msg_ID:\n");
    printOctet(INFO, msg_id, 0, 8, 0);
    info("Msg_ID:\n");
    printOctet(INFO, pdu->Deliver.msg_id, 0, 8, 0);
    info("Destination_id:\n");
    printOctet(INFO, dest_mobile, 0, 21, 0);
    info("Msg_Fmt:\n");
    printOctet(INFO, msg_fmt, 0, 1, 0);
    info("Source_Terminal_Id:\n");
    printOctet(INFO, src_mobile, 0, 21, 0);
    info("Is_Status_Report:\n");
    printOctet(INFO, status_report, 0, 1, 0);
    info("Msg_length:\n");
    printOctet(INFO, msg_len, 0, 1, 0);
#endif

    return 0;
}




unsigned short CMPPGetRoute (CMPPHandle hcmpp)
{
    SPCMPP * spc = NULL;

    if (!hcmpp) return (unsigned short)-1;

    spc = (SPCMPP *)hcmpp;
    return spc->routeid;
}



int CMPPSetRoute (CMPPHandle hcmpp, unsigned short routeid)
{
    SPCMPP * spc = NULL;

    if (!hcmpp) return -1;

    spc = (SPCMPP *)hcmpp;
    spc->routeid =  routeid;

    return 0;
}


struct in_addr CMPPGetRemoteIP (CMPPHandle hcmpp)
{
    SPCMPP * spc = NULL;
    struct in_addr rip;
    CmppEntity * cment = NULL;

    memset(&rip, 0, sizeof(rip));

    if (!hcmpp) return rip;

    spc = (SPCMPP *)hcmpp;

    cment = (CmppEntity *) spc->entity->var;

    return cment->remote_ip;
}


unsigned short CMPPGetRemotePort (CMPPHandle hcmpp)
{
    SPCMPP * spc = NULL;
    CmppEntity * cment = NULL;
    
    if (!hcmpp) return 0;
    
    spc = (SPCMPP *)hcmpp;
    
    cment = (CmppEntity *) spc->entity->var;

    return cment->remote_port;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -