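/* File: rvmegacotermmgr.c */
/* "Font size:" -- control label left over from the web code viewer; not part of the source. */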
/*
 * Call when an event is received, or when the start-up timer fires, to see
 * whether a ServiceChange "up" still has to be sent.
 *
 * Nothing happens unless a deferred start is pending (mgr->needStartMdm).
 * When one is pending, the following is done under mgr->lock:
 *   - state REGISTERED or REGISTERING: nothing is sent;
 *   - any other state: rvMegacoTermMgrSendServiceChangeUp() is called for the
 *     root termination with mgr->sc1.
 * In every pending case the flag is then cleared and mgr->mdmTimer cancelled.
 *
 * Lock order used here: mgr->startMdmLock first, then mgr->lock.
 */
void rvMegacoTermMgrCheckStartMdm(RvMegacoTermMgr* mgr)
{
    rvMutexLock(&mgr->startMdmLock);

    /* No deferred start pending: release and leave. */
    if (mgr->needStartMdm != rvTrue)
    {
        rvMutexUnlock(&mgr->startMdmLock);
        return;
    }

    rvMutexLock(&mgr->lock);

    /*
     * REGISTERED:  (open question from the original author) send an Audit
     *              message to verify that we are still registered with the MGC?
     * REGISTERING: the registration process was begun but not completed.
     * Neither of those states sends anything; every other state does.
     */
    if (mgr->state != RV_MEGACOTERMMGRSTATE_REGISTERED &&
        mgr->state != RV_MEGACOTERMMGRSTATE_REGISTERING)
    {
        rvMegacoTermMgrSendServiceChangeUp(mgr, mgr->rootTermination, &mgr->sc1);
    }

    mgr->needStartMdm = rvFalse;
    rvMegacoTermTimerCancel(mgr->mdmTimer);

    rvMutexUnlock(&mgr->lock);
    rvMutexUnlock(&mgr->startMdmLock);
}
/*
 * Timer callback for the deferred start.
 *
 * 'data' is treated as the RvMegacoTermTimer that expired; its 'data' field is
 * treated as the owning termination manager, on which the start check is run.
 * 'timer' is not used.
 */
static void processStartMdmTimeout(RvTimer* timer, void* data)
{
    RvMegacoTermTimer* expiredTimer = (RvMegacoTermTimer*)data;

    rvMegacoTermMgrCheckStartMdm((RvMegacoTermMgr*)expiredTimer->data);
}
/*
 * Start callback of the MDM termination manager.
 *
 * MDM is not started here. The ServiceChange descriptor (when one is given) is
 * stored in sc1, the "start needed" flag is raised and a timer is created with
 * the requested delay. When the timer expires, processStartMdmTimeout() runs
 * the start check. If any event arrives before the timer expires, the start
 * check is expected to run then and stop the timer.
 *
 * x     - the termination manager (an RvMegacoTermMgr passed as RvMdmXTermMgr).
 * sc    - descriptor to copy into mgr->sc1; may be NULL, in which case sc1 is
 *         left untouched.
 * delay - delay handed to rvMegacoTermTimerCreate().
 *
 * NOTE(review): needStartMdm is written here without holding startMdmLock,
 * which rvMegacoTermMgrCheckStartMdm() holds when reading it -- confirm that
 * callers serialize the two.
 */
static void MegacoMgrStart(RvMdmXTermMgr * x,RvMegacoServiceChangeDescriptor * sc, int delay)
{
    RvMegacoTermMgr* self = (RvMegacoTermMgr*)x;

    if (sc != NULL)
    {
        rvMegacoServiceChangeDescriptorCopy(&self->sc1, sc);
    }

    self->needStartMdm = rvTrue;
    self->mdmTimer = rvMegacoTermTimerCreate(self->alloc,
                                             delay,
                                             processStartMdmTimeout,
                                             self,
                                             &self->lock);
}
/*
 * Handles a ServiceChange command, dispatching on the method carried in the
 * command's ServiceChange descriptor:
 *
 *   HANDOFF           - if the descriptor names an MGC address, mark the
 *                       manager REGISTERING and send a ServiceChange to that
 *                       address (under termMgr->lock). If the send reports
 *                       rvFalse, fall back to
 *                       rvMegacoTermMgrSendServiceChangeUp().
 *   FORCED / GRACEFUL - take the MG down: run the "subtract all" transaction
 *                       through rvMegacoTermMgrProcessAction(), disable the
 *                       root termination, then mark the manager DISABLED.
 *   RESTART           - enable the root termination and mark the manager
 *                       REGISTERED.
 *   anything else     - ignored.
 *
 * 'commandReply' is not used by this function.
 *
 * NOTE(review): only the HANDOFF branch takes termMgr->lock around the state
 * change; the other branches write termMgr->state unlocked -- confirm whether
 * the caller already holds the lock. The parse error reported for the
 * "subtract all" transaction is also not inspected.
 */
void rvMegacoTermMgrProcessServiceChangeCommand(RvMegacoTermMgr* termMgr,
    const RvMegacoCommand* command,RvMegacoCommandReply* commandReply)
{
    const RvMegacoServiceChangeDescriptor* descriptor;
    RvMegacoServiceChangeMethod requested;

    descriptor = rvMegacoServiceChangeCommandGetServiceChange(command);
    requested = rvMegacoServiceChangeDescriptorGetMethod(descriptor);

    if (requested == RV_MEGACOSERVICECHANGEMETHOD_HANDOFF)
    {
        /* Get the entity out of the message. */
        const RvMegacoEntityAddress* newMgc = rvMegacoServiceChangeDescriptorGetMgc(descriptor);
        RvBool sent;

        if (!rvMegacoEntityAddressIsSet(newMgc))
            return;

        /* Attempt to register with the new MGC. */
        rvMutexLock(&termMgr->lock);
        termMgr->state = RV_MEGACOTERMMGRSTATE_REGISTERING;
        sent = sendServiceChangeToAddress(termMgr, termMgr->rootTermination, newMgc,
                                          &termMgr->sc, rvFalse,
                                          RV_MEGACOSERVICECHANGEMETHOD_HANDOFF);
        rvMutexUnlock(&termMgr->lock);

        if (sent == rvFalse)
            rvMegacoTermMgrSendServiceChangeUp(termMgr, termMgr->rootTermination, &termMgr->sc);
        return;
    }

    if (requested == RV_MEGACOSERVICECHANGEMETHOD_FORCED ||
        requested == RV_MEGACOSERVICECHANGEMETHOD_GRACEFUL)
    {
        /* Take down the MG and all the terminations (delete ephemerals, etc). */
        RvMegacoTransaction subtractAll;
        RvMegacoTransactionReply subtractAllReply;
        RvParseError parseError;

        /* Subtract all the terminations from active contexts by processing
           the appropriate message. */
        rvMegacoTransactionConstructParseA(&subtractAll, rvSubtractAllTrans, &parseError, termMgr->alloc);
        rvMegacoTransactionReplyConstructA(&subtractAllReply, &subtractAll, termMgr->alloc);
        rvMegacoTermMgrProcessAction(termMgr, rvMegacoTransactionGetAction(&subtractAll, 0), &subtractAllReply);
        rvMegacoTransactionDestruct(&subtractAll);
        rvMegacoTransactionReplyDestruct(&subtractAllReply);

        rvMegacoTermDisable(termMgr->rootTermination);

        /* Update the state at the end to enable the processing of the
           "subtract" command. */
        termMgr->state = RV_MEGACOTERMMGRSTATE_DISABLED;
        return;
    }

    if (requested == RV_MEGACOSERVICECHANGEMETHOD_RESTART)
    {
        rvMegacoTermEnable(termMgr->rootTermination);
        termMgr->state = RV_MEGACOTERMMGRSTATE_REGISTERED;
    }
}
/*
 * Reacts to a communication failure: under mgr->lock, marks the manager
 * DISCONNECTED and then calls rvMegacoTermMgrSendServiceChangeUp() for the
 * root termination with mgr->sc.
 */
void rvMegacoTermMgrCommFailure(RvMegacoTermMgr * mgr)
{
    rvMutexLock(&mgr->lock);

    /* The state must be updated before the ServiceChange is issued. */
    mgr->state = RV_MEGACOTERMMGRSTATE_DISCONNECTED;
    rvMegacoTermMgrSendServiceChangeUp(mgr, mgr->rootTermination, &mgr->sc);

    rvMutexUnlock(&mgr->lock);
}
/*
 * Transaction-completion callback.
 *
 * tcb    - not used.
 * reply  - the transaction reply, or NULL when there is none.
 * data   - the RvMegacoTermMgr the transaction belongs to.
 * reason - completion status of the transaction.
 *
 * Two things are done:
 *  1. If the request could not be sent (SENDREQ_LLFAIL or SENDREQ_FAIL),
 *     rvMegacoTermMgrCommFailure() is called so that another MGC is looked for.
 *  2. Recovery: every command reply is scanned, and on the first one whose
 *     error code is RV_MEGACOERROR_UNAUTH_CMD the manager reconnects and the
 *     scan stops. The original author lists two situations for this:
 *       - the MGC doesn't know this termination (the transaction was received
 *         before the ServiceChange);
 *       - the MGC received the ServiceChange for a termination before the one
 *         for root (the connection was probably lost temporarily unnoticed).
 */
void rvMegacoTermMgrProcReply(RvMegacoTcb *tcb, const RvMegacoTransactionReply *reply, void *data, RvTransactionStatus reason)
{
    RvMegacoTermMgr* mgr = (RvMegacoTermMgr*)data;
    size_t actionIdx, actionCount;

    /* If the low level communication failed, we need to go find another MGC. */
    if (reason == RVTRANSACTIONSTATUS_SENDREQ_LLFAIL || reason == RVTRANSACTIONSTATUS_SENDREQ_FAIL)
        rvMegacoTermMgrCommFailure(mgr);

    if (reply == NULL)
        return;

    actionCount = rvMegacoTransactionReplyGetNumReplies(reply);
    for (actionIdx = 0; actionIdx < actionCount; ++actionIdx)
    {
        const RvMegacoActionReply* actionReply = rvMegacoTransactionReplyGetReply(reply, actionIdx);
        size_t cmdCount = rvMegacoActionReplyGetNumReplies(actionReply);
        size_t cmdIdx;

        for (cmdIdx = 0; cmdIdx < cmdCount; cmdIdx++)
        {
            const RvMegacoCommandReply* cmdReply = rvMegacoActionReplyGetReply(actionReply, cmdIdx);
            const RvMegacoErrorDescriptor* errDesc = rvMegacoCommandReplyGetError(cmdReply);
            int code = rvMegacoErrorDescriptorGetCode(errDesc);

            if (code != RV_MEGACOERROR_UNAUTH_CMD)
                continue;

            /* Only send one service change in the case of an UNAUTH_CMD,
               so get out of here. */
            rvMegacoTermMgrReconnect(mgr);
            return;
        }
    }
}
/*
 * Completion callback for a ServiceChange request sent for the root
 * termination (it is passed to sendServiceChangeDown() by MegacoMgrStop()).
 *
 * tcb    - transaction control block; supplies the remote entity that answered.
 * reply  - the transaction reply; only examined when reason is RCV_REPLY.
 * data   - the RvMegacoTermMgr that issued the request.
 * reason - completion status of the transaction.
 *
 * Runs entirely under mgr->lock. Outcomes:
 *   - Valid (non-error) reply while UNREGISTERING: state becomes DOWN.
 *   - Valid ServiceChange reply that names no MGC, or names the MGC that
 *     answered: state becomes REGISTERED and the address is remembered in
 *     mgr->lastConnectedAddress.
 *   - Valid ServiceChange reply that names a different MGC: the ServiceChange
 *     is re-sent to that MGC (method HANDOFF).
 *   - Anything else (no reply, empty reply, error reply, non-ServiceChange
 *     reply, or a failed re-send): if the manager is still REGISTERING,
 *     rvMegacoTermMgrSendServiceChangeUp() is called again ("try the next one
 *     in the list", per the original author).
 *
 * Changes relative to the previous implementation:
 *   - The redirect case used to re-send to the address of the MGC that had
 *     just answered instead of the address carried in the reply; it now uses
 *     the address from the message, as the HANDOFF command handler does.
 *   - The result of that re-send used to be ignored; a failure (rvFalse) now
 *     falls through to the retry path, mirroring the HANDOFF command handler.
 *   - A NULL reply is treated like "no reply" instead of being dereferenced.
 *   - A local flag that was written but never read has been removed.
 */
void procServiceChangeReply(RvMegacoTcb *tcb, const RvMegacoTransactionReply *reply, void *data, RvTransactionStatus reason)
{
    RvMegacoTermMgr * mgr = (RvMegacoTermMgr*)data;
    const RvMegacoEntity *remoteEntity = rvMegacoTcbGetRemoteEntity(tcb);
    const RvMegacoEntityAddress* remoteEntityAddress;
    const RvMegacoEntityAddress* entityFromMsg;
    const RvMegacoActionReply* action;
    const RvMegacoCommandReply* command;
    const RvMegacoServiceChangeDescriptor* sc;

    rvMutexLock(&mgr->lock);

    /* Make sure that there is actually a reply before accessing fields.
       One other possibility is reason == RVTRANSACTIONSTATUS_SENDREQ_LLFAIL. */
    if (reason != RVTRANSACTIONSTATUS_RCV_REPLY || reply == NULL)
        goto tryNext;

    if (rvMegacoTransactionReplyGetNumReplies(reply) < 1)
        goto tryNext;
    action = rvMegacoTransactionReplyGetReply(reply, 0);

    if (rvMegacoActionReplyGetNumReplies(action) < 1)
        goto tryNext;
    command = rvMegacoActionReplyGetReply(action, 0);

    /* An error response is handled by the retry path. */
    if (!rvMegacoTermMgrUtilIsCmdRspValid(command))
        goto tryNext;

    /* A successful answer to a ServiceChange "down" completes the shutdown. */
    if (mgr->state == RV_MEGACOTERMMGRSTATE_UNREGISTERING)
    {
        mgr->state = RV_MEGACOTERMMGRSTATE_DOWN;
        goto exit;
    }

    if (rvMegacoCommandReplyGetType(command) != RV_MEGACOCOMMANDTYPE_SVCCHANGE)
        goto tryNext;

    sc = rvMegacoCommandReplyGetServiceChange(command);
    entityFromMsg = rvMegacoServiceChangeDescriptorGetMgc(sc);
    remoteEntityAddress = rvMegacoEntityGetAddress(remoteEntity);

    if (rvMegacoEntityAddressIsSet(entityFromMsg) == rvFalse)
    {
        /* No MGC named in the reply: the MGC that answered accepted us. */
        mgr->state = RV_MEGACOTERMMGRSTATE_REGISTERED;
        rvMegacoEntityAddressCopy(&mgr->lastConnectedAddress, remoteEntityAddress);
        goto exit;
    }

    if (rvMegacoEntityAddressEqual(entityFromMsg, remoteEntityAddress))
    {
        /* The named MGC is the one that answered: registered with it. */
        mgr->state = RV_MEGACOTERMMGRSTATE_REGISTERED;
        rvMegacoEntityAddressCopy(&mgr->lastConnectedAddress, entityFromMsg);
        goto exit;
    }

    /* The message contains the address of another MGC: re-issue the
       ServiceChange to THAT address. Only a successful send ends processing
       here; a failed one drops into the retry path below. */
    if (sendServiceChangeToAddress(mgr, mgr->rootTermination, entityFromMsg, &mgr->sc,
                                   rvFalse, RV_MEGACOSERVICECHANGEMETHOD_HANDOFF) != rvFalse)
        goto exit;

    /* Try the next one in the list. */
tryNext:
    if (mgr->state == RV_MEGACOTERMMGRSTATE_REGISTERING)
        rvMegacoTermMgrSendServiceChangeUp(mgr, mgr->rootTermination, &mgr->sc);

exit:
    rvMutexUnlock(&mgr->lock);
}
/*
 * Stop callback of the MDM termination manager.
 *
 * Under mgr->lock: withdraws any pending deferred start (in case MDM is
 * stopped before the start timer has expired) and, if the manager is currently
 * REGISTERED, sends a ServiceChange "down" for the root termination using
 * 'sc' -- with procServiceChangeReply as the reply handler -- and moves the
 * manager to UNREGISTERING. In any other state nothing is sent.
 */
static void MegacoMgrStop(RvMdmXTermMgr * x,RvMegacoServiceChangeDescriptor * sc)
{
    RvMegacoTermMgr* self = (RvMegacoTermMgr*)x;

    rvMutexLock(&self->lock);

    self->needStartMdm = rvFalse;

    if (self->state == RV_MEGACOTERMMGRSTATE_REGISTERED)
    {
        sendServiceChangeDown(self, self->rootTermination, sc, procServiceChangeReply);
        self->state = RV_MEGACOTERMMGRSTATE_UNREGISTERING;
    }

    rvMutexUnlock(&self->lock);
}
/*
 * "For each physical termination" callback of the MDM termination manager.
 *
 * Runs rvMegacoContextForEachTerm() over the manager's null context with the
 * given callback and user data while holding mgr->lock, and returns whatever
 * that call returns.
 */
static RvBool MegacoForEachPhysTerm(RvMdmXTermMgr* x,RvMdmProcessEachTermCB func,void* data)
{
    RvMegacoTermMgr* self = (RvMegacoTermMgr*)x;
    RvBool result;

    rvMutexLock(&self->lock);
    result = rvMegacoContextForEachTerm(&self->nullContext, func, data);
    rvMutexUnlock(&self->lock);

    return result;
}
/*
 * The callbacks structure: the table of Megaco implementations that
 * rvMegacoTermMgrSetMdmMgr() registers with the MDM termination manager.
 *
 * The initializer is positional, so the entries must stay in the order of the
 * RvMdmXTermMgrClbks members; the comment above each entry gives the member
 * type and name it is meant to fill.
 */
static RvMdmXTermMgrClbks mdmTermMgrClbks = {
/* RvMdmXTermMgrRegisterPhysTermCB registerPhysTermF; */
MegacoRegisterPhysTerm,
/* RvMdmXTermMgrRegisterEphTermCB registerEphTermF; */
MegacoRegisterEphTerm,
/* RvMdmXTermMgrRegisterRootTermCB registerRootTermF; */
MegacoRegisterRootTerm,
/* RvMdmXTermMgrUnregisterTermCB unregisterTermF; */
MegacoUnregisterTerm,
/* RvMdmXTermMgrClearTermCB clearTermF; */
MegacoClearTerm,
/* RvMdmXTermMgrFindTermCB findTermF; */
MegacoFindTerm,
/* RvMdmXTermMgrFindTermCB getIdleTermF; */
MegacoGetIdleTerm,
/* RvMdmXTermMgrStart startF; */
MegacoMgrStart,
/* RvMdmXTermMgrStop stopF; */
MegacoMgrStop,
/* RvMdmXTermMgrForEachPhysTermCB forEachPhysTermF */
MegacoForEachPhysTerm
};
/*
 * Hash function for the context table: the hash of a context id is the id
 * value itself, converted to size_t.
 */
static size_t hashContextId(const RvUint32* id)
{
    const RvUint32 contextId = *id;

    return (size_t)contextId;
}
/* Constructor/Destructor */

/*
 * Constructs a termination manager in the caller-provided storage 'mgr'.
 *
 * mgr         - storage to initialize.
 * localEntity - local Megaco entity; the manager registers
 *               rvMegacoTermMgrProcessTransaction on it as the
 *               receive-request callback.
 * a           - allocator used for the context table, the contexts, the root
 *               termination, the MGC address list and (by default) physical
 *               and ephemeral terminations.
 *
 * Returns 'mgr'. The manager starts in state DOWN with no deferred start
 * pending and no start timer.
 *
 * Ordering: the event queue and both mutexes are constructed BEFORE the
 * receive callback is registered and BEFORE the event thread is started.
 * Command handling in this file locks mgr->lock, so activating either of them
 * first (as the previous implementation did) could let them use a mutex that
 * had not been constructed yet.
 *
 * NOTE(review): the result of rvAllocAllocate() for the root termination is
 * not checked, and mgr->mgcAddrListIter / mgr->mdmTermMgr are not initialized
 * here (they are set by rvMegacoTermMgrMgcAddrListAdd() and
 * rvMegacoTermMgrSetMdmMgr() respectively).
 */
RvMegacoTermMgr* rvMegacoTermMgrConstructA(RvMegacoTermMgr * mgr, RvMegacoEntity* localEntity, RvAlloc * a)
{
    mgr->alloc = a;

    /* Context bookkeeping: id -> context table plus the two built-in contexts. */
    rvHashConstruct(RvUint32,RvMegacoContextPtr)(&mgr->contextTable,RV_MEGACOCONTEXTS_BUCKETS,
        hashContextId,mgr->alloc,mgr->alloc);
    rvMegacoContextConstruct(&mgr->nullContext,0,mgr,mgr->alloc);
    rvMegacoContextConstruct(&mgr->ephContext,0,mgr,mgr->alloc);

    /* The root termination is created in the null context. */
    mgr->rootTermination = rvAllocAllocate(mgr->alloc,sizeof(RvMegacoTerm));
    rvMegacoTermConstruct(mgr->rootTermination,"root",&mgr->nullContext,mgr->alloc);

    rvListConstruct(RvMegacoEntityAddress)(&mgr->mgcAddrList, mgr->alloc);
    rvMegacoEntityAddressConstruct(&mgr->lastConnectedAddress,"", 0 );

    mgr->curId = 0;
    mgr->physTermAlloc = mgr->alloc;
    mgr->ephTermAlloc = mgr->alloc;
    mgr->state = RV_MEGACOTERMMGRSTATE_DOWN;
    mgr->localEntity = localEntity;
    mgr->mgcEntityValid = rvFalse;
    /* mgr->msgCounter = 0;*/
    mgr->needStartMdm = rvFalse;
    mgr->mdmTimer = NULL;
    rvMegacoServiceChangeDescriptorConstruct(&mgr->sc);
    rvMegacoServiceChangeDescriptorConstruct(&mgr->sc1);

    /* Everything the callback or the event thread may touch must exist before
       either of them can run: the queue and, above all, the two mutexes. */
    rvQueueConstruct(&mgr->eventQueue,RV_MEGACOTERMMGR_EVQUEUESIZE);
    rvMutexConstruct(&mgr->lock);
    rvMutexConstruct(&mgr->startMdmLock);

    /* From here on, other threads may enter the manager. */
    rvMegacoEntityRegisterRecvRequestCB(localEntity,rvMegacoTermMgrProcessTransaction,mgr);
    rvThreadConstruct(&mgr->eventThread,"Megaco Event Thread",rvMegacoStackGetPriority(localEntity->stack),
        RV_MEGACOTERMMGR_EVSTACKSIZE,rvMegacoTermProcessEventQueue,mgr);
    rvThreadStart(&mgr->eventThread);

    return mgr;
}
/*
 * Destroys a termination manager built by rvMegacoTermMgrConstructA().
 *
 * With mgr->lock held, in this order: the state is set to DOWN and the
 * terminations are cleared; every context left in the context table is
 * destructed and deallocated and the table itself destructed; the null and
 * ephemeral contexts, the root termination, the MGC address list, both
 * ServiceChange descriptors and (when valid) the MGC entity are destructed;
 * a NULL termination is pushed on the event queue to stop the event thread,
 * which is then joined and destructed together with the queue. Finally the
 * two mutexes are destructed (mgr->lock after being released).
 *
 * NOTE(review): the event thread is joined while mgr->lock is still held, and
 * after the objects above have been destroyed -- confirm that the event thread
 * never needs mgr->lock or those objects while draining its queue.
 */
void rvMegacoTermMgrDestruct(RvMegacoTermMgr * mgr)
{
    RvHashIter(RvUint32,RvMegacoContextPtr) cur, following, last;
    RvMegacoTerm* stopMarker = NULL;
    RvMegacoContext* ctx;

    rvMutexLock(&mgr->lock);

    /* Remove all remaining terminations */
    mgr->state = RV_MEGACOTERMMGRSTATE_DOWN;
    MegacoClearTerm(mgr, NULL);

    /* Destruct all the remaining contexts */
    last = rvHashEnd(RvUint32,RvMegacoContextPtr)(&mgr->contextTable);
    cur = rvHashBegin(RvUint32,RvMegacoContextPtr)(&mgr->contextTable);
    while (rvHashIterEqual(RvUint32,RvMegacoContextPtr)(&cur, &last) != rvTrue)
    {
        /* Fetch the successor first: the context may be removed from the
           table while it is being processed. */
        following = rvHashIterNext(RvUint32,RvMegacoContextPtr)(cur);

        ctx = *rvHashIterData(RvUint32,RvMegacoContextPtr)(cur);
        rvMegacoContextDestruct(ctx);
        rvAllocDeallocate(mgr->alloc, sizeof(RvMegacoContext), ctx);

        cur = following;
    }
    rvHashDestruct(RvUint32,RvMegacoContextPtr)(&mgr->contextTable);

    /* Destruct the null and ephemeral contexts */
    rvMegacoContextDestruct(&mgr->nullContext);
    rvMegacoContextDestruct(&mgr->ephContext);

    rvMegacoTermDestruct(mgr->rootTermination);
    rvAllocDeallocate(mgr->alloc, sizeof(RvMegacoTerm), mgr->rootTermination);

    rvListDestruct(RvMegacoEntityAddress)(&mgr->mgcAddrList);
    rvMegacoServiceChangeDescriptorDestruct(&mgr->sc);
    rvMegacoServiceChangeDescriptorDestruct(&mgr->sc1);
    if (mgr->mgcEntityValid)
    {
        rvMegacoEntityDestruct(&mgr->mgcEntity);
    }

    /* Stop the event thread by queueing a NULL termination ... */
    rvQueuePush(&mgr->eventQueue, stopMarker);
    /* ... and wait for it to end */
    rvThreadJoin(&mgr->eventThread);
    rvThreadDestruct(&mgr->eventThread);
    rvQueueDestruct(&mgr->eventQueue);

    rvMutexDestruct(&mgr->startMdmLock);
    rvMutexUnlock(&mgr->lock);
    rvMutexDestruct(&mgr->lock);
}
/*
 * Appends an MGC address to the manager's MGC address list, under mgr->lock.
 * When the list was empty before the call, mgr->mgcAddrListIter is set to the
 * beginning of the list (i.e. to the entry just added).
 */
void rvMegacoTermMgrMgcAddrListAdd(RvMegacoTermMgr * mgr,RvMegacoEntityAddress* mgcAddr)
{
    RvBool wasEmpty;

    rvMutexLock(&mgr->lock);

    /* Must be sampled before the push. */
    wasEmpty = rvListEmpty(&mgr->mgcAddrList);
    rvListPushBack(RvMegacoEntityAddress)(&mgr->mgcAddrList, mgcAddr);

    if (wasEmpty)
    {
        mgr->mgcAddrListIter = rvListBegin(&mgr->mgcAddrList);
    }

    rvMutexUnlock(&mgr->lock);
}
/*
 * Removes every entry from the manager's MGC address list, under mgr->lock.
 *
 * NOTE(review): mgr->mgcAddrListIter is not reset here; it is only set again
 * by the next rvMegacoTermMgrMgcAddrListAdd() -- confirm nothing reads it in
 * between.
 */
void rvMegacoTermMgrMgcAddrListClear(RvMegacoTermMgr * mgr)
{
    rvMutexLock(&mgr->lock);

    rvListClear(RvMegacoEntityAddress)(&mgr->mgcAddrList);

    rvMutexUnlock(&mgr->lock);
}
/*
 * Attaches this Megaco termination manager to an MDM termination manager:
 * the two objects are pointed at each other, then the Megaco callback table
 * (mdmTermMgrClbks) is registered with the MDM manager.
 */
void rvMegacoTermMgrSetMdmMgr(RvMegacoTermMgr * mgr,RvMdmTermMgr * mdmMgr)
{
    /* Cross-link the two managers. */
    mdmMgr->xTermMgr = mgr;
    mgr->mdmTermMgr = mdmMgr;

    /* Hand over the callbacks once the links are in place. */
    rvMdmTermMgrRegisterXTermMgrClbks_(mdmMgr, &mdmTermMgrClbks);
}
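/*
 * Keyboard shortcuts of the web code viewer this file was copied from
 * (page residue, not part of the source):
 *   Copy code:          Ctrl + C
 *   Search code:        Ctrl + F
 *   Full-screen mode:   F11
 *   Toggle theme:       Ctrl + Shift + D
 *   Show shortcuts:     ?
 *   Increase font size: Ctrl + =
 *   Decrease font size: Ctrl + -
 */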