📄 cmppdata.c
字号:
startTimer(entity, t_build, TBUILD);
return get_submit_num(entity);
}
item->status = SUBMIT_WAITRESP;
cmcon->availwin -= 1;
sendNum++;
vtsent++;
item->retrynum++;
item->submtimer = startDynTimer (vtcon, t_resubmit,
TRESUBMIT, item);
}
if (vtsent > 0) {
/* start connection test message timer */
startTimer (vtcon, t_acttest, TACTTEST);
}
}
cment->consumedwin += sendNum;
#ifdef _DEBUG
info("distribute ended.\n");
#endif
return get_submit_num(entity);
}
/*
 * store_submit_resp - queue a received SUBMIT_RESP PDU on the owning entity.
 *
 * vtask: task from which the owning entity is looked up via getEntity().
 * pdu:   response PDU; the pointer itself is appended to
 *        cment->submitresps (no copy is made here).
 *
 * The append happens while cment->respCS is held; afterwards
 * cment->hRespEvent is signalled through SetEvent().
 *
 * Returns 0 on success, -1 if vtask or pdu is NULL or if no entity data
 * can be reached from vtask.
 */
int store_submit_resp (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppEntity * cment = NULL;
#ifdef _DEBUG
    int          total = 0;
#endif

    if (!vtask || !pdu) return -1;

    entity = (VTASK *) getEntity (vtask);
    /* defensive: do not dereference a missing entity or entity payload */
    if (!entity || !entity->var) return -1;
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->respCS);
    lt_append(cment->submitresps, pdu);
#ifdef _DEBUG
    /* Sample the length while the list is still locked so the value
     * logged below belongs to this append and is not racing with other
     * appenders. */
    total = lt_num(cment->submitresps);
#endif
    LeaveCriticalSection (&cment->respCS);

#ifdef _DEBUG
    if (total % 500 == 0)
        /* Explicit casts keep the arguments in step with the %ld
         * conversions.
         * NOTE(review): assumes pthread_t is convertible to long on the
         * target platform, as the original format string already did. */
        tolog("submit_resp list total number %d, pid=%ld, threadid=%ld\n",
              total, (long)getpid(), (long)pthread_self());
#endif

    SetEvent(cment->hRespEvent, 1);
    return 0;
}
/*
 * handle_submit_resp - match a SUBMIT_RESP against the connection's
 * pending-submit window and release the matching slot.
 *
 * vtask: connection task; vtask->var is used as the CmppCon.
 * pdu:   received response PDU.
 *
 * Scans cmcon->subpdu[0 .. sliding_len) for a slot whose status is
 * SUBMIT_WAITRESP or SUBMIT_REWAITRESP and whose submit PDU carries the
 * same sequence_id as pdu.  On the first match the response is handed to
 * store_submit_resp(), the slot is reset to SUBMIT_AVAIL, its submit PDU is
 * recycled, its frame emptied and its timer stopped; the window counters
 * are adjusted, hAvailEvent is signalled, and the distribute hook is
 * started when get_submit_num(entity) is still positive.
 *
 * Returns -1 if vtask or pdu is NULL; otherwise 0, whether or not a
 * matching slot was found.
 */
int handle_submit_resp (VTASK * vtask, CmppPdu * pdu)
{
    VTASK         * entity = NULL;
    CmppCon       * cmcon = NULL;
    CmppEntity    * cment = NULL;
    SubmitPending * slot = NULL;
    int             idx;

    if (vtask == NULL || pdu == NULL)
        return -1;

#ifdef _DEBUG
    info("handle_submit_resp invoked %s for PDU 0x%08x\n",
         vtask->name, pdu->command_id);
#endif

    cmcon  = (CmppCon *)vtask->var;
    entity = (VTASK *)getEntity(vtask);
    cment  = (CmppEntity *)entity->var;

    for (idx = 0; idx < cmcon->sliding_len; idx++) {
        slot = &cmcon->subpdu[idx];

        /* only slots that are awaiting a response are candidates */
        if (slot->status != SUBMIT_WAITRESP &&
            slot->status != SUBMIT_REWAITRESP)
            continue;

        if (pdu->sequence_id != slot->submit->sequence_id)
            continue;

        /* hand the response over before the slot is torn down */
        store_submit_resp (vtask, pdu);

        slot->status = SUBMIT_AVAIL;
        slot->retrynum = 0;

        recycle_submit_pdu (vtask, slot->submit);
        slot->submit = NULL;
        emptyFrame(slot->submitframe);

        stopDynTimer(slot->submtimer);
        slot->submtimer = NULL;

        /* one more free slot on this connection, one fewer consumed
         * on the entity */
        cmcon->availwin += 1;
        cment->consumedwin -= 1;

        /* let upper-layer threads that poll for buffer availability
         * know that a slot has been freed */
        SetEvent(cment->hAvailEvent, 2);

#ifdef _DEBUG
        info("handle_submit_resp: sequence %d is matched, recycle the Submit.\n", pdu->sequence_id);
#endif

        if (get_submit_num(entity) > 0)
            startHook(cment->hDistribute);

        return 0;
    }

    return 0;
}
/*
 * store_deliver - queue a received DELIVER PDU on the owning entity.
 *
 * vtask: task from which the owning entity is looked up via getEntity().
 * pdu:   deliver PDU; the pointer itself is appended to cment->delivers.
 *
 * The append happens while cment->deliverCS is held.
 *
 * Returns 0 on success, -1 if vtask or pdu is NULL or if no entity data
 * can be reached from vtask.
 */
int store_deliver (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppEntity * cment = NULL;
#ifdef _DEBUG
    int          total = 0;
#endif

    if (!vtask || !pdu) return -1;

    entity = (VTASK *) getEntity (vtask);
    /* defensive: do not dereference a missing entity or entity payload */
    if (!entity || !entity->var) return -1;
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->deliverCS);
    lt_append(cment->delivers, pdu);
#ifdef _DEBUG
    /* Sample the length under the lock so the logged figure is the one
     * produced by this append rather than a racy later read. */
    total = lt_num(cment->delivers);
#endif
    LeaveCriticalSection (&cment->deliverCS);

#ifdef _DEBUG
    if (total % 500 == 0)
        /* Explicit casts keep the arguments in step with the %ld
         * conversions.
         * NOTE(review): assumes pthread_t is convertible to long on the
         * target platform, as the original format string already did. */
        tolog("store deliver %d, pid=%ld, threadid=%ld\n",
              total, (long)getpid(), (long)pthread_self());
#endif

    return 0;
}
/*
 * recycle_deliver_pdu - return a DELIVER PDU to the entity's recycle list.
 *
 * vtask: task from which the owning entity is looked up via getEntity().
 * pdu:   PDU to put back; the pointer itself is appended to
 *        cment->recycle_delivers.
 *
 * The append happens while cment->recycledeliverCS is held.
 *
 * Returns 0 on success, -1 if vtask or pdu is NULL or if no entity data
 * can be reached from vtask.
 */
int recycle_deliver_pdu (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppEntity * cment = NULL;
#ifdef _DEBUG
    int          total = 0;
#endif

    if (!vtask || !pdu) return -1;

    entity = (VTASK *) getEntity (vtask);
    /* defensive: do not dereference a missing entity or entity payload */
    if (!entity || !entity->var) return -1;
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->recycledeliverCS);
    lt_append(cment->recycle_delivers, pdu);
#ifdef _DEBUG
    /* Sample the length under the lock so the logged figure is the one
     * produced by this append rather than a racy later read. */
    total = lt_num(cment->recycle_delivers);
#endif
    LeaveCriticalSection (&cment->recycledeliverCS);

#ifdef _DEBUG
    if (total % 500 == 0)
        /* Explicit casts keep the arguments in step with the %ld
         * conversions.
         * NOTE(review): assumes pthread_t is convertible to long on the
         * target platform, as the original format string already did. */
        tolog("deliver recycle list total number %d, pid=%ld, threadid=%ld\n",
              total, (long)getpid(), (long)pthread_self());
#endif

    return 0;
}
/*
 * get_recycle_deliver_pdu - take one DELIVER PDU off the entity's recycle
 * list.
 *
 * vtask: task from which the owning entity is looked up via getEntity().
 *
 * The head of cment->recycle_delivers is removed while
 * cment->recycledeliverCS is held.
 *
 * Returns whatever lt_rm_head() yielded (NULL is treated as "nothing to
 * reuse"), or NULL if vtask is NULL or no entity data can be reached from
 * it.  In _DEBUG builds a NULL result is additionally noted in
 * "kepanic.txt".
 */
CmppPdu * get_recycle_deliver_pdu (VTASK * vtask)
{
    VTASK      * entity = NULL;
    CmppEntity * cment = NULL;
    CmppPdu    * pdu = NULL;

    if (!vtask) return NULL;

    entity = (VTASK *) getEntity (vtask);
    /* defensive: do not dereference a missing entity or entity payload */
    if (!entity || !entity->var) return NULL;
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->recycledeliverCS);
    pdu = lt_rm_head (cment->recycle_delivers);
    LeaveCriticalSection (&cment->recycledeliverCS);

#ifdef _DEBUG
    if (!pdu) {
        FILE * fp = fopen("kepanic.txt", "a+");

        /* The diagnostic file is best-effort: if it cannot be opened,
         * skip the note instead of handing a NULL stream to fprintf. */
        if (fp) {
            /* NOTE(review): cment->delivers is read here without taking
             * deliverCS, so the figure is only approximate. */
            fprintf(fp, "recycle deliver pdu number is 0, this result in the allocation of one deliver.\n"
                        "using deliver pdu number is %d.\n",
                    lt_num(cment->delivers));
            fclose(fp);
        }
    }
#endif

    return pdu;
}
/*
 * handle_deliver - acknowledge a received DELIVER and queue it.
 *
 * vtask: connection task; vtask->var is used as the CmppCon.
 * pdu:   received DELIVER PDU.
 *
 * A DELIVER_RESP PDU is kept on the connection (allocated on first use);
 * it is filled with the sequence_id and msg_id of pdu and result 0,
 * encoded into cmcon->drespdu and sent on cmcon->fd.  When the send
 * succeeds the DELIVER is passed to store_deliver() and hDeliverEvent is
 * signalled.  When it fails the error is logged, the task state becomes
 * cmpp_null, checkIdleConnection() is called and the t_build timer is
 * started on the entity.
 *
 * Returns 0 on success; -1 on NULL arguments, allocation failure or send
 * failure.
 */
int handle_deliver (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppCon    * cmcon = NULL;
    CmppEntity * cment = NULL;

    if (vtask == NULL || pdu == NULL)
        return -1;

    cmcon  = (CmppCon *)vtask->var;
    entity = (VTASK *)getEntity(vtask);
    cment  = (CmppEntity *)entity->var;

    /* the response PDU is created once and then reused */
    if (cmcon->deliverresp == NULL) {
        cmcon->deliverresp = cmpdu_alloc(CMPP_DELIVER_RESP);
        if (cmcon->deliverresp == NULL)
            return -1;
    }

    cmcon->deliverresp->sequence_id = pdu->sequence_id;
    memcpy (cmcon->deliverresp->DeliverResp.msg_id,
            pdu->Deliver.msg_id,
            sizeof(cmcon->deliverresp->DeliverResp.msg_id));
    cmcon->deliverresp->DeliverResp.result = 0;

    cmpdu_encode(cmcon->deliverresp, &cmcon->drespdu);

    if (sendTcpCon (cmcon->fd, cmcon->drespdu) >= 0) {
        /* acknowledged: queue the DELIVER and signal the deliver event */
        store_deliver (vtask, pdu);
        SetEvent (cment->hDeliverEvent, 0);
        return 0;
    }

    /* NOTE(review): this text says "reading length" although it is emitted
     * when sendTcpCon() fails - confirm whether the wording should be
     * corrected. */
    error("handle_deliver: CMPP connection %s to "
          "ISMG %s:%d crashed while reading length.\n",
          vtask->name, cment->remote_host, cment->remote_port);

    vtask->state = cmpp_null;
    checkIdleConnection(vtask);
    startTimer(entity, t_build, TBUILD);
    return -1;
}
/*
 * issue_active_test_resp - answer a received ACTIVE_TEST.
 *
 * vtask: connection task; vtask->var is used as the CmppCon.
 * pdu:   received ACTIVE_TEST PDU whose sequence_id is echoed back.
 *
 * A temporary ACTIVE_TEST_RESP PDU is allocated, encoded into
 * cmcon->actrespdu, freed again, and the encoded frame is sent on
 * cmcon->fd.  On send failure the error is logged, the task state becomes
 * cmpp_null, checkIdleConnection() is called and the t_build timer is
 * started on the entity.
 *
 * Returns 0 on success; -1 on NULL arguments, allocation failure or send
 * failure.
 */
int issue_active_test_resp (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppCon    * cmcon = NULL;
    CmppEntity * cment = NULL;
    CmppPdu    * reply = NULL;

    if (vtask == NULL || pdu == NULL)
        return -1;

    cmcon  = (CmppCon *)vtask->var;
    entity = (VTASK *)getEntity(vtask);
    cment  = (CmppEntity *)entity->var;

    reply = cmpdu_alloc(CMPP_ACTIVE_TEST_RESP);
    if (reply == NULL) {
        error("cmpdu_alloc failed for ACTIVE_TEST_RESP\n");
        return -1;
    }

    /* the PDU is only needed until it has been encoded */
    reply->sequence_id = pdu->sequence_id;
    cmpdu_encode(reply, &cmcon->actrespdu);
    cmpdu_free(reply);

    if (sendTcpCon (cmcon->fd, cmcon->actrespdu) >= 0)
        return 0;

    /* NOTE(review): this text says "reading length" although it is emitted
     * when sendTcpCon() fails - confirm whether the wording should be
     * corrected. */
    error("issue_active_test_resp: CMPP connection %s to "
          "ISMG %s:%d crashed while reading length.\n",
          vtask->name, cment->remote_host, cment->remote_port);

    vtask->state = cmpp_null;
    checkIdleConnection(vtask);
    startTimer(entity, t_build, TBUILD);
    return -1;
}
/*
 * issue_active_test - send an ACTIVE_TEST on the connection.
 *
 * vtask: connection task; vtask->var is used as the CmppCon.
 *
 * The ACTIVE_TEST PDU is kept on the connection (allocated on first use),
 * given a fresh sequence id from CMPPGetSeqID(), encoded into
 * cmcon->actpdu and sent on cmcon->fd.  After a successful send the
 * t_reactive timer is started on vtask and cmcon->actretry is incremented.
 * On send failure the error is logged, the task state becomes cmpp_null,
 * checkIdleConnection() is called and the t_build timer is started on the
 * entity.
 *
 * Returns 0 on success; -1 if vtask is NULL, the PDU cannot be allocated,
 * or the send fails.
 */
int issue_active_test (VTASK * vtask)
{
    VTASK      * entity = NULL;
    CmppCon    * cmcon = NULL;
    CmppEntity * cment = NULL;

    if (!vtask)
        return -1;

    cmcon = (CmppCon *)vtask->var;
    entity = (VTASK *)getEntity(vtask);
    cment = (CmppEntity *)entity->var;

    if (!cmcon->acttest)
        cmcon->acttest = cmpdu_alloc(CMPP_ACTIVE_TEST);
    /* The allocation result must be checked before it is dereferenced
     * below; the other PDU allocations in this file are checked the same
     * way. */
    if (!cmcon->acttest) {
        error("cmpdu_alloc failed for ACTIVE_TEST\n");
        return -1;
    }

    cmcon->acttest->sequence_id = CMPPGetSeqID();
    cmpdu_encode (cmcon->acttest, &cmcon->actpdu);

    if (sendTcpCon (cmcon->fd, cmcon->actpdu) < 0) {
        /* NOTE(review): this text says "reading length" although it is
         * emitted when sendTcpCon() fails - confirm whether the wording
         * should be corrected. */
        error("issue_active_test: CMPP connection %s to "
              "ISMG %s:%d crashed while reading length.\n",
              vtask->name, cment->remote_host, cment->remote_port);
        vtask->state = cmpp_null;
        checkIdleConnection(vtask);
        startTimer(entity, t_build, TBUILD);
        return -1;
    }

    /* arm the response timer and count this attempt */
    startTimer (vtask, t_reactive, TREACTIVE);
    ++cmcon->actretry;
    return 0;
}
/*
 * handle_active_test_resp - process the response to an ACTIVE_TEST.
 *
 * vtask: connection task; vtask->var is used as the CmppCon.
 * pdu:   received ACTIVE_TEST_RESP PDU.
 *
 * The response is accepted only when the connection holds an ACTIVE_TEST
 * PDU whose sequence_id equals that of pdu.  On acceptance the t_reactive
 * timer is stopped, cmcon->actretry is reset to 0 and cmcon->actpdu is
 * emptied.
 *
 * Returns 0 when the response was accepted; -1 on NULL arguments, when no
 * ACTIVE_TEST PDU exists, or when the sequence ids differ.
 */
int handle_active_test_resp (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppCon    * cmcon = NULL;
    CmppEntity * cment = NULL;

    if (vtask == NULL || pdu == NULL)
        return -1;

    cmcon  = (CmppCon *)vtask->var;
    entity = (VTASK *)getEntity(vtask);
    /* looked up as in the sibling handlers; not used further here */
    cment  = (CmppEntity *)entity->var;

    if (cmcon->acttest == NULL)
        return -1;
    if (cmcon->acttest->sequence_id != pdu->sequence_id)
        return -1;

    stopTimer(vtask, t_reactive);
    cmcon->actretry = 0;
    emptyFrame(cmcon->actpdu);

    return 0;
}
/*
 * issue_terminate_resp - answer a received TERMINATE.
 *
 * vtask: connection task; vtask->var is used as the CmppCon.
 * pdu:   received TERMINATE PDU whose sequence_id is echoed back.
 *
 * A temporary TERMINATE_RESP PDU is allocated, encoded into a local frame,
 * freed again, and the frame is sent on cmcon->fd.  A send failure is
 * logged; unlike the other send paths in this file, no connection state is
 * changed here.
 *
 * Returns 0 on success; -1 on NULL arguments, allocation failure or send
 * failure.
 */
int issue_terminate_resp (VTASK * vtask, CmppPdu * pdu)
{
    VTASK      * entity = NULL;
    CmppCon    * cmcon = NULL;
    CmppEntity * cment = NULL;
    CmppPdu    * reply = NULL;
    FRAME_PTR    encoded = NULL;

    if (vtask == NULL || pdu == NULL)
        return -1;

    cmcon  = (CmppCon *)vtask->var;
    entity = (VTASK *)getEntity(vtask);
    cment  = (CmppEntity *)entity->var;

    reply = cmpdu_alloc(CMPP_TERMINATE_RESP);
    if (reply == NULL)
        return -1;

    /* the PDU is only needed until it has been encoded */
    reply->sequence_id = pdu->sequence_id;
    cmpdu_encode(reply, &encoded);
    cmpdu_free(reply);

    /* NOTE(review): the local frame is never released in this function;
     * unless sendTcpCon() takes ownership of it, this looks like a leak
     * on every call - confirm against the frame API. */
    if (sendTcpCon (cmcon->fd, encoded) >= 0)
        return 0;

    /* NOTE(review): this text says "reading length" although it is emitted
     * when sendTcpCon() fails - confirm whether the wording should be
     * corrected. */
    error("issue_terminate_resp: CMPP connection %s to "
          "ISMG %s:%d crashed while reading length.\n",
          vtask->name, cment->remote_host, cment->remote_port);
    return -1;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -