⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 prioritizedrtpstreamselector.cpp

📁 流媒体传输协议的实现代码,非常有用.可以支持rtsp mms等流媒体传输协议
💻 CPP
📖 第 1 页 / 共 2 页
字号:
      fNumTruncatedBytes = fFrameSize - fMaxSize;      fFrameSize = fMaxSize;    }    memmove(fTo, buffer, fFrameSize);    delete[] buffer;    fWarehouse->fLastActionWasIncoming = False;    return True;  }  // No frame was available.  return False;}void PrioritizedRTPStreamSelector::completeDelivery(void* clientData) {  PrioritizedRTPStreamSelector* selector    = (PrioritizedRTPStreamSelector*)clientData;  // Call our own 'after getting' function.  Because we're not a 'leaf'  // source, we can call this directly, without risking infinite recursion.  FramedSource::afterGetting(selector);}//////// PrioritizedInputStreamDescriptor implementation ////////unsigned const PrioritizedInputStreamDescriptor::fBufferSize = 4000;PrioritizedInputStreamDescriptor::PrioritizedInputStreamDescriptor(PrioritizedRTPStreamSelector*				   ourSelector,				   PrioritizedInputStreamDescriptor* next,				   unsigned priority,				   RTPSource* inputStream,				   RTCPInstance* inputStreamRTCP)    : fOurSelector(ourSelector), fNext(next), fPriority(priority),      fRTPStream(inputStream), fRTCPStream(inputStreamRTCP) {  fBuffer = new unsigned char[fBufferSize];  fBufferBytesUsed = 0;}PrioritizedInputStreamDescriptor::~PrioritizedInputStreamDescriptor() {  delete[] fBuffer;}static void afterGettingFrame(void* clientData, unsigned frameSize,			      unsigned /*numTruncatedBytes*/,			      struct timeval /*presentationTime*/,			      unsigned /*durationInMicroseconds*/) {  PrioritizedInputStreamDescriptor* inputStream    = (PrioritizedInputStreamDescriptor*)clientData;  inputStream->afterGettingFrame1(frameSize);}void PrioritizedInputStreamDescriptor::afterGettingFrame1(unsigned frameSize) {  unsigned short rtpSeqNo = rtpStream()->curPacketRTPSeqNum();  // Deliver this frame to our selector:  fOurSelector->handleNewIncomingFrame(fPriority, rtpSeqNo,				       fBuffer, frameSize);}static void onSourceClosure(void* clientData) {  PrioritizedInputStreamDescriptor* inputStream    = (PrioritizedInputStreamDescriptor*)clientData;  inputStream->onSourceClosure1();}void PrioritizedInputStreamDescriptor::onSourceClosure1() {  fOurSelector->removeInputRTPStream(fPriority);}////////// WarehousedPacketDescriptor //////////class WarehousedPacketDescriptor {public:  WarehousedPacketDescriptor() : buffer(NULL) {}  // Don't define a destructor; for some reason it causes a crash #####  unsigned priority;  unsigned frameSize;  unsigned char* buffer;};////////// PacketWarehouse implementation PacketWarehouse::PacketWarehouse(unsigned seqNumStagger)  : fLastActionWasIncoming(False),    fHaveReceivedFrames(False), fMinSeqNumStored(0), fMaxSeqNumStored(0),    fMinSpanForDelivery((unsigned)(1.5*seqNumStagger)),    fMaxSpanForDelivery(3*seqNumStagger),    fNumDescriptors(4*seqNumStagger),    fInterArrivalAveGap(0) {  fPacketDescriptors = new WarehousedPacketDescriptor[fNumDescriptors];  if (fPacketDescriptors == NULL) {#ifdef DEBUG    fprintf(stderr, "PacketWarehouse failed to allocate %d descriptors; seqNumStagger too large!\n", fNumDescriptors);#endif    exit(1);  }  // Initially, set "fLastArrivalTime" to the current time:  gettimeofday(&fLastArrivalTime, NULL);}PacketWarehouse::~PacketWarehouse() {  // Delete each descriptor's buffer (if any), then delete the descriptors:  for (unsigned i = 0; i < fNumDescriptors; ++i) {    delete[] fPacketDescriptors[i].buffer;  }  delete[] fPacketDescriptors;}Boolean PacketWarehouse::isFull() {  int currentSpan = fMaxSeqNumStored - fMinSeqNumStored;  if (currentSpan < 0) currentSpan += 65536;  return (unsigned)currentSpan >= fNumDescriptors;}void PacketWarehouse::addNewFrame(unsigned priority,				  unsigned short rtpSeqNo,				  unsigned char* buffer,				  unsigned frameSize) {  if (!fHaveReceivedFrames) {    // This is our first frame; set up initial parameters:    // (But note hack: We want the first frame to have priority 0, so that    //  receivers' decoders are happy, by seeing the 'best' data initially)    if (priority != 0) return;    fMinSeqNumStored = fMaxSeqNumStored = rtpSeqNo;    fHaveReceivedFrames = True;  } else {    // Update our record of the maximum sequence number stored    if (seqNumLT(fMaxSeqNumStored, rtpSeqNo)) {      fMaxSeqNumStored = rtpSeqNo;    } else if (seqNumLT(rtpSeqNo, fMinSeqNumStored)) {      return; // ignore this packet; it's too old for us    }  }  if (isFull()) {    // We've gotten way ahead of ourselves, probably due to a very large    // set of consecutive lost packets.  To recover, reset our min/max    // sequence numbers (effectively emptying the warehouse).  Hopefully,    // this should be an unusual occurrence:    fMinSeqNumStored = fMaxSeqNumStored = rtpSeqNo;  }  // Check whether a frame with this sequence number has already been seen  WarehousedPacketDescriptor& desc    = fPacketDescriptors[rtpSeqNo%fNumDescriptors];  if (desc.buffer != NULL) {    // We already have a frame.  If it's priority is higher than that of    // this new frame, then we continue to use it:    if (desc.priority < priority) return; // lower than existing priority    // Otherwise, use the new frame instead, so get rid of the existing one:    delete[] desc.buffer;  }  // Record this new frame:  desc.buffer = new unsigned char[frameSize];  if (desc.buffer == NULL) {#ifdef DEBUG    fprintf(stderr, "PacketWarehouse::addNewFrame failed to allocate %d-byte buffer!\n", frameSize);#endif    exit(1);  }  memmove(desc.buffer, buffer, frameSize);  desc.frameSize = frameSize;  desc.priority = priority;  struct timeval timeNow;  gettimeofday(&timeNow, NULL);  if (rtpSeqNo == (fLastRTPSeqNo+1)%65536) {    // We've received consecutive packets, so update the estimate of    // the average inter-arrival gap:    unsigned lastGap // in microseconds      = (timeNow.tv_sec - fLastArrivalTime.tv_sec)*1000000      + (timeNow.tv_usec - fLastArrivalTime.tv_usec);    fInterArrivalAveGap = (9*fInterArrivalAveGap + lastGap)/10; // weighted  }  fLastArrivalTime = timeNow;  fLastRTPSeqNo = rtpSeqNo;}unsigned char* PacketWarehouse::dequeueFrame(unsigned& resultFrameSize,					     unsigned& uSecondsToDefer) {  uSecondsToDefer = 0; // by default  // Don't return anything if we don't yet have enough packets to  // cover our desired sequence number span  int currentSpan = fMaxSeqNumStored - fMinSeqNumStored;  if (currentSpan < 0) currentSpan += 65536;  if (currentSpan < (int)fMinSpanForDelivery) {    return NULL; // we're not ready  }  // Now, if our stored packet range is less than the desired maximum,  // return a frame, but delay this unless no incoming frames have  // arrived since the last time a frame was delivered to our client.  // This causes the buffer to empty only if the flow of incoming  // frames stops.  if (currentSpan < (int)fMaxSpanForDelivery) {    if (fLastActionWasIncoming) {      uSecondsToDefer = (unsigned)(1.5*fInterArrivalAveGap);    }  }  // Now, return a frame:  unsigned char* resultBuffer = NULL;  do {    if (currentSpan < (int)fMinSpanForDelivery) break;    WarehousedPacketDescriptor& desc      = fPacketDescriptors[fMinSeqNumStored%fNumDescriptors];    resultBuffer = desc.buffer;    resultFrameSize = desc.frameSize;    desc.buffer = NULL;#ifdef DEBUG    if (resultBuffer == NULL) fprintf(stderr, "No packet for seq num %d - skipping\n", fMinSeqNumStored); //#####  else if (desc.priority == 2) fprintf(stderr, "Using priority %d frame for seq num %d\n", desc.priority, fMinSeqNumStored);//######endif    fMinSeqNumStored = (fMinSeqNumStored+1)%65536;    --currentSpan;  } while (resultBuffer == NULL); // skip over missing packets  return resultBuffer;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -