📄 rtpinterface.cpp
字号:
/**********This library is free software; you can redistribute it and/or modify it underthe terms of the GNU Lesser General Public License as published by theFree Software Foundation; either version 2.1 of the License, or (at youroption) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)This library is distributed in the hope that it will be useful, but WITHOUTANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESSFOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License formore details.You should have received a copy of the GNU Lesser General Public Licensealong with this library; if not, write to the Free Software Foundation, Inc.,51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA**********/// "liveMedia"// Copyright (c) 1996-2010 Live Networks, Inc. All rights reserved.// An abstraction of a network interface used for RTP (or RTCP).// (This allows the RTP-over-TCP hack (RFC 2326, section 10.12) to// be implemented transparently.)// Implementation#include "RTPInterface.hh"#include <GroupsockHelper.hh>#include <stdio.h>////////// Helper Functions - Definition //////////// Helper routines and data structures, used to implement// sending/receiving RTP/RTCP over a TCP socket:static void sendRTPOverTCP(unsigned char* packet, unsigned packetSize, int socketNum, unsigned char streamChannelId);// Reading RTP-over-TCP is implemented using two levels of hash tables.// The top-level hash table maps TCP socket numbers to a// "SocketDescriptor" that contains a hash table for each of the// sub-channels that are reading from this socket.static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) { _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent); if (ourTables->socketTable == NULL) { // Create a new socket number -> SocketDescriptor mapping table: ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS); } return (HashTable*)(ourTables->socketTable);}class SocketDescriptor {public: SocketDescriptor(UsageEnvironment& env, int socketNum); virtual ~SocketDescriptor(); void registerRTPInterface(unsigned char streamChannelId, RTPInterface* rtpInterface); RTPInterface* lookupRTPInterface(unsigned char streamChannelId); void deregisterRTPInterface(unsigned char streamChannelId); void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) { fServerRequestAlternativeByteHandler = handler; fServerRequestAlternativeByteHandlerClientData = clientData; }private: static void tcpReadHandler(SocketDescriptor*, int mask); void tcpReadHandler1(int mask);private: UsageEnvironment& fEnv; int fOurSocketNum; HashTable* fSubChannelHashTable; ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler; void* fServerRequestAlternativeByteHandlerClientData; u_int8_t fStreamChannelId, fSizeByte1; enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState;};static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) { HashTable* table = socketHashTable(env, createIfNotFound); if (table == NULL) return NULL; char const* key = (char const*)(long)sockNum; SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key)); if (socketDescriptor == NULL && createIfNotFound) { socketDescriptor = new SocketDescriptor(env, sockNum); table->Add((char const*)(long)(sockNum), socketDescriptor); } return socketDescriptor;}static void removeSocketDescription(UsageEnvironment& env, int sockNum) { char const* key = (char const*)(long)sockNum; HashTable* table = socketHashTable(env); table->Remove(key); if (table->IsEmpty()) { // We can also delete the table (to reclaim space): _Tables* ourTables = _Tables::getOurTables(env); delete table; ourTables->socketTable = NULL; ourTables->reclaimIfPossible(); }}////////// RTPInterface - Implementation //////////RTPInterface::RTPInterface(Medium* owner, Groupsock* gs) : fOwner(owner), fGS(gs), fTCPStreams(NULL), fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1), fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL), fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) { // Make the socket non-blocking, even though it will be read from only asynchronously, when packets arrive. // The reason for this is that, in some OSs, reads on a blocking socket can (allegedly) sometimes block, // even if the socket was previously reported (e.g., by "select()") as having data available. // (This can supposedly happen if the UDP checksum fails, for example.) makeSocketNonBlocking(fGS->socketNum()); increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);}RTPInterface::~RTPInterface() { delete fTCPStreams;}void RTPInterface::setStreamSocket(int sockNum, unsigned char streamChannelId) { fGS->removeAllDestinations(); addStreamSocket(sockNum, streamChannelId);}void RTPInterface::addStreamSocket(int sockNum, unsigned char streamChannelId) { if (sockNum < 0) return; for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) { if (streams->fStreamSocketNum == sockNum && streams->fStreamChannelId == streamChannelId) { return; // we already have it } } fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);}static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) { SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False); if (socketDescriptor != NULL) { socketDescriptor->deregisterRTPInterface(streamChannelId); // Note: This may delete "socketDescriptor", // if no more interfaces are using this socket }}void RTPInterface::removeStreamSocket(int sockNum, unsigned char streamChannelId) { for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL; streamsPtr = &((*streamsPtr)->fNext)) { if ((*streamsPtr)->fStreamSocketNum == sockNum && (*streamsPtr)->fStreamChannelId == streamChannelId) { deregisterSocket(envir(), sockNum, streamChannelId); // Then remove the record pointed to by *streamsPtr : tcpStreamRecord* next = (*streamsPtr)->fNext; (*streamsPtr)->fNext = NULL; delete (*streamsPtr); *streamsPtr = next; return; } }}void RTPInterface::setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) { for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) { // Get (or create, if necessary) a socket descriptor for "streams->fStreamSocketNum": SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum); socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData); }}void RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) { // Normal case: Send as a UDP packet: fGS->output(envir(), fGS->ttl(), packet, packetSize); // Also, send over each of our TCP sockets: for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) { sendRTPOverTCP(packet, packetSize, streams->fStreamSocketNum, streams->fStreamChannelId); }}void RTPInterface::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) { // Normal case: Arrange to read UDP packets: envir().taskScheduler(). turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner); // Also, receive RTP over TCP, on each of our TCP connections: fReadHandlerProc = handlerProc; for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) { // Get a socket descriptor for "streams->fStreamSocketNum": SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum); // Tell it about our subChannel: socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this); }}Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize, unsigned& bytesRead, struct sockaddr_in& fromAddress, Boolean& packetReadWasIncomplete) { packetReadWasIncomplete = False; // by default Boolean readSuccess; if (fNextTCPReadStreamSocketNum < 0) { // Normal case: read from the (datagram) 'groupsock': readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress); } else {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -