📄 rvtransport.c
字号:
if((tcp->cur += rcvd) == tcp->end)
{
/* Current read is complete! */
RvPacketData *packetData = tcp->packetData;
if(packetData == NULL)
{
/* TPKT read is complete, start packet read. */
rvTcpBlockStartPacketRead(tcp);
}
else
{
/* Packet read is complete, process packet and start next TPKT read. */
RvSocketData *socketData = &tcp->socketData;
RvTransport *transport = socketData->transport;
*tcp->end = 0; /* terminate string */
packetData->socket = socket;
packetData->socketData = socketData;
packetData->transportType = RV_TRANSPORTTYPE_TCP;
packetData->fromAddr.tcpAddr = &tcp->fromAddr;
rvMutexLock(&socketData->mutex);
if(!socketData->removed && rvQueuePush(&transport->packetQueue, packetData) == 0)
socketData->packetsQueued++;
else
rvAllocDeallocate(transport->recvBufAlloc, packetData->bufSize, packetData);
rvMutexUnlock(&socketData->mutex);
rvTcpBlockStartTpktRead(tcp);
}
}
}
else
{
rvSocketSelectUnregisterListener(ss, listener);
rvTransportTryToDeleteSocketStuff(listener);
}
}
static void rvTransportAcceptTcp(RvSocketSelect *ss, RvSocketListener *listener, void *data)
{
RvSocketData *socketData = (RvSocketData *)data;
RvTransport *transport = socketData->transport;
RvSocket *socket = (RvSocket *)rvAllocAllocate(transport->genAlloc, sizeof(RvSocket));
RvSocketListener *newListener = (RvSocketListener *)rvAllocAllocate(transport->genAlloc, sizeof(RvSocketListener));
RvTcpBlock *tcpBlock = (RvTcpBlock *)rvAllocAllocate(transport->genAlloc, sizeof(RvTcpBlock));
rvSocketAccept(rvSocketListenerGetSocket(listener), socket);
rvTcpBlockConstruct(tcpBlock, socket, newListener, transport, socketData->userData);
rvSocketListenerConstruct(newListener, socket, rvTransportRecvFromTcp, tcpBlock);
rvSocketSelectRegisterListener(ss, newListener);
}
/******************************************************************************/
static void rvTransportDeleteSocketStuff(RvSocketData *socketData)
{
RvTransport *transport = socketData->transport;
RvSocketListener *listener = socketData->listener;
RvSocket *socket = rvSocketListenerGetSocket(listener);
RvAlloc *alloc = transport->genAlloc;
if(transport->onSocketDelete != NULL)
transport->onSocketDelete(socket, socketData->userData);
if(rvSocketListenerGetFunc(listener) == rvTransportRecvFromTcp)
{
RvTcpBlock *tcpBlock = (RvTcpBlock *)rvSocketListenerGetData(listener);
rvTcpBlockDestruct(tcpBlock);
rvAllocDeallocate(alloc, sizeof(RvTcpBlock), tcpBlock);
}
else
{
rvSocketDataDestruct(socketData);
rvAllocDeallocate(alloc, sizeof(RvSocketData), socketData);
}
rvSocketListenerDestruct(listener);
rvSocketDestruct(socket);
rvAllocDeallocate(alloc, sizeof(RvSocketListener), listener);
rvAllocDeallocate(alloc, sizeof(RvSocket), socket);
}
static void rvTransportTryToDeleteSocketStuff(RvSocketListener *listener)
{
/* If packets are queued up that need the socket, postpone delete. */
RvSocketData *socketData =
rvSocketListenerGetFunc(listener) == rvTransportRecvFromTcp ?
&((RvTcpBlock *)rvSocketListenerGetData(listener))->socketData :
(RvSocketData *)rvSocketListenerGetData(listener);
rvMutexLock(&socketData->mutex);
if(socketData->packetsQueued == 0)
{
/* unlock before deleting */
rvMutexUnlock(&socketData->mutex);
rvTransportDeleteSocketStuff(socketData);
}
else
{
socketData->removed = rvTrue;
rvMutexUnlock(&socketData->mutex);
}
}
/******************************************************************************/
RvTransport *rvTransportConstruct(RvTransport *x, RvRecvPacketCb f,
size_t numThreads, int priority, RvAlloc *genAlloc, RvAlloc *recvBufAlloc)
{
size_t i;
rvSocketEngineConstruct(&x->socketEngine, priority, genAlloc);
rvQueueConstruct(&x->packetQueue, RV_TRANSPORT_QUEUESIZE);
x->processPacket = f;
x->callbacksPending = 0;
x->onSocketDelete = NULL;
x->genAlloc = genAlloc;
x->recvBufAlloc = recvBufAlloc;
x->numThreads = numThreads;
x->threads = (RvThread *)rvAllocAllocate(genAlloc, numThreads * sizeof(RvThread));
for(i=0; i<numThreads; ++i)
{
RvStrStream threadName;
rvStrStreamConstruct(&threadName, 32, genAlloc);
rvStrStreamWriteStr(&threadName, "TRANSPORT.");
rvStrStreamWriteUInt(&threadName, i);
rvStrStreamEnds(&threadName);
rvThreadConstruct(&x->threads[i], rvStrStreamGetStr(&threadName), priority,
RV_TRANSPORT_STACKSIZE, rvTransportProcessPacketQueue, x);
rvThreadStart(&x->threads[i]);
rvStrStreamDestruct(&threadName);
}
return x;
}
void rvTransportDestruct(RvTransport *x)
{
RvPacketData *packetData;
size_t i;
rvQueueStop(&x->packetQueue);
while(rvQueuePop(&x->packetQueue, (RvQueueType *)&packetData) != -2) /* while not empty */
rvAllocDeallocate(x->recvBufAlloc, packetData->bufSize, packetData);
rvQueueDestruct(&x->packetQueue);
/* TODO: delete listeners, tcp blocks & sockets */
rvSocketEngineDestruct(&x->socketEngine);
for(i=0; i<x->numThreads; ++i)
rvThreadDestruct(&x->threads[i]);
rvAllocDeallocate(x->genAlloc, x->numThreads * sizeof(RvThread), x->threads);
}
RvSocket *rvTransportAddUdpSocket(RvTransport *x, RvInetPort port, void *data)
{
RvSocketData *socketData = (RvSocketData *)rvAllocAllocate(x->genAlloc, sizeof(RvSocketData));
RvSocket *socket = (RvSocket *)rvAllocAllocate(x->genAlloc, sizeof(RvSocket));
RvSocketListener *listener = (RvSocketListener *)rvAllocAllocate(x->genAlloc, sizeof(RvSocketListener));
rvSocketDataConstruct(socketData, x, listener, data);
rvSocketConstructUdp(socket, port);
rvSocketListenerConstruct(listener, socket, rvTransportRecvFromUdp, socketData);
rvSocketEngineRegisterListener(&x->socketEngine, listener);
return socket;
}
RvSocket *rvTransportAddTcpPassive(RvTransport *x, RvInetPort port, void *data)
{
RvSocketData *socketData = (RvSocketData *)rvAllocAllocate(x->genAlloc, sizeof(RvSocketData));
RvSocket *socket = (RvSocket *)rvAllocAllocate(x->genAlloc, sizeof(RvSocket));
RvSocketListener *listener = (RvSocketListener *)rvAllocAllocate(x->genAlloc, sizeof(RvSocketListener));
rvSocketDataConstruct(socketData, x, listener, data);
rvSocketConstructTcpPassive(socket, port);
rvSocketListenerConstruct(listener, socket, rvTransportAcceptTcp, socketData);
rvSocketEngineRegisterListener(&x->socketEngine, listener);
return socket;
}
RvSocket *rvTransportAddTcpActive(RvTransport *x, RvSocketAddr *addr, void *data)
{
RvSocket *socket = (RvSocket *)rvAllocAllocate(x->genAlloc, sizeof(RvSocket));
if(rvSocketConstructTcpActive(socket, 0, addr) != NULL)
{
RvSocketListener *listener = (RvSocketListener *)rvAllocAllocate(x->genAlloc, sizeof(RvSocketListener));
RvTcpBlock *tcpBlock = (RvTcpBlock *)rvAllocAllocate(x->genAlloc, sizeof(RvTcpBlock));
rvTcpBlockConstruct(tcpBlock, socket, listener, x, data);
rvSocketListenerConstruct(listener, socket, rvTransportRecvFromTcp, tcpBlock);
rvSocketEngineRegisterListener(&x->socketEngine, listener);
return socket;
}
/* construct failed: clean up */
rvAllocDeallocate(x->genAlloc, sizeof(RvSocket), socket);
return NULL;
}
void rvTransportRemoveSocket(RvTransport *x, RvSocket *socket)
{
RvSocketListener *sl = rvSocketEngineUnregisterSocket(&x->socketEngine, socket);
rvTransportRemoveUnregisteredSocket(x, socket, sl);
}
RvSocketListener *rvTransportUnregisterSocket(RvTransport *x, RvSocket *socket)
{
return rvSocketEngineUnregisterSocket(&x->socketEngine, socket);
}
void rvTransportRemoveUnregisteredSocket(RvTransport *x, RvSocket *socket, RvSocketListener *sl)
{
if(sl != NULL)
rvTransportTryToDeleteSocketStuff(sl);
}
void rvTransportStart(RvTransport *x)
{
rvSocketEngineStart(&x->socketEngine);
}
void rvTransportStop(RvTransport *x)
{
rvSocketEngineStop(&x->socketEngine);
while(!rvQueueEmpty(&x->packetQueue) && x->callbacksPending)
rvThreadSleep(1);
}
void rvTransportRegisterSocketDeleteCb(RvTransport *x, RvSocketDeleteCb f)
{
x->onSocketDelete = f;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -