📄 rvmegacoentity.c
字号:
/******************************************************************************
Filename: rvmegacoentity.c
Description: megaco entity class
*******************************************************************************
Copyright (c) 2000 RADVision Inc.
*******************************************************************************
NOTICE:
This document contains information that is proprietary to RADVision Inc.
No part of this publication may be reproduced in any form whatsoever without
written prior approval by RADVision Inc.
RADVision Inc. reserves the right to revise this publication and make changes
without obligation to notify any person of such revisions or changes.
******************************************************************************/
#include "rvmegacoentity.h"
#include "rvmegacoencode.h"
#include "rvlog.h"
#include "rvtpkt.h"
#include <stdlib.h>
#define RV_MEGACO_DEFAULTMGCTEXTPORT 2944
#define RV_MEGACO_DEFAULTMGCBINARYPORT 2945
#define RV_MEGACOENTITY_REMOTEBUCKETS 50
#define RV_MEGACOENTITY_TCBBUCKETS 50
#define rvMegacoEntityGetSendTimer(x) (&(x)->u.remote.sendTimer)
#define RvMegacoEntityAddressConstructCopy rvMegacoEntityAddressConstructCopy
#define RvMegacoEntityAddressDestruct rvMegacoEntityAddressDestruct
#define RvMegacoEntityAddressEqual rvMegacoEntityAddressEqual
#define RvMegacoEntityAddressGetAllocator(x) rvStringGetAllocator(&(x)->address)
#define RvMegacoEntityPtrConstructCopy rvDefaultConstructCopy
#define RvMegacoEntityPtrDestruct rvDefaultDestruct
#define RvMegacoEntityPtrGetAllocator rvDefaultGetAllocator
#define RvMegacoTransactionIdConstructCopy rvDefaultConstructCopy
#define RvMegacoTransactionIdDestruct rvDefaultDestruct
#define RvMegacoTransactionIdEqual rvDefaultEqual
#define RvMegacoTransactionIdGetAllocator rvDefaultGetAllocator
#define RvMegacoTcbPtrConstructCopy rvDefaultConstructCopy
#define RvMegacoTcbPtrDestruct rvDefaultDestruct
#define RvMegacoTcbPtrGetAllocator rvDefaultGetAllocator
rvDefineList(RvMegacoEntityAddress)
rvDefineHash(RvMegacoEntityAddress, RvMegacoEntityPtr)
rvDefineHash(RvMegacoTransactionId, RvMegacoTcbPtr)
static size_t hashEntityAddress(const RvMegacoEntityAddress *key)
{
const unsigned char *s = (const unsigned char *)rvMegacoEntityAddressGetAddress(key);
size_t sum = 0;
while(*s)
sum ^= *s++;
return sum;
}
static size_t hashTransactionId(const RvMegacoTransactionId *id)
{
return *id;
}
static void rvMegacoEntityDeleteAllRemotes(RvMegacoEntity *entity)
{
RvHashIter(RvMegacoEntityAddress, RvMegacoEntityPtr) iter;
RvHashIter(RvMegacoEntityAddress, RvMegacoEntityPtr) end =
rvHashEnd(RvMegacoEntityAddress, RvMegacoEntityPtr)(&entity->u.local.remotes);
for(iter = rvHashBegin(RvMegacoEntityAddress, RvMegacoEntityPtr)(&entity->u.local.remotes);
!rvHashIterEqual(RvMegacoEntityAddress, RvMegacoEntityPtr)(&iter, &end);
iter = rvHashIterNext(RvMegacoEntityAddress, RvMegacoEntityPtr)(iter))
{
RvMegacoEntity *remote = *rvHashIterData(RvMegacoEntityAddress, RvMegacoEntityPtr)(iter);
if(remote->u.remote.deleteMe)
{
rvMegacoEntityDestruct(remote);
rvAllocDeallocate(entity->stack->entityAlloc, sizeof(RvMegacoEntity), remote);
}
else
{
remote->u.remote.localEntity = NULL;
}
}
}
static void rvMegacoEntityDeleteAllTcbs(RvMegacoEntity *entity)
{
RvHashIter(RvMegacoTransactionId, RvMegacoTcbPtr) iter;
RvHashIter(RvMegacoTransactionId, RvMegacoTcbPtr) end = rvHashEnd(RvMegacoTransactionId, RvMegacoTcbPtr)(&entity->tcbs);
for(iter = rvHashBegin(RvMegacoTransactionId, RvMegacoTcbPtr)(&entity->tcbs);
!rvHashIterEqual(RvMegacoTransactionId, RvMegacoTcbPtr)(&iter, &end);
iter = rvHashIterNext(RvMegacoTransactionId, RvMegacoTcbPtr)(iter))
{
RvMegacoTcb *tcb = *rvHashIterData(RvMegacoTransactionId, RvMegacoTcbPtr)(iter);
rvMegacoTcbMarkForDestruct(tcb, RVTRANSACTIONSTATUS_ENTITY_DESTRUCT);
}
if(!entity->isLocal)
{
RvMegacoEntity *local = entity->u.remote.localEntity;
iter = rvHashBegin(RvMegacoTransactionId, RvMegacoTcbPtr)(&local->tcbs);
end = rvHashEnd(RvMegacoTransactionId, RvMegacoTcbPtr)(&local->tcbs);
while(!rvHashIterEqual(RvMegacoTransactionId, RvMegacoTcbPtr)(&iter, &end))
{
RvMegacoTcb *tcb = *rvHashIterData(RvMegacoTransactionId, RvMegacoTcbPtr)(iter);
if(rvMegacoTcbGetRemoteEntity(tcb) == entity)
{
rvMegacoTcbMarkForDestruct(tcb, RVTRANSACTIONSTATUS_ENTITY_DESTRUCT);
iter = rvHashErase(RvMegacoTransactionId, RvMegacoTcbPtr)(&local->tcbs, iter);
}
else
iter = rvHashIterNext(RvMegacoTransactionId, RvMegacoTcbPtr)(iter);
}
}
}
static void rvMegacoEntityResetSendBuffer(RvMegacoEntity *remote)
{
assert(!remote->isLocal);
rvStrStreamSeekPos(&remote->u.remote.msgStream, RV_MEGACOENTITY_SENDBUFHDRSIZE);
rvMegacoMessageHeaderEncode(rvMegacoEntityGetAddress(remote->u.remote.localEntity),
rvMegacoVersion, &remote->u.remote.msgStream, remote->stack->encodeCompact);
/* allow as large a transaction as we can fit into a packet. */
remote->u.remote.maxTransactionSize =
(remote->transportType == RV_TRANSPORTTYPE_TCP ? RV_TPKT_MAXLENGTH : RV_UDP_MAXLENGTH)
- (rvStrStreamTellPos(&remote->u.remote.msgStream) - sizeof(RvTpkt));
}
static size_t rvMegacoEntityGetMaxMsgSize(const RvMegacoEntity *remote)
{
/* Return the maximum message size. This is the position in the message stream we can encode up to.
Note that the message stream includes a leading buffer of size RV_MEGACOENTITY_SENDBUFHDRSIZE that
will contain the TPKT header and authentication header. The TPKT header does not count toward our
packet size, but the authentication header does. Therefore the position we want is:
sizeof(RvTpkt) + maxPacketSize. */
assert(!remote->isLocal);
return sizeof(RvTpkt) + rvMegacoStackGetMaxPacketSize(remote->stack);
}
void rvMegacoEntitySend(RvMegacoEntity *remote)
{
RvMegacoStack *stack = remote->stack;
RvMegacoEntity *local = rvMegacoEntityGetLocalEntity(remote);
const RvSocketAddr *destAddr = rvMegacoEntityGetSocketAddr(remote);
RvTransportType transportType = rvMegacoEntityGetTransportType(remote);
RvBool sendSuccessful = rvFalse;
RvSocket *socket;
if(rvPtrListEmpty(&remote->u.remote.transactionsToSend))
return;
if(transportType == RV_TRANSPORTTYPE_TCP)
{
socket = rvMegacoEntityGetSocket(remote);
if(socket == NULL && rvMegacoEntityGetType(local) == RV_MEGACOENTITYTYPE_MG)
{
socket = rvTransportAddTcpActive(&stack->transport, (RvSocketAddr *)destAddr, local);
if(socket != NULL)
rvMegacoEntitySetSocket(remote, socket);
}
}
else
{
socket = rvMegacoEntityGetSocket(local);
}
if(socket != NULL)
{
size_t endMsg = rvStrStreamTellPos(&remote->u.remote.msgStream);
rvMegacoResponseAckEncode(&remote->u.remote.acks, &remote->u.remote.msgStream, stack->encodeCompact);
/* if response acks don't fit, save them for next time */
if(rvStrStreamGetSize(&remote->u.remote.msgStream) > rvMegacoEntityGetMaxMsgSize(remote))
rvStrStreamSeekPos(&remote->u.remote.msgStream, endMsg);
else
rvMegacoResponseAckClear(&remote->u.remote.acks);
/* terminate message with a null, but don't count it in the length */
rvStrStreamEnds(&remote->u.remote.msgStream);
sendSuccessful = rvMegacoStackSendMessage(stack, &remote->u.remote.msgStream,
socket, destAddr, transportType);
}
while(!rvPtrListEmpty(&remote->u.remote.transactionsToSend))
{
RvMegacoTcb *tcb = (RvMegacoTcb *)rvPtrListFront(&remote->u.remote.transactionsToSend);
rvPtrListPopFront(&remote->u.remote.transactionsToSend);
if(sendSuccessful)
rvMegacoTcbUpdateOnSend(tcb);
else
rvMegacoTcbUpdateFailedSend(tcb);
}
rvMegacoEntityResetSendBuffer(remote);
}
void rvMegacoEntitySendNow(RvMegacoEntity *remote)
{
assert(!remote->isLocal);
rvMutexLock(&remote->mutex);
rvTimerStop(rvMegacoEntityGetSendTimer(remote));
rvMegacoEntitySend(remote);
rvMutexUnlock(&remote->mutex);
}
void rvMegacoEntitySendListInsert(RvMegacoEntity *remote, RvMegacoTcb *tcb)
{
size_t size;
assert(!remote->isLocal);
if(remote->dead)
{
rvMegacoTcbUpdateFailedSend(tcb);
return;
}
rvMutexLock(&remote->mutex);
size = rvStrStreamGetSize(&tcb->encodedTransaction);
if(size > remote->u.remote.maxTransactionSize)
{
/* the transaction is too big to ever get sent, so fail now */
rvLogError(&rvLog, "transaction too big to send");
rvMegacoTcbUpdateFailedSend(tcb);
}
else
{
/* if transaction is too big to fit in current message then start a new message */
if(rvStrStreamGetSize(&remote->u.remote.msgStream) + size > rvMegacoEntityGetMaxMsgSize(remote))
rvMegacoEntitySendNow(remote);
/* encode transaction, add to list */
rvPtrListPushBack(&remote->u.remote.transactionsToSend, tcb);
rvStrStreamWriteMem(&remote->u.remote.msgStream, rvStrStreamGetStr(&tcb->encodedTransaction), size);
/* start timer if first transaction and we're willing to wait */
if(remote->stack->sendDelay == 0)
rvMegacoEntitySendNow(remote);
else if(rvPtrListSize(&remote->u.remote.transactionsToSend) == 1)
rvTimerReset(rvMegacoEntityGetSendTimer(remote), remote->stack->sendDelay);
}
rvMutexUnlock(&remote->mutex);
}
static void rvMegacoEntityOnSendTimer(RvTimer *timer, void *data)
{
RvMegacoEntity *remote = (RvMegacoEntity *)data;
assert(!remote->isLocal);
rvMutexLock(&remote->mutex);
rvMegacoEntitySend(remote);
rvMutexUnlock(&remote->mutex);
}
static RvRandomType rvMegacoEntityGetRandomSeed(const RvMegacoEntity *local)
{
#ifdef RV_DEBUG_ON
/* use same seed during DEBUG for more repeatability */
return 423956;
#else
/* include entity pointer as component to get different seeds for
multiple entities starting simultaneously */
RvTimespec wallTime;
rvTimeGetEpochTime(&wallTime);
return (RvRandomType)rvTimespecSecs(&wallTime) ^
(RvRandomType)rvTimespecNsecs(&wallTime) ^ (RvRandomType)local;
#endif
}
static void rvMegacoEntityConstructCommon(RvMegacoEntity *entity,
const RvMegacoEntityAddress *address, RvBool primary, RvMegacoEntityType entityType,
RvTransportType transportType, RvMegacoEncoding encoding, RvMegacoStack *stack)
{
rvMutexConstruct(&entity->mutex);
rvMegacoEntityAddressConstructCopy(&entity->mId, address, stack->entityAlloc);
entity->entityType = entityType;
entity->transportType = transportType;
entity->encoding = encoding;
entity->stack = stack;
entity->primary = primary;
entity->dead = rvFalse;
entity->socket = NULL;
rvHashConstruct(RvMegacoTransactionId, RvMegacoTcbPtr)(
&entity->tcbs, RV_MEGACOENTITY_TCBBUCKETS, hashTransactionId,
stack->entityAlloc, &rvDefaultAlloc);
}
/*$
{function:
{name: rvMegacoEntityConstructLocal}
{class: RvMegacoEntity}
{include: rvmegacoentity.h}
{description:
{p: Constructs a local entity object.}
}
{proto: RvMegacoEntity *rvMegacoEntityConstructLocal(RvMegacoEntity *entity,
const RvMegacoEntityAddress *address, RvBool primary, RvMegacoEntityType entityType,
RvTransportType transportType, RvMegacoEncoding encoding, RvMegacoStack *stack);}
{params:
{param: {n:entity} {d:The local entity object.}}
{param: {n:address} {d:The address of the local entity.}}
{param: {n:primary} {d:Determines the active entity among a set of redundant entities.
Set to rvTrue for a primary entity. Set to rvFalse for a secondary entity.}}
{param: {n:entityType} {d:The type of entity, MG or MGC.}}
{param: {n:transportType} {d:The transport type the entity will use.}}
{param: {n:encoding} {d:The encoding type the entity will use.}}
{param: {n:stack} {d:The stack that the entity will be created on.}}
}
{returns: A pointer to the constructed object, or NULL if construction failed.}
}
$*/
RvMegacoEntity *rvMegacoEntityConstructLocal(RvMegacoEntity *entity, const RvMegacoEntityAddress *address, RvBool primary,
RvMegacoEntityType entityType, RvTransportType transportType, RvMegacoEncoding encoding, RvMegacoStack *stack)
{
RvInetPort port = entityType == RV_MEGACOENTITYTYPE_MG ? 0 :
encoding == RV_MEGACOENCODING_TEXT ? RV_MEGACO_DEFAULTMGCTEXTPORT : RV_MEGACO_DEFAULTMGCBINARYPORT;
rvMegacoEntityConstructCommon(entity, address, primary, entityType, transportType, encoding, stack);
entity->isLocal = rvTrue;
rvHashConstruct(RvMegacoEntityAddress, RvMegacoEntityPtr)(&entity->u.local.remotes, RV_MEGACOENTITY_REMOTEBUCKETS, hashEntityAddress,
stack->entityAlloc, &rvDefaultAlloc);
entity->u.local.nextTransactionId = 1;
entity->u.local.processRequest = NULL;
entity->u.local.processError = NULL;
rvRandomGeneratorConstruct(&entity->u.local.randomGenerator, rvMegacoEntityGetRandomSeed(entity));
switch(transportType)
{
case RV_TRANSPORTTYPE_UDP:
rvMegacoEntitySetSocket(entity, rvTransportAddUdpSocket(&stack->transport, port, entity));
break;
case RV_TRANSPORTTYPE_TCP:
if(entityType == RV_MEGACOENTITYTYPE_MGC)
rvMegacoEntitySetSocket(entity, rvTransportAddTcpPassive(&stack->transport, port, entity));
break;
}
return entity;
}
static RvSocketAddr *rvMegacoEntityConstructMgcSocketAddr(const RvMegacoEntity *localEntity, RvSocketAddr *socketAddr, const RvMegacoEntityAddress *entityAddr)
{
const char *dnsOrIp;
RvInetPort port = rvMegacoEntityGetEncoding(localEntity) == RV_MEGACOENCODING_TEXT ?
RV_MEGACO_DEFAULTMGCTEXTPORT : RV_MEGACO_DEFAULTMGCBINARYPORT;
assert(rvMegacoEntityGetType(localEntity) == RV_MEGACOENTITYTYPE_MG);
switch(rvMegacoEntityAddressGetType(entityAddr))
{
case RV_MEGACOENTITYADDRESSTYPE_IP:
case RV_MEGACOENTITYADDRESSTYPE_DNS:
dnsOrIp = rvMegacoEntityAddressGetAddress(entityAddr);
break;
case RV_MEGACOENTITYADDRESSTYPE_DEVICE:
dnsOrIp = rvMegacoEntityAddressGetDeviceIp(entityAddr);
break;
case RV_MEGACOENTITYADDRESSTYPE_MTP:
rvLogError(&rvLog, "MTP addresses not supported");
return NULL;
default:
return NULL;
}
return rvSocketAddrConstructInetByName(socketAddr, dnsOrIp, port);
}
/*$
{function:
{name: rvMegacoEntityConstructRemote}
{class: RvMegacoEntity}
{include: rvmegacoentity.h}
{description:
{p: Constructs a remote entity object.}
}
{proto: RvMegacoEntity *rvMegacoEntityConstructRemote(RvMegacoEntity *entity,
const RvMegacoEntityAddress *address, RvBool primary, RvMegacoEntity *local);}
{params:
{param: {n:entity} {d:The remote entity object.}}
{param: {n:address} {d:The address of the remote entity.}}
{param: {n:primary} {d:Determines the active entity among a set of redundant entities.
Set to rvTrue for a primary entity. Set to rvFalse for a secondary entity.}}
{param: {n:local} {d:The local MG entity that the remote entity will communicate with.}}
}
{returns: A pointer to the constructed object, or NULL if construction failed.}
}
$*/
RvMegacoEntity *rvMegacoEntityConstructRemote(RvMegacoEntity *entity,
const RvMegacoEntityAddress *address, RvBool primary, RvMegacoEntity *local)
{
RvSocketAddr socketAddr;
if(rvMegacoEntityConstructMgcSocketAddr(local, &socketAddr, address) == NULL)
return NULL;
rvMegacoEntityConstructRemoteEx(entity, address, &socketAddr, primary, local);
rvSocketAddrDestruct(&socketAddr);
return entity;
}
RvMegacoEntity *rvMegacoEntityConstructRemoteEx(RvMegacoEntity *entity,
const RvMegacoEntityAddress *address, const RvSocketAddr *socketAddr, RvBool primary, RvMegacoEntity *local)
{
RvMegacoStack *stack = local->stack;
RvMegacoEntityType entityType = rvMegacoEntityGetType(local) == RV_MEGACOENTITYTYPE_MGC ?
RV_MEGACOENTITYTYPE_MG : RV_MEGACOENTITYTYPE_MGC;
rvMegacoEntityConstructCommon(entity, address, primary, entityType,
rvMegacoEntityGetTransportType(local), rvMegacoEntityGetEncoding(local), stack);
entity->isLocal = rvFalse;
entity->u.remote.localEntity = local;
rvSocketAddrConstructCopy(&entity->u.remote.socketAddr, socketAddr, stack->entityAlloc);
rvPtrListConstruct(&entity->u.remote.transactionsToSend, stack->entityAlloc);
rvStrStreamConstruct(&entity->u.remote.msgStream, RV_MEGACOENTITY_ENCODEBUFINITSIZE, stack->sendBufAlloc);
rvMegacoResponseAckConstruct(&entity->u.remote.acks);
rvTimerConstruct(&entity->u.remote.sendTimer, 0, rvMegacoEntityOnSendTimer, entity);
rvRoundTripStatsConstruct(&entity->u.remote.roundTripStats, &local->u.local.randomGenerator);
entity->u.remote.deleteMe = rvFalse;
rvHashInsertUnique(RvMegacoEntityAddress, RvMegacoEntityPtr)(
&local->u.local.remotes, (RvMegacoEntityAddress *)rvMegacoEntityGetAddress(entity), &entity);
rvMegacoEntityResetSendBuffer(entity);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -