📄 rvmegacostack.c
字号:
{
rvMegacoStackProcessParserError, rvMegacoStackProcessMessageHeader,
rvMegacoStackProcessRequest, rvMegacoStackProcessReply, rvMegacoStackProcessPending,
rvMegacoStackProcessResponseAck, rvMegacoStackProcessErrorTransaction
};
RvMegacoMessageInfo message;
rvMegacoMessageInfoConstruct(&message, localEntity, socket, fromAddr, transportType);
rvMegacoParse(packet, &rvMegacoStackParserHandlers, &message, stack->msgAlloc);
rvMegacoMessageInfoDestruct(&message);
}
}
#ifdef RV_PERFORMANCETEST_ON
#define RV_STATSTHREAD_ON
#endif
#ifdef RV_STATSTHREAD_ON
#include "rvmem.h"
#include "rvtimermgr.h"
extern RvTimerMgr rvTimerMgr;
void printStats(RvThread *thread, void *data)
{
RvMegacoStack *stack = (RvMegacoStack *)data;
RvHrtime startTime, lastTime;
int lastTransactions = 0;
int delay = 30;
printf("%12s %10s %10s %10s %10s %10s %10s\n",
"Transactions", "Seconds", "Rate", "Last", "TCBs", "Timers", "Mem");
startTime = lastTime = rvTimeGetHrtime();
for(;;)
{
RvHrtime curTime;
int numTransactions;
int seconds;
rvThreadSleep(delay++ * 1000);
curTime = rvTimeGetHrtime();
seconds = (int)(rvTimeHrSubtract(curTime, startTime) / (RvHrtime)RV_TIME_NSECPERSEC);
numTransactions = stack->totalTransactions;
printf("%12d %10d %10d %10d %10d %10d %10d\n",
numTransactions,
seconds,
numTransactions / seconds,
(int)((numTransactions - lastTransactions) * (RvHrtime)RV_TIME_NSECPERSEC / rvTimeHrSubtract(curTime, lastTime)),
(int)(stack->curTransactions),
(int)(rvTimerMgrSize(&rvTimerMgr)),
(int)(rvMemTraceGetMemOut()));
lastTime = curTime;
lastTransactions = numTransactions;
}
}
void startStatsThread(RvMegacoStack *stack)
{
RvThread *statsThread = (RvThread *)rvMemAlloc(sizeof(RvThread));
rvThreadConstruct(statsThread, "stats", RV_PRIORITYVALUE_MAX - RV_PRIORITYVALUE_INCREMENT, 4096, printStats, stack);
rvThreadStart(statsThread);
}
#endif
static void* poolAlloc(void *data, size_t sz)
{
RvPool *pool = (RvPool *)data;
if(sz > rvPoolGetBlockSize(pool))
return rvAllocAllocate(&rvDefaultAlloc, sz);
return rvPoolAllocate(pool);
}
static void poolDealloc(void *data, size_t sz, void *ptr)
{
RvPool *pool = (RvPool *)data;
if(sz > rvPoolGetBlockSize(pool))
rvAllocDeallocate(&rvDefaultAlloc, sz, ptr);
else
rvPoolDeallocate(pool, ptr);
}
/* public functions: */
/*$
{function:
{name: rvMegacoStackConstruct}
{class: RvMegacoStack}
{include: rvmegacostack.h}
{description:
{p: Constructs a stack object.}
}
{proto: RvMegacoStack *rvMegacoStackConstruct(RvMegacoStack *stack, size_t numThreads, int priority);}
{params:
{param: {n:stack} {d:The stack object.}}
{param: {n:numThreads} {d:The number of threads.}}
{param: {n:priority} {d:The priority.}}
}
{returns: A pointer to the constructed object, or NULL if construction failed.}
}
$*/
RvMegacoStack *rvMegacoStackConstruct(RvMegacoStack *stack, size_t numThreads, int priority)
{
rvPoolConstruct(&stack->tcbPool, sizeof(RvMegacoTcb));
rvPoolConstruct(&stack->sendBufPool, RV_MEGACOTCB_ENCODEBUFINITSIZE);
rvPoolConstruct(&stack->recvBufPool, RV_MEGACOSTACK_RECVBUFSIZE);
rvSerialMemConstruct(&stack->serialMem, 8192);
rvAllocConstruct(&stack->myTcbAlloc, &stack->tcbPool, sizeof(RvMegacoTcb), poolAlloc, poolDealloc);
rvAllocConstruct(&stack->mySendBufAlloc, &stack->sendBufPool, ~0U, poolAlloc, poolDealloc);
rvAllocConstruct(&stack->myRecvBufAlloc, &stack->recvBufPool, ~0U, poolAlloc, poolDealloc);
rvAllocConstruct(&stack->serialMemAlloc, &stack->serialMem, ~0U, rvSerialMemAlloc, rvSerialMemDealloc);
rvMegacoStackConstructA(stack, numThreads, priority,
&rvDefaultAlloc, &stack->mySendBufAlloc, &stack->myRecvBufAlloc,
&stack->serialMemAlloc, &rvDefaultAlloc, &stack->myTcbAlloc);
stack->usingInternalAllocs = rvTrue; /* must do this after rvMegacoStackConstructA */
return stack;
}
/*$
{function:
{name: rvMegacoStackConstructA}
{class: RvMegacoStack}
{include: rvmegacostack.h}
{description:
{p: Constructs a stack object.}
}
{proto: RvMegacoStack *rvMegacoStackConstructA(RvMegacoStack *stack, size_t numThreads, int priority,
RvAlloc *genAlloc, RvAlloc *sendBufAlloc, RvAlloc *recvBufAlloc,
RvAlloc *msgAlloc, RvAlloc *entityAlloc, RvAlloc *tcbAlloc);}
{params:
{param: {n:stack} {d:The stack object.}}
{param: {n:numThreads} {d:The number of threads.}}
{param: {n:priority} {d:The priority.}}
{param: {n:genAlloc} {d:Allocator for miscellaneous items.}}
{param: {n:sendBufAlloc} {d:Allocator for buffers used to send messages.}}
{param: {n:recvBufAlloc} {d:Allocator for buffers used to receive messages.}}
{param: {n:msgAlloc} {d:Allocator for message structures.}}
{param: {n:entityAlloc} {d:Allocator for entity structures.}}
{param: {n:tcbAlloc} {d:Allocator for transaction control block structures.}}
}
{returns: A pointer to the constructed object, or NULL if construction failed.}
}
$*/
RvMegacoStack *rvMegacoStackConstructA(RvMegacoStack *stack, size_t numThreads, int priority,
RvAlloc *genAlloc, RvAlloc *sendBufAlloc, RvAlloc *recvBufAlloc,
RvAlloc *msgAlloc, RvAlloc *entityAlloc, RvAlloc *tcbAlloc)
{
const int maxStackPriority = RV_PRIORITYVALUE_MAX - RV_PRIORITYVALUE_INCREMENT;
stack->genAlloc = genAlloc;
stack->sendBufAlloc = sendBufAlloc;
stack->msgAlloc = msgAlloc;
stack->entityAlloc = entityAlloc;
stack->tcbAlloc = tcbAlloc;
#if (RV_PRIORITYVALUE_INCREMENT > 0)
if(priority > maxStackPriority)
#else
if(priority < maxStackPriority)
#endif
priority = maxStackPriority;
rvTransportConstruct(&stack->transport, rvMegacoStackProcessPackets,
numThreads, priority, genAlloc, recvBufAlloc);
rvTransportRegisterSocketDeleteCb(&stack->transport, rvMegacoEntityOnSocketDelete);
stack->encodeCompact = rvTrue;
stack->getKey = NULL;
stack->authenticate = NULL;
stack->authenticationFailures = 0;
stack->rawRecvCb = stack->rawSendCb = NULL;
stack->parseErrorCb = NULL;
stack->parseErrors = 0;
stack->priority = priority;
stack->sendDelay = RV_MEGACOSTACK_SENDDELAY_DEFAULT;
rvRetransTimeoutLimitsConstruct(&stack->timeoutLimits,
RV_MEGACOSTACK_INITRETRANSTIMEOUT_DEFAULT,
RV_MEGACOSTACK_MINRETRANSTIMEOUT_DEFAULT,
RV_MEGACOSTACK_MAXRETRANSTIMEOUT_DEFAULT);
stack->tMax = RV_MEGACOSTACK_TMAX_DEFAULT;
stack->maxPacketSize = RV_MEGACOSTACK_MAXPACKET_DEFAULT;;
rvMutexConstruct(&stack->mutex);
stack->curTransactions = stack->totalTransactions = 0;
stack->usingInternalAllocs = rvFalse;
#ifdef RV_STATSTHREAD_ON
startStatsThread(stack);
#endif
return stack;
}
/*$
{function:
{name: rvMegacoStackDestruct}
{class: RvMegacoStack}
{include: rvmegacostack.h}
{description:
{p: Destroys a stack object.}
}
{proto: void rvMegacoStackDestruct(RvMegacoStack *stack);}
{params:
{param: {n:stack} {d:The stack object.}}
}
}
$*/
void rvMegacoStackDestruct(RvMegacoStack *stack)
{
RvBool done = rvFalse;
rvTransportDestruct(&stack->transport);
/* allow tcbs to be deleted before destroying pools */
while(done == rvFalse)
{
rvMutexLock(&stack->mutex);
if ((int)(stack->curTransactions) > 0)
{
rvMutexUnlock(&stack->mutex);
rvThreadSleep(1000);
}
else
{
rvMutexUnlock(&stack->mutex);
done = rvTrue;
}
}
if(stack->usingInternalAllocs)
{
rvPoolDestruct(&stack->tcbPool);
rvPoolDestruct(&stack->sendBufPool);
rvPoolDestruct(&stack->recvBufPool);
rvSerialMemDestruct(&stack->serialMem);
rvAllocDestruct(&stack->myTcbAlloc);
rvAllocDestruct(&stack->mySendBufAlloc);
rvAllocDestruct(&stack->myRecvBufAlloc);
rvAllocDestruct(&stack->serialMemAlloc);
rvMutexDestruct(&stack->mutex);
}
}
/* PRIVATE
{function:
{name: rvMegacoStackStart}
{class: RvMegacoStack}
{include: rvmegacostack.h}
{description:
{p: Start processing messages on the stack.}
}
{proto: void rvMegacoStackStart(RvMegacoStack *stack);}
{params:
{param: {n:stack} {d:The stack object.}}
}
}
*/
void rvMegacoStackStart(RvMegacoStack *stack)
{
rvLogInfo3(&rvLog,RV_MEGACOSTACK_VERSION);
rvTransportStart(&stack->transport);
}
/* PRIVATE
{function:
{name: rvMegacoStackStop}
{class: RvMegacoStack}
{include: rvmegacostack.h}
{description:
{p: Stop processing messages on the stack.}
}
{proto: void rvMegacoStackStop(RvMegacoStack *stack);}
{params:
{param: {n:stack} {d:The stack object.}}
}
}
*/
void rvMegacoStackStop(RvMegacoStack *stack)
{
rvTransportStop(&stack->transport);
}
/*$
{function:
{name: rvMegacoStackSetCompactEncodingMode}
{class: RvMegacoStack}
{include: rvmegacostack.h}
{description:
{p: Sets the encoding style of messages sent by the stack.}
}
{proto: void rvMegacoStackSetCompactEncodingMode(RvMegacoStack *stack, RvBool enable);}
{params:
{param: {n:stack} {d:The stack object.}}
{param: {n:enable} {d:Set to rvTrue to encode messages as compact as possible.
Set to rvFalse to encode messages in a more readable format.}}
}
}
$*/
void rvMegacoStackSetCompactEncodingMode(RvMegacoStack *stack, RvBool enable)
{
stack->encodeCompact = enable;
}
/*$
{function:
{name: rvMegacoStackRegisterSecurityKeyCB}
{class: RvMegacoStack}
{include: rvmegacostack.h}
{description:
{p: Registers a callback that will be invoked to determine the security key for the
address on which a message was received.}
}
{proto: void rvMegacoStackRegisterSecurityKeyCB(RvMegacoStack *stack, RvSecurityKeyCB f, void *data);}
{params:
{param: {n:stack} {d:The stack object.}}
{param: {n:f} {d:The callback.}}
{param: {n:data} {d:User data that will be passed to the callback.}}
}
}
$*/
void rvMegacoStackRegisterSecurityKeyCB(RvMegacoStack *stack, RvSecurityKeyCB f, void *data)
{
stack->getKey = f;
stack->getKeyData = data;
}
/*$
{function:
{name: rvMegacoStackRegisterAuthenticationCB}
{class: RvMegacoStack}
{include: rvmegacostack.h}
{description:
{p: Registers a hash function that the stack will use for message authentication.}
}
{proto: void rvMegacoStackRegisterAuthenticationCB(RvMegacoStack *stack, RvAuthenticationCB f, size_t hashLength);}
{params:
{param: {n:stack} {d:The stack object.}}
{param: {n:f} {d:The hash function. Normally, this will be either rvHmacMd5 or rvHmacSha1.}}
{param: {n:hashLength} {d:The length of the hash function's output.
Use RV_MD5_HASHLENGTH for rvHmacMd5 and RV_SHA1_HASHLENGTH for rvHmacSha1.}}
}
}
$*/
void rvMegacoStackRegisterAuthenticationCB(RvMegacoStack *stack, RvAuthenticationCB f, size_t hashLength)
{
stack->authenticate = f;
stack->hashLength = hashLength;
}
/*$
{function:
{name: rvMegacoStackRegisterRawRecvCB}
{class: RvMegacoStack}
{include: rvmegacostack.h}
{description:
{p: Registers a callback that will be invoked for each message that the stack receives.}
}
{proto: void rvMegacoStackRegisterRawRecvCB(RvMegacoStack *stack, RvMegacoRawCB f, void *data);}
{params:
{param: {n:stack} {d:The stack object.}}
{param: {n:f} {d:The callback.}}
{param: {n:data} {d:User data that will be passed to the callback.}}
}
}
$*/
void rvMegacoStackRegisterRawRecvCB(RvMegacoStack *stack, RvMegacoRawCB f, void *data)
{
stack->rawRecvCb = f;
stack->rawRecvData = data;
}
/*$
{function:
{name: rvMegacoStackRegisterRawSendCB}
{class: RvMegacoStack}
{include: rvmegacostack.h}
{description:
{p: Registers a callback that will be invoked for each message that the stack sends.}
}
{proto: void rvMegacoStackRegisterRawSendCB(RvMegacoStack *stack, RvMegacoRawCB f, void *data);}
{params:
{param: {n:stack} {d:The stack object.}}
{param: {n:f} {d:The callback.}}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -