⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rtpinterface.cpp

📁 流媒体传输协议的实现代码,非常有用.可以支持rtsp mms等流媒体传输协议
💻 CPP
字号:
/**********This library is free software; you can redistribute it and/or modify it underthe terms of the GNU Lesser General Public License as published by theFree Software Foundation; either version 2.1 of the License, or (at youroption) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)This library is distributed in the hope that it will be useful, but WITHOUTANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESSFOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License formore details.You should have received a copy of the GNU Lesser General Public Licensealong with this library; if not, write to the Free Software Foundation, Inc.,59 Temple Place, Suite 330, Boston, MA  02111-1307  USA**********/// "liveMedia"// Copyright (c) 1996-2004 Live Networks, Inc.  All rights reserved.// An abstraction of a network interface used for RTP (or RTCP).// (This allows the RTP-over-TCP hack (RFC 2326, section 10.12) to// be implemented transparently.)// Implementation#include "RTPInterface.hh"#include <GroupsockHelper.hh>#include <stdio.h>////////// Helper Functions - Definition //////////// Helper routines and data structures, used to implement// sending/receiving RTP/RTCP over a TCP socket:static void sendRTPOverTCP(unsigned char* packet, unsigned packetSize,			   int socketNum, unsigned char streamChannelId);// Reading RTP-over-TCP is implemented using two levels of hash tables.// The top-level hash table maps TCP socket numbers to a// "SocketDescriptor" that contains a hash table for each of the// sub-channels that are reading from this socket.static HashTable* socketHashTable(UsageEnvironment& env) {  _Tables* ourTables = _Tables::getOurTables(env);  if (ourTables->socketTable == NULL) {    // Create a new socket number -> SocketDescriptor mapping table:    ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);  }  return (HashTable*)(ourTables->socketTable);}class SocketDescriptor {public:  SocketDescriptor(UsageEnvironment& env, int socketNum);  virtual ~SocketDescriptor();  void registerRTPInterface(unsigned char streamChannelId,			    RTPInterface* rtpInterface);  RTPInterface* lookupRTPInterface(unsigned char streamChannelId);  void deregisterRTPInterface(unsigned char streamChannelId);private:  static void tcpReadHandler(SocketDescriptor*, int mask);private:  UsageEnvironment& fEnv;  int fOurSocketNum;  HashTable* fSubChannelHashTable;};static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env,						int sockNum) {  char const* key = (char const*)(long)sockNum;  return (SocketDescriptor*)(socketHashTable(env)->Lookup(key));}static void removeSocketDescription(UsageEnvironment& env, int sockNum) {  char const* key = (char const*)(long)sockNum;  HashTable* table = socketHashTable(env);  table->Remove(key);  if (table->IsEmpty()) {    // We can also delete the table (to reclaim space):    _Tables* ourTables = _Tables::getOurTables(env);    delete table;    ourTables->socketTable = NULL;    ourTables->reclaimIfPossible();  }}////////// RTPInterface - Implementation //////////RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)  : fOwner(owner), fGS(gs),    fTCPStreams(NULL),    fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),    fReadHandlerProc(NULL),    fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {}RTPInterface::~RTPInterface() {  delete fTCPStreams;}Boolean RTPOverTCP_OK = True; // HACK: For detecting TCP socket failure externally #####void RTPInterface::setStreamSocket(int sockNum,				   unsigned char streamChannelId) {  fGS->removeAllDestinations();  addStreamSocket(sockNum, streamChannelId);}void RTPInterface::addStreamSocket(int sockNum,				   unsigned char streamChannelId) {  if (sockNum < 0) return;  else RTPOverTCP_OK = True; //##### HACK  for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;       streams = streams->fNext) {    if (streams->fStreamSocketNum == sockNum	&& streams->fStreamChannelId == streamChannelId) {      return; // we already have it    }  }  fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);}void RTPInterface::removeStreamSocket(int sockNum,				      unsigned char streamChannelId) {  for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;       streamsPtr = &((*streamsPtr)->fNext)) {    if ((*streamsPtr)->fStreamSocketNum == sockNum	&& (*streamsPtr)->fStreamChannelId == streamChannelId) {      // Remove the record pointed to by *streamsPtr :      tcpStreamRecord* next = (*streamsPtr)->fNext;      (*streamsPtr)->fNext = NULL;      delete (*streamsPtr);      *streamsPtr = next;      return;    }  }}void RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {  // Normal case: Send as a UDP packet:  fGS->output(envir(), fGS->ttl(), packet, packetSize);  // Also, send over each of our TCP socket:  for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;       streams = streams->fNext) {    sendRTPOverTCP(packet, packetSize,		   streams->fStreamSocketNum, streams->fStreamChannelId);  }}void RTPInterface::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {  // Normal case: Arrange to read UDP packets:  envir().taskScheduler().    turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);  // Also, receive RTP over TCP, on each of our TCP connections:  fReadHandlerProc = handlerProc;  for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;       streams = streams->fNext) {    // Get a socket descriptor for "streams->fStreamSocketNum":    SocketDescriptor* socketDescriptor      = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);    if (socketDescriptor == NULL) {      socketDescriptor	= new SocketDescriptor(envir(), streams->fStreamSocketNum);      socketHashTable(envir())->Add((char const*)(long)(streams->fStreamSocketNum),				    socketDescriptor);    }    // Tell it about our subChannel:    socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);  }}Boolean RTPInterface::handleRead(unsigned char* buffer,				 unsigned bufferMaxSize,				 unsigned& bytesRead,				 struct sockaddr_in& fromAddress) {  Boolean readSuccess;  if (fNextTCPReadStreamSocketNum < 0) {    // Normal case: read from the (datagram) 'groupsock':    readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);  } else {    // Read from the TCP connection:    bytesRead = 0;    unsigned totBytesToRead = fNextTCPReadSize;    if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;     unsigned curBytesToRead = totBytesToRead;    unsigned curBytesRead;    while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,				      &buffer[bytesRead], curBytesToRead,				      fromAddress)) > 0) {      bytesRead += curBytesRead;      if (bytesRead >= totBytesToRead) break;      curBytesToRead -= curBytesRead;    }    if (curBytesRead <= 0) {      bytesRead = 0;      readSuccess = False;      RTPOverTCP_OK = False; // HACK #####    } else {      readSuccess = True;    }    fNextTCPReadStreamSocketNum = -1; // default, for next time  }  if (readSuccess && fAuxReadHandlerFunc != NULL) {    // Also pass the newly-read packet data to our auxilliary handler:    (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);  }  return readSuccess;}void RTPInterface::stopNetworkReading() {  // Normal case  envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());  // Also turn off read handling on each of our TCP connections:  for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;       streams = streams->fNext) {    SocketDescriptor* socketDescriptor      = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);    if (socketDescriptor != NULL) {      socketDescriptor->deregisterRTPInterface(streams->fStreamChannelId);        // Note: This may delete "socketDescriptor",        // if no more interfaces are using this socket    }  }}////////// Helper Functions - Implementation /////////void sendRTPOverTCP(unsigned char* packet, unsigned packetSize,                    int socketNum, unsigned char streamChannelId) {#ifdef DEBUG  fprintf(stderr, "sendRTPOverTCP: %d bytes over channel %d (socket %d)\n",	  packetSize, streamChannelId, socketNum); fflush(stderr);#endif  // Send RTP over TCP, using the encoding defined in  // RFC 2326, section 10.12:  do {    char const dollar = '$';    if (send(socketNum, &dollar, 1, 0) != 1) break;    if (send(socketNum, (char*)&streamChannelId, 1, 0) != 1) break;    char netPacketSize[2];    netPacketSize[0] = (char) ((packetSize&0xFF00)>>8);    netPacketSize[1] = (char) (packetSize&0xFF);    if (send(socketNum, netPacketSize, 2, 0) != 2) break;    if (send(socketNum, (char*)packet, packetSize, 0) != (int)packetSize) break;#ifdef DEBUG    fprintf(stderr, "sendRTPOverTCP: completed\n"); fflush(stderr);#endif    return;  } while (0);  RTPOverTCP_OK = False; // HACK ######ifdef DEBUG  fprintf(stderr, "sendRTPOverTCP: failed!\n"); fflush(stderr);#endif}SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)  : fEnv(env), fOurSocketNum(socketNum),    fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)) {}SocketDescriptor::~SocketDescriptor() {  delete fSubChannelHashTable;}void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,					    RTPInterface* rtpInterface) {  Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();  fSubChannelHashTable->Add((char const*)(long)streamChannelId,			    rtpInterface);  if (isFirstRegistration) {    // Arrange to handle reads on this TCP socket:    TaskScheduler::BackgroundHandlerProc* handler      = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;    fEnv.taskScheduler().      turnOnBackgroundReadHandling(fOurSocketNum, handler, this);  }}RTPInterface* SocketDescriptor::lookupRTPInterface(unsigned char streamChannelId) {  char const* lookupArg = (char const*)(long)streamChannelId;  return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));}void SocketDescriptor::deregisterRTPInterface(unsigned char streamChannelId) {  fSubChannelHashTable->Remove((char const*)(long)streamChannelId);  if (fSubChannelHashTable->IsEmpty()) {    // No more interfaces are using us, so it's curtains for us now    fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);    removeSocketDescription(fEnv, fOurSocketNum);    delete this;  }}void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor,				      int mask) {  do {    UsageEnvironment& env = socketDescriptor->fEnv; // abbrev    int socketNum = socketDescriptor->fOurSocketNum;    // Begin by reading and discarding any characters that aren't '$'.    // Any such characters are probably regular RTSP responses or    // commands from the server.  At present, we can't do anything with    // these, because we have taken complete control of reading this socket.    // (Later, fix) #####    unsigned char c;    struct sockaddr_in fromAddress;    do {      if (readSocket(env, socketNum, &c, 1, fromAddress) != 1) { // error reading TCP socket	env.taskScheduler().turnOffBackgroundReadHandling(socketNum); // stops further calls to us	return;      }    } while (c != '$');    // The next byte is the stream channel id:    unsigned char streamChannelId;    if (readSocket(env, socketNum, &streamChannelId, 1, fromAddress)	!= 1) break;    RTPInterface* rtpInterface      = socketDescriptor->lookupRTPInterface(streamChannelId);    if (rtpInterface == NULL) break; // we're not interested in this channel    // The next two bytes are the RTP or RTCP packet size (in network order)    unsigned short size;    if (readSocketExact(env, socketNum, (unsigned char*)&size, 2,			fromAddress) != 2) break;    rtpInterface->fNextTCPReadSize = ntohs(size);    rtpInterface->fNextTCPReadStreamSocketNum = socketNum;#ifdef DEBUG    fprintf(stderr, "SocketDescriptor::tcpReadHandler() reading %d bytes on channel %d\n", rtpInterface->fNextTCPReadSize, streamChannelId);#endif    // Now that we have the data set up, call this subchannel's    // read handler:    if (rtpInterface->fReadHandlerProc != NULL) {      rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);    }  } while (0);}////////// tcpStreamRecord implementation //////////tcpStreamRecord::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,		  tcpStreamRecord* next)  : fNext(next),    fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {}tcpStreamRecord::~tcpStreamRecord() {  delete fNext;}                                                                                

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -