📄 cmppdata.c
字号:
/*******************************************************
NAME: cmppdata.c
PURPOSE: China Mobile Peer to Peer Protocol 2.0
Permanent connection implementation to
submit data to Internet Short Message Gateway
or deliver data from ISMG to upper layer.
data management, including storing into and fetching
from queue, implementation.
VERSION: 0.0.1
AUTHOR: Ke Heng Zhong
DATE: 2002/06/17 12:52
MODIFIED: 2000/06/17 21:12
********************************************************/
#include "kevopsall.ext"
#include "cmpp.h"
#include "cmppif.h"
int if_get_submit_num (VTASK * vtask)
{
VTASK * entity = NULL;
CmppEntity * cment = NULL;
int num = 0;
if (!vtask) return 0;
entity = (VTASK *) getEntity (vtask);
cment = (CmppEntity *)entity->var;
EnterCriticalSection (&cment->submitCS);
num += lt_num(cment->submits);
LeaveCriticalSection (&cment->submitCS);
return num;
}
int if_get_submitresp_num (VTASK * vtask)
{
VTASK * entity = NULL;
CmppEntity * cment = NULL;
int num = 0;
if (!vtask) return 0;
entity = (VTASK *) getEntity (vtask);
cment = (CmppEntity *)entity->var;
EnterCriticalSection (&cment->respCS);
num += lt_num(cment->submitresps);
LeaveCriticalSection (&cment->respCS);
return num;
}
int if_get_deliver_num (VTASK * vtask)
{
VTASK * entity = NULL;
CmppEntity * cment = NULL;
int num = 0;
if (!vtask) return 0;
entity = (VTASK *) getEntity (vtask);
cment = (CmppEntity *)entity->var;
EnterCriticalSection (&cment->deliverCS);
num += lt_num(cment->delivers);
LeaveCriticalSection (&cment->deliverCS);
return num;
}
int get_submit_num (VTASK * vtask)
{
VTASK * entity = NULL;
CmppEntity * cment = NULL;
int num = 0;
if (!vtask) return 0;
entity = (VTASK *) getEntity (vtask);
cment = (CmppEntity *)entity->var;
num = lt_num(cment->unsucc_submits);
EnterCriticalSection (&cment->submitCS);
num += lt_num(cment->submits);
LeaveCriticalSection (&cment->submitCS);
return num;
}
CmppPdu * fetch_submit (VTASK * vtask)
{
VTASK * entity = NULL;
CmppPdu * submit = NULL;
CmppEntity * cment = NULL;
if (!vtask) return NULL;
entity = (VTASK *) getEntity (vtask);
cment = (CmppEntity *)entity->var;
if (lt_num(cment->unsucc_submits)) {
submit = (CmppPdu *)lt_rm_head(cment->unsucc_submits);
if (submit)
return submit;
}
EnterCriticalSection (&cment->submitCS);
submit = (CmppPdu *)lt_rm_head(cment->submits);
LeaveCriticalSection (&cment->submitCS);
return submit;
}
int if_store_submit (VTASK * vtask, CmppPdu * submit)
{
VTASK * entity = NULL;
CmppEntity * cment = NULL;
if (!vtask || !submit) return -1;
entity = (VTASK *) getEntity (vtask);
cment = (CmppEntity *)entity->var;
EnterCriticalSection (&cment->submitCS);
lt_append(cment->submits, submit);
LeaveCriticalSection (&cment->submitCS);
#ifdef _DEBUG
if (lt_num(cment->submits) % 500 == 0)
tolog("if_store_submits, total submits %d, pid=%ld, threadid=%ld.\n",
lt_num(cment->submits), getpid(), pthread_self());
#endif
return 0;
}
int store_unsucc_submit (VTASK * vtask, CmppPdu * submit)
{
VTASK * entity = NULL;
CmppEntity * cment = NULL;
if (!vtask || !submit) return -1;
entity = (VTASK *) getEntity (vtask);
cment = (CmppEntity *)entity->var;
lt_append(cment->unsucc_submits, submit);
#ifdef _DEBUG
if (lt_num(cment->unsucc_submits) % 500 == 0)
tolog("submit unsucc list total number %d, pid=%ld, threadid=%ld\n",
lt_num(cment->unsucc_submits), getpid(), pthread_self());
#endif
return 0;
}
int recycle_submit_pdu (VTASK * vtask, CmppPdu * pdu)
{
VTASK * entity = NULL;
CmppEntity * cment = NULL;
if (!vtask || !pdu) return -1;
entity = (VTASK *) getEntity (vtask);
cment = (CmppEntity *)entity->var;
EnterCriticalSection (&cment->recycleCS);
lt_append(cment->recycle_submits, pdu);
LeaveCriticalSection (&cment->recycleCS);
#ifdef _DEBUG
if (lt_num(cment->recycle_submits) % 500 == 0)
tolog("submit recycle list total number %d, pid=%ld, threadid=%ld\n",
lt_num(cment->recycle_submits), getpid(), pthread_self());
#endif
return 0;
}
CmppPdu * get_recycle_submit_pdu (VTASK * vtask)
{
VTASK * entity = NULL;
CmppEntity * cment = NULL;
CmppPdu * pdu = NULL;
if (!vtask) return NULL;
entity = (VTASK *) getEntity (vtask);
cment = (CmppEntity *)entity->var;
EnterCriticalSection (&cment->recycleCS);
pdu = lt_rm_head (cment->recycle_submits);
LeaveCriticalSection (&cment->recycleCS);
#ifdef _DEBUG
if (!pdu) {
FILE * fp = fopen("kepanic.txt", "a+");
fprintf(fp, "recycle submit pdu number is 0, this result in the allocation of one submit.\n"
"using submit pdu number is %d.\n",
lt_num(cment->submits));
fclose(fp);
}
#endif
return pdu;
}
int recycle_submitresp_pdu (VTASK * vtask, CmppPdu * pdu)
{
VTASK * entity = NULL;
CmppEntity * cment = NULL;
if (!vtask || !pdu) return -1;
entity = (VTASK *) getEntity (vtask);
cment = (CmppEntity *)entity->var;
EnterCriticalSection (&cment->recyclerespCS);
lt_append(cment->recycle_submitresps, pdu);
LeaveCriticalSection (&cment->recyclerespCS);
#ifdef _DEBUG
if (lt_num(cment->recycle_submitresps) % 500 == 0)
tolog("submit_response recycle list total number %d, pid=%ld, threadid=%ld\n",
lt_num(cment->recycle_submitresps), getpid(), pthread_self());
#endif
return 0;
}
CmppPdu * get_recycle_submitresp_pdu (VTASK * vtask)
{
VTASK * entity = NULL;
CmppEntity * cment = NULL;
CmppPdu * pdu = NULL;
if (!vtask) return NULL;
entity = (VTASK *) getEntity (vtask);
cment = (CmppEntity *)entity->var;
EnterCriticalSection (&cment->recyclerespCS);
pdu = lt_rm_head (cment->recycle_submitresps);
LeaveCriticalSection (&cment->recyclerespCS);
#ifdef _DEBUG
if (!pdu) {
FILE * fp = fopen("kepanic.txt", "a+");
fprintf(fp, "recycle submitresp pdu number is 0, this result in the allocation of one submitresp.\n"
"using submitresp pdu number is %d.\n",
lt_num(cment->submitresps));
fclose(fp);
}
#endif
return pdu;
}
int distribute_submit (VTASK * vtask, void * userinfo)
{
VTASK * entity = NULL,
* vtcon = NULL;
ENTITY * mentity = NULL;
CmppEntity * cment = NULL;
CmppCon * cmcon = NULL;
int i, num, j, k;
SubmitPending * item = NULL;
CmppPdu * cmpdu = NULL;
int sendNum = 0, vtsent = 0;
struct tm * curtime;
time_t calendt;
uint32 ufmt = 0;
if (!vtask) return 0;
#ifdef _DEBUG
info("distribute_submit: vtask %s begin.\n", vtask->name);
#endif
mentity = getEntity (vtask);
entity = (VTASK *)mentity;
cment = (CmppEntity *)entity->var;
if (cment->total_avail - cment->consumedwin <= 0)
return 0;
if (get_submit_num(entity) == 0)
return 0;
num = sk_num (mentity->busyConnections);
for (i=0; i<num; i++) {
vtcon = (VTASK *)sk_value(mentity->busyConnections, i);
if (vtcon->state != cmpp_ready)
continue;
cmcon = (CmppCon *)vtcon->var;
#ifdef _DEBUG
info("distribute via %s, availwin=%d, is_mt=%s, totalavail=%d consumedwin=%d\n",
vtcon->name, cmcon->availwin, cmcon->is_mo?"no":"yes",
cment->total_avail, cment->consumedwin);
#endif
if (cmcon->availwin <= 0)
continue;
if (cmcon->is_mo)
continue;
for (j=0, vtsent=0; j<cmcon->sliding_len; j++) {
item = &cmcon->subpdu[j];
if (item->status != SUBMIT_AVAIL)
continue;
item->submit = cmpdu = fetch_submit (vtask);
if (!cmpdu) {
/*cment->total_avail -= sendNum;*/
cment->consumedwin += sendNum;
return 0;
}
/* some default parameters not filled by upper layer,
* these paras are fetched from configuration file.
* now fill them here. */
memcpy(cmpdu->Submit.msg_src, cmcon->sp_id,
sizeof(cmpdu->Submit.msg_src));
time(&calendt);
curtime = localtime(&calendt);
/* construct time format of MSG_ID */
ufmt = 0;
ufmt = ufmt | ((curtime->tm_sec & 0x3f) << 7);
ufmt = ufmt | ((curtime->tm_min & 0x1f) << 13);
ufmt = ufmt | ((curtime->tm_hour & 0x1f) << 18);
ufmt = ufmt | ((curtime->tm_mday & 0x1f) << 23);
ufmt = ufmt | ((curtime->tm_mon & 0x0f) << 28);
for (k=3; k>=0; k--) {
cmpdu->Submit.msg_id[k] = (uint8)(ufmt & 0xff);
ufmt = ufmt >> 8;
}
/*memcpy(&cmpdu->Submit.msg_id[0], (uint8 *)&ufmt, 4);*/
ufmt = cmpdu->sequence_id & 0x0000ffff;
for (k=7; k>=4; k--) {
cmpdu->Submit.msg_id[k] = (uint8)(ufmt & 0xff);
ufmt = ufmt >> 8;
}
/*memcpy(&cmpdu->Submit.msg_id[4], (uint8 *)&ufmt, 4);*/
emptyFrame(item->submitframe);
cmpdu_encode(cmpdu, &item->submitframe);
item->retrynum = 0;
item->status = SUBMIT_PRESEND;
#ifdef _DEBUG
info("distribute procedure: %d bytes submit data to be sent:\n",
frameLength(item->submitframe));
/*printFrame(INFO, item->submitframe, 3);*/
#endif
if (sendTcpCon (cmcon->fd, item->submitframe) < 0) {
error("distribute_submit: CMPP connection %s to "
"ISMG %s:%d crashed while reading length.\n",
vtcon->name, cment->remote_host, cment->remote_port);
vtcon->state = cmpp_null;
checkIdleConnection(vtcon);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -