📄 rvtransport.c
字号:
/******************************************************************************
Filename: rvtransport.c
Description: class used to receive complete packets from a group of sockets.
*******************************************************************************
Copyright (c) 2000 RADVision Inc.
*******************************************************************************
NOTICE:
This document contains information that is proprietary to RADVision Inc.
No part of this publication may be reproduced in any form whatsoever without
written prior approval by RADVision Inc.
RADVision Inc. reserves the right to revise this publication and make changes
without obligation to notify any person of such revisions or changes.
******************************************************************************/
#include "rvtransport.h"
#include "rvplatform.h"
#include "rvlog.h"
#include "rvtpkt.h"
#include "rvstrstream.h"
#if defined(RV_SOCKETS_WSOCK)
# include <windows.h>
#elif defined(RV_SOCKETS_PSOS)
# include "rvsharer.h"
#endif
/*
 * Performs the platform-specific socket layer start-up.
 *
 * With RV_SOCKETS_WSOCK, Winsock is started requesting version 1.1; with
 * RV_SOCKETS_PSOS, rvSharerInit() is called; otherwise nothing is done.
 *
 * Returns rvTrue on success, or rvFalse (after logging an error) when
 * WSAStartup reports a failure.
 */
RvBool rvTransportInit(void)
{
    RvBool started = rvTrue;

#if defined(RV_SOCKETS_WSOCK)
    WSADATA winsockInfo;

    /* A non-zero result from WSAStartup means Winsock is unusable */
    if (WSAStartup(MAKEWORD(1, 1), &winsockInfo) != 0) {
        rvLogError(&rvLog, "rvTransportInit: Can't start winsock");
        started = rvFalse;
    }
#elif defined(RV_SOCKETS_PSOS)
    /* pSOS builds bring up the sharer module instead */
    rvSharerInit();
#endif

    return started;
}
/*
 * Undoes the platform-specific start-up done by rvTransportInit():
 * WSACleanup() with RV_SOCKETS_WSOCK, rvSharerEnd() with RV_SOCKETS_PSOS,
 * and nothing on other platforms.
 *
 * Declared with an explicit (void) parameter list so the definition is a
 * real prototype, matching rvTransportInit(void).
 */
void rvTransportEnd(void)
{
#if defined(RV_SOCKETS_WSOCK)
    WSACleanup();
#elif defined(RV_SOCKETS_PSOS)
    rvSharerEnd();
#endif
}
/******************************************************************************/
/* Per-socket bookkeeping shared by the receive callbacks and the
   packet-queue worker. */
typedef struct
{
/* Transport this socket is registered with; the receive path uses it to
   reach the receive-buffer allocator and the packet queue. */
RvTransport *transport;
/* Listener stored at construction time (its uses are outside this chunk). */
RvSocketListener *listener;
/* Opaque caller pointer, handed back as the last argument of the
   transport's processPacket callback. */
void *userData;
/* Held while packetsQueued and removed are read or modified. */
RvMutex mutex;
/* Number of packets from this socket currently sitting in the packet
   queue: incremented on a successful push, decremented after processing. */
int packetsQueued;
/* Once set, the receive path stops enqueuing packets, and the queue worker
   calls rvTransportDeleteSocketStuff when packetsQueued drops to zero. */
RvBool removed;
} RvSocketData;
/*
 * Initializes the per-socket bookkeeping record in place.
 *
 * x         - storage to initialize
 * transport - transport the socket belongs to
 * listener  - listener associated with the socket
 * data      - opaque user pointer stored as userData
 *
 * Returns x.
 */
static RvSocketData *rvSocketDataConstruct(RvSocketData *x, RvTransport *transport, RvSocketListener *listener, void *data)
{
    /* Queue accounting starts out empty and the socket is live */
    x->removed = rvFalse;
    x->packetsQueued = 0;
    rvMutexConstruct(&x->mutex);

    /* Remember who owns us and what to hand back to the user */
    x->userData = data;
    x->listener = listener;
    x->transport = transport;

    return x;
}
/*
 * Tears down a record set up by rvSocketDataConstruct. The mutex is the
 * only member that needs explicit destruction.
 */
static void rvSocketDataDestruct(RvSocketData *x)
{
    RvMutex *guard = &x->mutex;

    rvMutexDestruct(guard);
}
/******************************************************************************/
/* Forward declarations: rvTransportDeleteSocketStuff is called by the
   packet-queue worker before its definition appears. */
static void rvTransportDeleteSocketStuff(RvSocketData *socketData);
static void rvTransportTryToDeleteSocketStuff(RvSocketListener *listener);
/* NOTE(review): presumably the capacity of the packet queue and the stack
   size of the queue-processing thread; their uses are not visible in this
   part of the file -- confirm at the point of use. */
#define RV_TRANSPORT_QUEUESIZE 32
#define RV_TRANSPORT_STACKSIZE 12288
/* Header placed at the start of every receive buffer; the packet bytes
   follow it immediately in the same allocation (see rvPacketDataGetPacket). */
typedef struct
{
/* Number of payload bytes in the packet. */
size_t length;
/* Total size of the allocation (header included); passed back to
   rvAllocDeallocate when the buffer is released. */
size_t bufSize;
/* Socket the packet was received on. */
RvSocket *socket;
/* Bookkeeping record of that socket. */
RvSocketData *socketData;
/* Selects which member of fromAddr is valid. */
RvTransportType transportType;
/* Sender address: stored inline for UDP, referenced by pointer for TCP. */
union
{
RvSocketAddr udpAddr;
RvSocketAddr *tcpAddr;
} fromAddr;
} RvPacketData;
/* Yields a char pointer to the payload area located directly after the
   RvPacketData header. */
#define rvPacketDataGetPacket(packetData) (((char *)(packetData)) + sizeof(RvPacketData))
/*
 * Drains the transport's packet queue: every popped packet is passed to
 * the transport's processPacket callback, its buffer is released, and the
 * originating socket's queued-packet count is decremented. When that count
 * reaches zero on a socket flagged as removed, the socket's resources are
 * deleted here.
 *
 * thread - not used by this function
 * data   - the RvTransport whose queue is serviced
 *
 * The loop ends when rvQueuePop returns non-zero.
 */
static void rvTransportProcessPacketQueue(RvThread *thread, void *data)
{
    RvTransport *transport = (RvTransport *)data;

    for(;;)
    {
        RvPacketData *packetData;
        RvSocketData *socketData;
        RvSocketAddr *sender;
        RvBool deleteSocket;

        if(rvQueuePop(&transport->packetQueue, (RvQueueType *)&packetData))
            break;

        socketData = packetData->socketData;

        /* TCP packets reference the address by pointer; UDP packets carry
           the sender address inline. */
        if(packetData->transportType == RV_TRANSPORTTYPE_TCP)
            sender = packetData->fromAddr.tcpAddr;
        else
            sender = &packetData->fromAddr.udpAddr;

        /* callbacksPending brackets the user callback */
        transport->callbacksPending++;
        transport->processPacket(rvPacketDataGetPacket(packetData),
                                 packetData->length,
                                 packetData->socket,
                                 sender,
                                 packetData->transportType,
                                 socketData->userData);
        transport->callbacksPending--;

        /* The buffer is no longer needed once the callback has returned */
        rvAllocDeallocate(transport->recvBufAlloc, packetData->bufSize, packetData);

        /* Decide under the socket's lock whether this was the last queued
           packet of a removed socket; act on it after unlocking. */
        rvMutexLock(&socketData->mutex);
        socketData->packetsQueued--;
        deleteSocket = (socketData->packetsQueued == 0) && socketData->removed;
        rvMutexUnlock(&socketData->mutex);

        if(deleteSocket)
        {
            rvTransportDeleteSocketStuff(socketData);
        }
    }
}
/******************************************************************************/
#define RV_TRANSPORT_LARGESTBUF 2048
/*
 * Picks the size of a UDP receive buffer: the allocator's maximum size,
 * capped at RV_TRANSPORT_LARGESTBUF.
 */
static size_t rvTransportGetUdpBufferSize(RvAlloc *recvBufAlloc)
{
    size_t limit = rvAllocGetMaxSize(recvBufAlloc);

    if(limit >= RV_TRANSPORT_LARGESTBUF)
        limit = RV_TRANSPORT_LARGESTBUF;

    return limit;
}
/*
 * Receives one datagram from a UDP socket and queues it for processing.
 *
 * ss       - not used by this function
 * listener - listener whose socket is read
 * data     - the RvSocketData record of that socket
 *
 * A buffer of rvTransportGetUdpBufferSize() bytes is allocated; it holds
 * the RvPacketData header followed by the payload and one byte reserved
 * for a null terminator. The packet is pushed onto the transport's packet
 * queue unless the socket has been flagged as removed or the push fails;
 * whenever the packet is not enqueued, the buffer is released here.
 *
 * Failure handling: if the buffer size cannot even hold the header plus
 * the terminator, or the allocation returns NULL, an error is logged and
 * the function returns without reading from the socket.
 */
static void rvTransportRecvFromUdp(RvSocketSelect *ss, RvSocketListener *listener, void *data)
{
    RvSocketData *socketData = (RvSocketData *)data;
    RvTransport *transport = socketData->transport;
    RvSocket *socket = rvSocketListenerGetSocket(listener);
    size_t bufSize = rvTransportGetUdpBufferSize(transport->recvBufAlloc);
    RvPacketData *packetData;
    char *packet;
    RvBool packetEnqueued = rvFalse;
    int rcvd;

    /* Without this guard the payload capacity computed below would wrap
       around (size_t arithmetic) and the receive could overrun the buffer. */
    if(bufSize < sizeof(RvPacketData) + 1)
    {
        rvLogError(&rvLog, "rvTransportRecvFromUdp: receive buffer too small");
        return;
    }

    packetData = (RvPacketData *)rvAllocAllocate(transport->recvBufAlloc, bufSize);
    if(packetData == NULL)
    {
        rvLogError(&rvLog, "rvTransportRecvFromUdp: out of memory");
        return;
    }
    packet = rvPacketDataGetPacket(packetData);

    /* do a receive; reserve 1 byte for the null terminator */
    rcvd = rvSocketRecvFrom(socket, packet, bufSize - sizeof(RvPacketData) - 1, &packetData->fromAddr.udpAddr);
    if(rcvd >= 0)
    {
        packet[rcvd] = 0;
        packetData->length = (size_t)rcvd;
        packetData->bufSize = bufSize;
        packetData->socket = socket;
        packetData->socketData = socketData;
        packetData->transportType = RV_TRANSPORTTYPE_UDP;

        /* The removed flag and the queued-packet count are only touched
           while holding the socket's mutex. */
        rvMutexLock(&socketData->mutex);
        if(!socketData->removed && rvQueuePush(&transport->packetQueue, packetData) == 0)
        {
            socketData->packetsQueued++;
            packetEnqueued = rvTrue;
        }
        rvMutexUnlock(&socketData->mutex);
    }
    else
    {
        rvLogError(&rvLog, "Udp Socket Error");
    }

    /* Ownership passes to the queue only on a successful push */
    if(!packetEnqueued)
        rvAllocDeallocate(transport->recvBufAlloc, bufSize, packetData);
}
/******************************************************************************/
/* Per-connection state for reading TPKT-framed packets from a TCP socket. */
typedef struct
{
/* Embedded per-socket bookkeeping record. */
RvSocketData socketData;
/* Remote peer address, obtained with rvSocketGetRemoteAddr at construction. */
RvSocketAddr fromAddr;
/* Storage the TPKT header is read into; rvTpktGetLength on it gives the
   length used to size the packet buffer. */
RvTpkt tpkt;
/* Packet buffer being filled, or NULL while the TPKT header is being read. */
RvPacketData *packetData;
/* cur: where the next received bytes are stored; end: one past the last
   byte of the region currently being read (header or packet body). */
char *cur, *end;
} RvTcpBlock;
/*
 * Points the read cursor at the TPKT header storage so the next bytes
 * received land in x->tpkt, and marks that no packet buffer is attached.
 */
static void rvTcpBlockStartTpktRead(RvTcpBlock *x)
{
    char *header = (char *)&x->tpkt;

    x->packetData = NULL;
    x->cur = header;
    x->end = header + sizeof(x->tpkt);
}
/*
 * Switches the block from header reading to body reading: allocates a
 * packet buffer sized from the length reported by the TPKT header and
 * points the read cursor at its payload area.
 *
 * The buffer holds the RvPacketData header, the payload, and one extra
 * byte for a null terminator. Only the length and bufSize fields of the
 * header are filled in here.
 *
 * NOTE(review): the result of rvAllocAllocate is used without a NULL
 * check; whether the allocator can return NULL is not visible here.
 */
static void rvTcpBlockStartPacketRead(RvTcpBlock *x)
{
    RvPacketData *pkt;
    size_t payloadLen = rvTpktGetLength(&x->tpkt);
    size_t allocSize = sizeof(RvPacketData) + payloadLen + 1;

    pkt = (RvPacketData *)rvAllocAllocate(x->socketData.transport->recvBufAlloc, allocSize);
    pkt->length = payloadLen;
    pkt->bufSize = allocSize;

    x->packetData = pkt;
    x->cur = rvPacketDataGetPacket(pkt);
    x->end = x->cur + payloadLen;
}
/*
 * Initializes a TCP read block in place.
 *
 * x         - storage to initialize
 * socket    - connected socket; queried for its remote address
 * listener  - listener associated with the socket
 * transport - owning transport
 * data      - opaque user pointer stored in the embedded socket data
 *
 * The block starts out ready to read a TPKT header. Returns x.
 */
static RvTcpBlock *rvTcpBlockConstruct(RvTcpBlock *x, RvSocket *socket, RvSocketListener *listener, RvTransport *transport, void *data)
{
    RvSocketAddr *peer = &x->fromAddr;

    rvSocketDataConstruct(&x->socketData, transport, listener, data);

    /* Record the address of the remote end */
    rvSocketAddrConstruct(peer);
    rvSocketGetRemoteAddr(socket, peer);

    rvTcpBlockStartTpktRead(x);

    return x;
}
/*
 * Releases everything held by a TCP read block: a partially read packet
 * buffer (if one is attached), the embedded socket data, and the stored
 * remote address.
 */
static void rvTcpBlockDestruct(RvTcpBlock *x)
{
    RvPacketData *pending = x->packetData;

    if(pending != NULL)
    {
        rvAllocDeallocate(x->socketData.transport->recvBufAlloc, pending->bufSize, pending);
    }

    rvSocketDataDestruct(&x->socketData);
    rvSocketAddrDestruct(&x->fromAddr);
}
static void rvTransportRecvFromTcp(RvSocketSelect *ss, RvSocketListener *listener, void *data)
{
RvTcpBlock *tcp = (RvTcpBlock *)data;
RvSocket *socket = rvSocketListenerGetSocket(listener);
int rcvd = rvSocketRecv(socket, tcp->cur, tcp->end - tcp->cur);
if(rcvd > 0)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -