⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rvtransport.c

📁 h.248协议源码
💻 C
📖 第 1 页 / 共 2 页
字号:
		if((tcp->cur += rcvd) == tcp->end)
		{
			/* Current read is complete! */
			RvPacketData *packetData = tcp->packetData;

			if(packetData == NULL)
			{
				/* TPKT read is complete, start packet read. */
				rvTcpBlockStartPacketRead(tcp);
			}
			else
			{
				/* Packet read is complete, process packet and start next TPKT read. */
				RvSocketData *socketData = &tcp->socketData;
				RvTransport *transport = socketData->transport;

				*tcp->end = 0; /* terminate string */

				packetData->socket = socket;
				packetData->socketData = socketData;
				packetData->transportType = RV_TRANSPORTTYPE_TCP;
				packetData->fromAddr.tcpAddr = &tcp->fromAddr;

				rvMutexLock(&socketData->mutex);
				if(!socketData->removed && rvQueuePush(&transport->packetQueue, packetData) == 0)
					socketData->packetsQueued++;
				else
					rvAllocDeallocate(transport->recvBufAlloc, packetData->bufSize, packetData);
				rvMutexUnlock(&socketData->mutex);

				rvTcpBlockStartTpktRead(tcp);
			}
		}
	}
	else
	{
		rvSocketSelectUnregisterListener(ss, listener);
		rvTransportTryToDeleteSocketStuff(listener);
	}
}

/*
 * Listener callback installed on passive (listening) TCP sockets.
 *
 * Accepts the pending connection into a freshly allocated socket, builds a
 * TCP block and a receive listener (rvTransportRecvFromTcp) for it, and
 * registers that listener with the select object that invoked this callback.
 *
 *   ss       - select object on which the new listener is registered.
 *   listener - listener that owns the listening socket.
 *   data     - the RvSocketData attached to the listening socket; it supplies
 *              the owning transport and the user data handed on to the new
 *              connection's TCP block.
 *
 * NOTE(review): the three allocations and the result of rvSocketAccept are
 * not examined here - confirm whether any of them can fail.
 */
static void rvTransportAcceptTcp(RvSocketSelect *ss, RvSocketListener *listener, void *data)
{
	RvSocketData *listenData = (RvSocketData *)data;
	RvTransport *owner = listenData->transport;
	RvSocket *accepted;
	RvSocketListener *connListener;
	RvTcpBlock *block;

	/* Everything for the new connection comes from the general allocator. */
	accepted = (RvSocket *)rvAllocAllocate(owner->genAlloc, sizeof(*accepted));
	connListener = (RvSocketListener *)rvAllocAllocate(owner->genAlloc, sizeof(*connListener));
	block = (RvTcpBlock *)rvAllocAllocate(owner->genAlloc, sizeof(*block));

	rvSocketAccept(rvSocketListenerGetSocket(listener), accepted);

	/* The TCP block becomes the callback data of the receive listener. */
	rvTcpBlockConstruct(block, accepted, connListener, owner, listenData->userData);
	rvSocketListenerConstruct(connListener, accepted, rvTransportRecvFromTcp, block);
	rvSocketSelectRegisterListener(ss, connListener);
}

/******************************************************************************/

/*
 * Releases everything the transport holds for one socket: the per-socket
 * data (or the TCP block that contains it), the listener and the socket.
 *
 *   socketData - per-socket data of the socket being deleted.
 *
 * If a socket-delete callback is registered on the transport it is invoked
 * first, with the socket and its user data, while both are still intact.
 * All memory is returned to the transport's general allocator.
 */
static void rvTransportDeleteSocketStuff(RvSocketData *socketData)
{
	RvTransport *owner = socketData->transport;
	RvSocketListener *sl = socketData->listener;
	RvSocket *sock = rvSocketListenerGetSocket(sl);
	RvAlloc *genAlloc = owner->genAlloc;

	/* Notify the application before anything is torn down. */
	if(owner->onSocketDelete != NULL)
		owner->onSocketDelete(sock, socketData->userData);

	if(rvSocketListenerGetFunc(sl) != rvTransportRecvFromTcp)
	{
		/* Socket data was allocated on its own: destruct and free it directly. */
		rvSocketDataDestruct(socketData);
		rvAllocDeallocate(genAlloc, sizeof(RvSocketData), socketData);
	}
	else
	{
		/*
		 * TCP receive listeners carry an RvTcpBlock as their data and the
		 * socket data is a member of that block, so only the block itself
		 * is destructed and freed.
		 */
		RvTcpBlock *block = (RvTcpBlock *)rvSocketListenerGetData(sl);
		rvTcpBlockDestruct(block);
		rvAllocDeallocate(genAlloc, sizeof(RvTcpBlock), block);
	}

	/* Destruct both objects before handing their storage back. */
	rvSocketListenerDestruct(sl);
	rvSocketDestruct(sock);
	rvAllocDeallocate(genAlloc, sizeof(RvSocketListener), sl);
	rvAllocDeallocate(genAlloc, sizeof(RvSocket), sock);
}

/*
 * Deletes the resources behind a listener right away when no received
 * packets are still queued for its socket; otherwise only flags the socket
 * data as removed and leaves the actual delete for later.
 *
 *   listener - listener whose socket is to be deleted.
 *
 * NOTE(review): the code that completes a postponed delete is not part of
 * this section - presumably the packet-processing path does it once
 * packetsQueued drops back to zero; confirm.
 */
static void rvTransportTryToDeleteSocketStuff(RvSocketListener *listener)
{
	RvSocketData *sd;

	/* TCP receive listeners keep their socket data inside the TCP block. */
	if(rvSocketListenerGetFunc(listener) == rvTransportRecvFromTcp)
		sd = &((RvTcpBlock *)rvSocketListenerGetData(listener))->socketData;
	else
		sd = (RvSocketData *)rvSocketListenerGetData(listener);

	rvMutexLock(&sd->mutex);
	if(sd->packetsQueued != 0)
	{
		/* Queued packets still need the socket: postpone the delete. */
		sd->removed = rvTrue;
		rvMutexUnlock(&sd->mutex);
		return;
	}

	/* The mutex belongs to the data being deleted, so release it first. */
	rvMutexUnlock(&sd->mutex);
	rvTransportDeleteSocketStuff(sd);
}

/******************************************************************************/

/*
 * Initializes a transport object and starts its packet-processing threads.
 *
 *   x            - transport to construct.
 *   f            - callback stored as the transport's packet handler.
 *   numThreads   - number of worker threads to create and start.
 *   priority     - priority passed to the socket engine and to every worker.
 *   genAlloc     - allocator for the socket engine, the thread array and the
 *                  temporary thread-name streams.
 *   recvBufAlloc - allocator stored for receive buffers.
 *
 * Each worker runs rvTransportProcessPacketQueue with the transport as its
 * argument and is named "TRANSPORT.<index>".
 *
 * Returns x.
 *
 * NOTE(review): the result of allocating the thread array is not checked.
 */
RvTransport *rvTransportConstruct(RvTransport *x, RvRecvPacketCb f,
	size_t numThreads, int priority, RvAlloc *genAlloc, RvAlloc *recvBufAlloc)
{
	size_t n;

	rvSocketEngineConstruct(&x->socketEngine, priority, genAlloc);
	rvQueueConstruct(&x->packetQueue, RV_TRANSPORT_QUEUESIZE);

	x->processPacket = f;
	x->callbacksPending = 0;
	x->onSocketDelete = NULL;	/* no delete notification until one is registered */
	x->genAlloc = genAlloc;
	x->recvBufAlloc = recvBufAlloc;
	x->numThreads = numThreads;
	x->threads = (RvThread *)rvAllocAllocate(genAlloc, numThreads * sizeof(RvThread));

	for(n = 0; n < numThreads; ++n)
	{
		RvThread *worker = &x->threads[n];
		RvStrStream name;

		/* Build the worker's name: "TRANSPORT." followed by its index. */
		rvStrStreamConstruct(&name, 32, genAlloc);
		rvStrStreamWriteStr(&name, "TRANSPORT.");
		rvStrStreamWriteUInt(&name, n);
		rvStrStreamEnds(&name);

		rvThreadConstruct(worker, rvStrStreamGetStr(&name), priority,
			RV_TRANSPORT_STACKSIZE, rvTransportProcessPacketQueue, x);
		rvThreadStart(worker);

		/* The name stream is only needed while the thread is constructed. */
		rvStrStreamDestruct(&name);
	}

	return x;
}

/*
 * Tears down a transport: stops and drains the packet queue, destructs the
 * socket engine and the worker threads, and frees the thread array.
 *
 *   x - transport to destruct.
 *
 * Packets still sitting in the queue are returned to the receive-buffer
 * allocator without being processed.  Listeners, TCP blocks and sockets that
 * were added to the transport are NOT released here (see the TODO below).
 */
void rvTransportDestruct(RvTransport *x)
{
	RvPacketData *pending;
	size_t n;

	rvQueueStop(&x->packetQueue);

	/* Discard whatever is left; a pop result of -2 means the queue is empty. */
	for(;;)
	{
		if(rvQueuePop(&x->packetQueue, (RvQueueType *)&pending) == -2)
			break;
		rvAllocDeallocate(x->recvBufAlloc, pending->bufSize, pending);
	}
	rvQueueDestruct(&x->packetQueue);

	/* TODO: delete listeners, tcp blocks & sockets */

	rvSocketEngineDestruct(&x->socketEngine);

	n = 0;
	while(n < x->numThreads)
	{
		rvThreadDestruct(&x->threads[n]);
		++n;
	}

	rvAllocDeallocate(x->genAlloc, x->numThreads * sizeof(RvThread), x->threads);
}

/*
 * Creates a UDP socket on the given port and registers it with the
 * transport's socket engine, using rvTransportRecvFromUdp as its listener
 * callback.
 *
 *   x    - transport that will own the socket.
 *   port - port passed to the UDP socket constructor.
 *   data - user data stored in the socket's RvSocketData.
 *
 * Returns the newly allocated socket.
 *
 * NOTE(review): neither the allocations nor the socket construction are
 * checked for failure here.
 */
RvSocket *rvTransportAddUdpSocket(RvTransport *x, RvInetPort port, void *data)
{
	RvSocketData *sd;
	RvSocket *udp;
	RvSocketListener *sl;

	sd = (RvSocketData *)rvAllocAllocate(x->genAlloc, sizeof(*sd));
	udp = (RvSocket *)rvAllocAllocate(x->genAlloc, sizeof(*udp));
	sl = (RvSocketListener *)rvAllocAllocate(x->genAlloc, sizeof(*sl));

	rvSocketDataConstruct(sd, x, sl, data);
	rvSocketConstructUdp(udp, port);

	/* The socket data travels with the listener as its callback data. */
	rvSocketListenerConstruct(sl, udp, rvTransportRecvFromUdp, sd);
	rvSocketEngineRegisterListener(&x->socketEngine, sl);

	return udp;
}

/*
 * Creates a passive (listening) TCP socket on the given port and registers
 * it with the transport's socket engine, using rvTransportAcceptTcp as its
 * listener callback.
 *
 *   x    - transport that will own the socket.
 *   port - port passed to the passive TCP socket constructor.
 *   data - user data stored in the socket's RvSocketData.
 *
 * Returns the newly allocated listening socket.
 *
 * NOTE(review): neither the allocations nor the socket construction are
 * checked for failure here.
 */
RvSocket *rvTransportAddTcpPassive(RvTransport *x, RvInetPort port, void *data)
{
	RvSocketData *sd;
	RvSocket *server;
	RvSocketListener *sl;

	sd = (RvSocketData *)rvAllocAllocate(x->genAlloc, sizeof(*sd));
	server = (RvSocket *)rvAllocAllocate(x->genAlloc, sizeof(*server));
	sl = (RvSocketListener *)rvAllocAllocate(x->genAlloc, sizeof(*sl));

	rvSocketDataConstruct(sd, x, sl, data);
	rvSocketConstructTcpPassive(server, port);

	/* The accept callback receives the socket data as its argument. */
	rvSocketListenerConstruct(sl, server, rvTransportAcceptTcp, sd);
	rvSocketEngineRegisterListener(&x->socketEngine, sl);

	return server;
}

/*
 * Creates an active (outgoing) TCP socket to the given address and, on
 * success, registers it with the transport's socket engine, using
 * rvTransportRecvFromTcp as its listener callback.
 *
 *   x    - transport that will own the socket.
 *   addr - remote address passed to the active TCP socket constructor.
 *   data - user data handed to the connection's TCP block.
 *
 * Returns the new socket, or NULL when the socket constructor fails; in
 * that case the socket's memory is released again before returning.
 */
RvSocket *rvTransportAddTcpActive(RvTransport *x, RvSocketAddr *addr, void *data)
{
	RvSocketListener *sl;
	RvTcpBlock *block;
	RvSocket *conn = (RvSocket *)rvAllocAllocate(x->genAlloc, sizeof(*conn));

	/* Second argument is 0 - presumably "any local port"; TODO confirm. */
	if(rvSocketConstructTcpActive(conn, 0, addr) == NULL)
	{
		/* construct failed: clean up */
		rvAllocDeallocate(x->genAlloc, sizeof(RvSocket), conn);
		return NULL;
	}

	sl = (RvSocketListener *)rvAllocAllocate(x->genAlloc, sizeof(*sl));
	block = (RvTcpBlock *)rvAllocAllocate(x->genAlloc, sizeof(*block));

	/* The TCP block becomes the callback data of the receive listener. */
	rvTcpBlockConstruct(block, conn, sl, x, data);
	rvSocketListenerConstruct(sl, conn, rvTransportRecvFromTcp, block);
	rvSocketEngineRegisterListener(&x->socketEngine, sl);

	return conn;
}

/*
 * Removes a socket from the transport in one step: unregisters it from the
 * socket engine and passes the listener returned by the engine on to
 * rvTransportRemoveUnregisteredSocket.
 *
 *   x      - transport that owns the socket.
 *   socket - socket to remove.
 */
void rvTransportRemoveSocket(RvTransport *x, RvSocket *socket)
{
	rvTransportRemoveUnregisteredSocket(x, socket,
		rvSocketEngineUnregisterSocket(&x->socketEngine, socket));
}

/*
 * Unregisters a socket from the transport's socket engine without deleting
 * anything.
 *
 *   x      - transport that owns the socket.
 *   socket - socket to unregister.
 *
 * Returns whatever rvSocketEngineUnregisterSocket returns for the socket;
 * that listener can later be passed to rvTransportRemoveUnregisteredSocket.
 */
RvSocketListener *rvTransportUnregisterSocket(RvTransport *x, RvSocket *socket)
{
	RvSocketListener *unregistered;

	unregistered = rvSocketEngineUnregisterSocket(&x->socketEngine, socket);
	return unregistered;
}

/*
 * Deletes (or schedules deletion of) the resources of a socket that has
 * already been unregistered from the socket engine.
 *
 *   x      - owning transport (not used by this function).
 *   socket - the unregistered socket (not used by this function).
 *   sl     - listener obtained when the socket was unregistered; when it is
 *            NULL the call does nothing.
 */
void rvTransportRemoveUnregisteredSocket(RvTransport *x, RvSocket *socket, RvSocketListener *sl)
{
	if(sl == NULL)
		return;

	rvTransportTryToDeleteSocketStuff(sl);
}

/*
 * Starts the transport by starting its socket engine.
 *
 *   x - transport to start.
 */
void rvTransportStart(RvTransport *x)
{
	rvSocketEngineStart(&x->socketEngine);
}

/*
 * Stops the transport's socket engine and then polls, sleeping between
 * checks, for as long as the packet queue is non-empty AND callbacks are
 * still pending.
 *
 *   x - transport to stop.
 *
 * NOTE(review): because both conditions must hold for the wait to continue,
 * this returns as soon as EITHER the queue is empty OR callbacksPending is
 * zero - so it can return while a callback is still running on an empty
 * queue.  If the intent is to wait for both to settle, the conditions
 * should probably be combined with "or"; confirm against the code that
 * maintains callbacksPending before changing it.
 */
void rvTransportStop(RvTransport *x)
{
	rvSocketEngineStop(&x->socketEngine);

	for(;;)
	{
		/* Nothing left in the queue: stop waiting. */
		if(rvQueueEmpty(&x->packetQueue))
			break;

		/* No callback in progress: stop waiting. */
		if(!x->callbacksPending)
			break;

		rvThreadSleep(1);
	}
}

/*
 * Sets the callback that rvTransportDeleteSocketStuff invokes, with the
 * socket and its user data, before releasing a socket's resources.
 *
 *   x - transport on which to register the callback.
 *   f - callback to store; NULL (the value set by rvTransportConstruct)
 *       means no notification is made.
 */
void rvTransportRegisterSocketDeleteCb(RvTransport *x, RvSocketDeleteCb f)
{
	x->onSocketDelete = f;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -