⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rtpsink.cpp

📁 rtsp协议的主要实现代码.对开发流媒体
💻 CPP
字号:
// RTPSink.cpp: implementation of the RTPSink class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "RTPSink.h"

#pragma comment(lib,"winmm.lib")

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
// Size in bytes of the fixed RTP header written by buildAndSendPacket()
// (first header word + timestamp + SSRC, 4 bytes each). It is subtracted
// from the packet size when payload octets are counted.
static unsigned const rtpHeaderSize = 12;
// Signature of a generic task callback that receives an opaque client pointer.
// NOTE(review): not referenced in the visible part of this file.
typedef void TaskFunc(void* clientData);

// Constructs an RTP sink.
//   rtpPayloadType        - value placed in the payload-type field of every packet.
//   rtpTimestampFrequency - RTP clock rate (ticks per second) used to convert
//                           presentation times into RTP timestamps.
//   numChannels           - accepted for interface compatibility; not used here.
// The sequence number, SSRC and timestamp base start from random values.
RTPSink::RTPSink(unsigned char rtpPayloadType,unsigned rtpTimestampFrequency,
				 unsigned numChannels):
	fRTPPayloadType(rtpPayloadType),fTimestampFrequency(rtpTimestampFrequency),
	fCurFragmentationOffset(0),fHaveComputedFirstTimestamp(False)
{
	(void)numChannels; // intentionally unused

	// Give every member that is read before it is guaranteed to be written a
	// defined value. Without this the send statistics are incremented from
	// garbage, packFrame() tests an indeterminate fSource, and the destructor
	// hands indeterminate values to timeKillEvent()/delete.
	fSource = NULL;
	fOutBuf = NULL;
	m_timeID = 0;
	fPacketCount = 0;
	fOctetCount = 0;
	fTotalOctetCount = 0;

	fSeqNo = (unsigned short)CUtility::our_random();
	fSSRC = CUtility::our_random32();
	fTimestampBase = CUtility::our_random32();
	fCurrentTimestamp = fTimestampBase;
	fRTPInterface = RTPInterface::createNew();
	fNumFramesUsedSoFar = 0;

	// Allocate the output buffer (preferred / maximum packet size in bytes).
	setPacketSizes(1000, 1448);
}

// Tears the sink down: cancels the multimedia timer identified by m_timeID,
// then frees the RTP interface and the output packet buffer.
// NOTE(review): timeKillEvent() is called unconditionally, even when no timer
// was ever started - confirm m_timeID holds a harmless value in that case.
// NOTE(review): the timer callback calls back into this object; confirm it
// cannot be running while the members below are being deleted.
RTPSink::~RTPSink()
{
	timeKillEvent(m_timeID);
	delete fRTPInterface;
	delete fOutBuf;
}

// Allocates a new output packet buffer for the given preferred and maximum
// packet sizes (in bytes). The request is ignored when the preferred size is
// zero or larger than the maximum.
// NOTE(review): a buffer that is already held in fOutBuf is overwritten, not
// released.
void RTPSink::setPacketSizes(unsigned preferredPacketSize,
										unsigned maxPacketSize) {
	// Sanity check on the requested sizes.
	bool const sizesAreSane =
		preferredPacketSize != 0 && preferredPacketSize <= maxPacketSize;
	if (!sizesAreSane) return;

	fOutBuf = new OutPacketBuffer(preferredPacketSize, maxPacketSize);
}

// Attaches "fMediaSource" as the frame source of this sink and starts
// streaming from it. Returns the result of continuePlaying().
Boolean RTPSink::startPlaying(FramedSource* fMediaSource)
{
	// No "already being played" check is made here: whatever source was
	// attached before is simply replaced.
	fSource = fMediaSource;

	return continuePlaying();
}

// Kicks off transmission by building the first packet. Always returns True.
Boolean RTPSink::continuePlaying() {
	// Send the first packet.
	// (This will also schedule any future sends.)
	buildAndSendPacket(True);
	return True;
}

// Starts a new outgoing packet: writes the RTP header into fOutBuf, reserves
// room for the timestamp and for any payload-format-specific header, resets
// the per-packet bookkeeping and then asks the source for a frame.
//   isFirstPacket - True only for the very first packet of a session.
void RTPSink::buildAndSendPacket(Boolean isFirstPacket)
{
	fIsFirstPacket = isFirstPacket;

	// First header word: RTP version 2, payload type, sequence number.
	unsigned const firstHeaderWord =
		0x80000000 | (fRTPPayloadType<<16) | fSeqNo;
	fOutBuf->enqueueWord(firstHeaderWord);

	// The timestamp is only known once a frame has been packed, so remember
	// where it belongs and leave a 4-byte hole for it.
	fTimestampPosition = fOutBuf->curPacketSize();
	fOutBuf->skipBytes(4);

	fOutBuf->enqueueWord(fSSRC);

	// Reserve space for an optional payload-format-specific header that
	// follows the RTP header.
	fSpecialHeaderPosition = fOutBuf->curPacketSize();
	fSpecialHeaderSize = frameSpecificHeaderSize();
	fOutBuf->skipBytes(fSpecialHeaderSize);

	// Reset per-packet state, then start packing frames.
	fTotalFrameSpecificHeaderSizes = 0;
	fNoFramesLeft = False;
	fNumFramesUsedSoFar = 0;
	packFrame();
}

// Reserves room for a frame-specific header and asks the source to deliver
// the next frame directly into the output buffer. The frame is processed in
// afterGettingFrame(). Does nothing when no source is attached.
void RTPSink::packFrame()
{
	if (fSource != NULL) {
		// Skip over the (possibly empty) frame-specific header.
		fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
		fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
		fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
		fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;

		// The timestamp is filled in and the packet is sent from the
		// completion callback.
		fSource->getNextFrame(fOutBuf->curPtr(), fOutBuf->totalBytesAvailable(),
			afterGettingFrame, this);
	}
}

void RTPSink
::afterGettingFrame(void* clientData, unsigned numBytesRead,
					unsigned numTruncatedBytes,
					struct timeval presentationTime,
					unsigned durationInMicroseconds) {
	RTPSink* sink = (RTPSink*)clientData;
	sink->afterGettingFrame1(numBytesRead, numTruncatedBytes,
		presentationTime, durationInMicroseconds);
}

// Payload-format hook invoked for each frame placed in a packet.
// Default behaviour: only the first frame of a packet matters, and its
// presentation time becomes the packet's RTP timestamp. The other
// parameters are ignored.
void RTPSink
::doSpecialFrameHandling(unsigned /*fragmentationOffset*/,
						 unsigned char* /*frameStart*/,
						 unsigned /*numBytesInFrame*/,
						 struct timeval frameTimestamp,
						 unsigned /*numRemainingBytes*/) {
	if (!isFirstFrameInPacket()) return;

	setTimestamp(frameTimestamp);
}

void RTPSink::setTimestamp(struct timeval timestamp) {
	// First, convert the timestamp to a 32-bit RTP timestamp:
	fCurrentTimestamp = convertToRTPTimestamp(timestamp);
	
	// Then, insert it into the RTP packet:
	fOutBuf->insertWord(fCurrentTimestamp, fTimestampPosition);
}

// Maps a wall-clock time to an RTP timestamp: the time expressed in RTP
// clock ticks plus fTimestampBase. On the first call the base is shifted so
// that the value returned equals the base chosen at construction time.
u_int32_t RTPSink::convertToRTPTimestamp(struct timeval tv) {
	u_int32_t const ticks = timevalToTimestamp(tv);

	if (!fHaveComputedFirstTimestamp) {
		// One-time rebasing so that the first timestamp emitted is the
		// original "fTimestampBase".
		fHaveComputedFirstTimestamp = True;
		fTimestampBase -= ticks;
	}

	u_int32_t const result = fTimestampBase + ticks;
#ifdef DEBUG_TIMESTAMPS
	fprintf(stderr, "fTimestampBase: 0x%08x, tv: %lu.%06ld\n\t=> RTP timestamp: 0x%08x\n",
		fTimestampBase, tv.tv_sec, tv.tv_usec, result);
	fflush(stderr);
#endif

	return result;
}

// Expresses "tv" in units of the RTP clock (fTimestampFrequency ticks per
// second). The microsecond part is rounded to the nearest tick.
u_int32_t RTPSink::timevalToTimestamp(struct timeval tv) const {
	u_int32_t const wholeSecondTicks = (fTimestampFrequency*tv.tv_sec);
	// Adding 1000000.0 before dividing by 2000000 rounds to nearest.
	u_int32_t const fractionalTicks =
		(u_int32_t)((2.0*fTimestampFrequency*tv.tv_usec + 1000000.0)/2000000);
	return wholeSecondTicks + fractionalTicks;
}


void RTPSink
::afterGettingFrame1(unsigned frameSize, unsigned numTruncatedBytes,
					 struct timeval presentationTime,
					 unsigned durationInMicroseconds) 
{
	if (fIsFirstPacket) {
		// Record the fact that we're starting to play now:
		gettimeofday(&fNextSendTime, NULL);
	}

	unsigned curFragmentationOffset = fCurFragmentationOffset;
	unsigned numFrameBytesToUse = frameSize;
	unsigned overflowBytes = 0;

    // Use this frame in our outgoing packet:
	
    // Here's where any payload format specific processing gets done:
    doSpecialFrameHandling(curFragmentationOffset, fOutBuf->curPtr(),
		numFrameBytesToUse, presentationTime,
		overflowBytes);
	
    fOutBuf->increment(numFrameBytesToUse);
    ++fNumFramesUsedSoFar;
	
	
    // Send our packet now if (i) it's already at our preferred size, or
    // (ii) (heuristic) another frame of the same size as the one we just
    //      read would overflow the packet, or
    // (iii) it contains the last fragment of a fragmented frame, and we
    //      don't allow anything else to follow this or
    // (iv) one frame per packet is allowed:
    if (TRUE) 
	{
		// The packet is ready to be sent now
		sendPacketIfNecessary();
	}
}

// Multimedia-timer callback, invoked after each delay between packet sends.
// "dwUser" carries the RTPSink* that armed the timer in
// sendPacketIfNecessary(); the remaining parameters are unused.
// The user-data parameters are DWORD_PTR (not DWORD) so that the pointer is
// not truncated on 64-bit Windows and the signature matches LPTIMECALLBACK.
void CALLBACK sendNext(UINT uID, UINT uMsg, DWORD_PTR dwUser, DWORD_PTR dw1, DWORD_PTR dw2)
{
	RTPSink* sink = reinterpret_cast<RTPSink*>(dwUser);
	if (sink == NULL) return; // nothing to drive

	sink->buildAndSendPacket(False);
}

void RTPSink::sendPacketIfNecessary() {
	if (fNumFramesUsedSoFar > 0) {
		// Send the packet:
#ifdef TEST_LOSS
		//if ((our_random()%10) != 0) // simulate 10% packet loss #####
#endif
		fRTPInterface->sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize());
		++fPacketCount;
		fTotalOctetCount += fOutBuf->curPacketSize();
		fOctetCount += fOutBuf->curPacketSize()
			- rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;
		
		++fSeqNo; // for next time
	}

	// Normal case: Reset the packet start pointer back to the start:
    fOutBuf->resetPacketStart();
	
	fOutBuf->resetOffset();

	//do loop send next frame
	m_timeID = timeSetEvent(1,0,sendNext,(DWORD)this,TIME_ONESHOT);
}

// Returns the number of bytes to reserve for a payload-format-specific
// header. buildAndSendPacket() and packFrame() skip this many bytes in the
// output buffer.
unsigned RTPSink::frameSpecificHeaderSize() const {
	// default implementation: Assume no frame-specific header:
	return 0;
}

////////// OutPacketBuffer //////////

// Minimum capacity, in bytes, that the OutPacketBuffer constructor provides;
// the constructor rounds it up to a whole number of maximum-size packets.
unsigned OutPacketBuffer::maxSize = 60000; // by default

// Creates a packet buffer whose capacity is "maxSize" rounded up to a whole
// number of packets of "maxPacketSize" bytes.
//   preferredPacketSize - stored as the preferred packet size.
//   maxPacketSize       - stored as the maximum packet size; a value of 0 is
//                         tolerated and yields a buffer of exactly "maxSize"
//                         bytes instead of dividing by zero.
OutPacketBuffer::OutPacketBuffer(unsigned preferredPacketSize,
				 unsigned maxPacketSize)
  : fPreferred(preferredPacketSize), fMax(maxPacketSize),
    fOverflowDataSize(0) {
  if (maxPacketSize == 0) {
    fLimit = maxSize;
  } else {
    // Ceiling division written so that the intermediate sum cannot wrap.
    unsigned maxNumPackets = maxSize/maxPacketSize
      + (maxSize%maxPacketSize != 0 ? 1 : 0);
    fLimit = maxNumPackets*maxPacketSize;
  }
  fBuf = new unsigned char[fLimit];
  resetPacketStart();
  resetOffset();
  resetOverflowData();
}

// Releases the backing byte array allocated by the constructor.
OutPacketBuffer::~OutPacketBuffer() {
  delete[] fBuf;
}

// Appends "numBytes" bytes from "from" at the current write position and
// advances the position. The amount is clamped to the space still available.
// No copy is made when "from" already is the write position.
void OutPacketBuffer::enqueue(unsigned char const* from, unsigned numBytes) {
  unsigned bytesToCopy = numBytes;
  if (bytesToCopy > totalBytesAvailable()) {
#ifdef DEBUG
    fprintf(stderr, "OutPacketBuffer::enqueue() warning: %d > %d\n", bytesToCopy, totalBytesAvailable());
#endif
    bytesToCopy = totalBytesAvailable();
  }

  unsigned char* const dest = curPtr();
  if (dest != from) {
    memmove(dest, from, bytesToCopy);
  }
  increment(bytesToCopy);
}

// Appends a 32-bit word to the packet in network byte order.
void OutPacketBuffer::enqueueWord(unsigned word) {
  unsigned const networkOrderWord = htonl(word);
  unsigned char const* const bytes =
    reinterpret_cast<unsigned char const*>(&networkOrderWord);
  enqueue(bytes, 4);
}

// Overwrites "numBytes" bytes at offset "toPosition" (relative to the start
// of the current packet) with data from "from". The write is truncated at
// the end of the buffer and dropped when it would start beyond it. The
// current offset is extended if the write reaches past it.
void OutPacketBuffer::insert(unsigned char const* from, unsigned numBytes,
			     unsigned toPosition) {
  // Bounds checks are written as subtractions so that very large arguments
  // cannot wrap around and slip past them.
  if (fPacketStart > fLimit || toPosition > fLimit - fPacketStart) {
    return; // we can't do this
  }
  unsigned const realToPosition = fPacketStart + toPosition;
  unsigned const roomLeft = fLimit - realToPosition;
  if (numBytes > roomLeft) {
    numBytes = roomLeft;
  }

  memmove(&fBuf[realToPosition], from, numBytes);
  if (toPosition + numBytes > fCurOffset) {
    fCurOffset = toPosition + numBytes;
  }
}

// Writes a 32-bit word, in network byte order, at offset "toPosition" of the
// current packet.
void OutPacketBuffer::insertWord(unsigned word, unsigned toPosition) {
  unsigned const networkOrderWord = htonl(word);
  unsigned char const* const bytes =
    reinterpret_cast<unsigned char const*>(&networkOrderWord);
  insert(bytes, 4, toPosition);
}

// Copies "numBytes" bytes, starting at offset "fromPosition" of the current
// packet, into "to". The read is truncated at the end of the buffer and
// skipped entirely when it would start beyond it; in those cases the
// remaining bytes of "to" are left untouched.
void OutPacketBuffer::extract(unsigned char* to, unsigned numBytes,
			      unsigned fromPosition) {
  // Sanity checks, written as subtractions so that very large arguments
  // cannot wrap around and slip past them.
  if (fPacketStart > fLimit || fromPosition > fLimit - fPacketStart) {
    return; // we can't do this
  }
  unsigned const realFromPosition = fPacketStart + fromPosition;
  unsigned const bytesReadable = fLimit - realFromPosition;
  if (numBytes > bytesReadable) {
    numBytes = bytesReadable;
  }

  memmove(to, &fBuf[realFromPosition], numBytes);
}

// Reads the 32-bit word stored in network byte order at offset
// "fromPosition" of the current packet and returns it in host byte order.
// Bytes that lie beyond the end of the buffer read as zero.
unsigned OutPacketBuffer::extractWord(unsigned fromPosition) {
  // Zero-initialised because extract() may copy fewer than 4 bytes (or none),
  // which previously left this value indeterminate.
  unsigned nWord = 0;
  extract((unsigned char*)&nWord, 4, fromPosition);
  return ntohl(nWord);
}

// Advances the write position by "numBytes" without writing anything,
// clamped to the space still available.
void OutPacketBuffer::skipBytes(unsigned numBytes) {
  unsigned const room = totalBytesAvailable();
  unsigned const bytesToSkip = (numBytes > room) ? room : numBytes;

  increment(bytesToSkip);
}

// Records the location, size and timing of frame data that did not fit into
// the current packet, so that it can be used later.
void OutPacketBuffer
::setOverflowData(unsigned overflowDataOffset,
		  unsigned overflowDataSize,
		  struct timeval const& presentationTime,
		  unsigned durationInMicroseconds) {
  // Where the data lives:
  fOverflowDataOffset = overflowDataOffset;
  fOverflowDataSize = overflowDataSize;

  // When it is to be presented, and for how long:
  fOverflowPresentationTime = presentationTime;
  fOverflowDurationInMicroseconds = durationInMicroseconds;
}

// Copies the recorded overflow bytes to the current write position, leaves
// the write offset where it was, and then calls resetOverflowData().
// NOTE(review): enqueue() clamps the copy to the space available; if it ever
// copied fewer than fOverflowDataSize bytes, the subtraction below would
// over-correct fCurOffset - confirm this cannot happen.
void OutPacketBuffer::useOverflowData() {
  enqueue(&fBuf[fPacketStart + fOverflowDataOffset], fOverflowDataSize);
  fCurOffset -= fOverflowDataSize; // undoes increment performed by "enqueue"
  resetOverflowData();
}

// Moves the start of the current packet forward by "numBytes" and keeps the
// packet-relative overflow offset consistent with the new start.
void OutPacketBuffer::adjustPacketStart(unsigned numBytes) {
  fPacketStart += numBytes;

  if (fOverflowDataOffset < numBytes) {
    // The overflow data would begin before the new packet start; that is an
    // error, so the record is discarded.
    fOverflowDataOffset = 0;
    fOverflowDataSize = 0;
    return;
  }

  fOverflowDataOffset -= numBytes;
}

// Moves the packet start back to the beginning of the buffer. The overflow
// offset is relative to the packet start, so when overflow data is pending
// it is rebased first.
void OutPacketBuffer::resetPacketStart() {
  bool const overflowPending = fOverflowDataSize > 0;
  if (overflowPending) {
    fOverflowDataOffset += fPacketStart;
  }

  fPacketStart = 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -