📄 rtptran.cpp
字号:
m_wrapSequenceNumber = DEFAULT_WRAP_SEQ_NO;}RTPTCPTransport::~RTPTCPTransport(){ HX_RELEASE(m_pTCPSocket);}voidRTPTCPTransport::Done(){ RTPBaseTransport::Done();}RTSPTransportTypeEnumRTPTCPTransport::tag(){ return RTSP_TR_RTP_TCP;}HX_RESULTRTPTCPTransport::init(IUnknown* pContext, IHXTCPSocket* pSocket, IHXRTSPTransportResponse* pResp){ m_pTCPSocket = pSocket; m_pTCPSocket->AddRef(); m_pResp = pResp; m_pResp->AddRef(); /* Set DiffServ Code Point */ IHXSetSocketOption* pOpt = NULL; if (SUCCEEDED(m_pTCPSocket->QueryInterface(IID_IHXSetSocketOption, (void**)&pOpt))) { IHXQoSDiffServConfigurator* pCfg = NULL; if (SUCCEEDED(pContext->QueryInterface(IID_IHXQoSDiffServConfigurator, (void**)&pCfg))) { pCfg->ConfigureSocket(pOpt, HX_QOS_DIFFSERV_CLASS_MEDIA); pCfg->Release(); pCfg = NULL; } pOpt->Release(); pOpt = NULL; } HX_RESULT hresult = Init(pContext); if (HXR_OK != hresult) { return hresult; } RTPBaseTransport::init(); return HXR_OK;}HX_RESULTRTPTCPTransport::sendPacket(BasePacket* pPacket){ HX_ASSERT(m_bActive); HX_RESULT theErr; if (m_ulPayloadWirePacket!=0) { IHXBuffer* pSendBuf = NULL; theErr = reflectPacket(pPacket, pSendBuf); if (HXR_OK == theErr) { theErr = writePacket(pSendBuf); pSendBuf->Release(); } else if (HXR_IGNORE == theErr) { return HXR_OK; } return theErr; } IHXBuffer* pPacketBuf = NULL; theErr = makePacket(pPacket, pPacketBuf); if (HXR_OK == theErr) { theErr = writePacket(pPacketBuf); /* send SR if necessary */ if (HXR_OK == theErr && m_pRTCPTran->m_bSendReport && m_pRTCPTran->m_bSendRTCP) { m_pRTCPTran->sendSenderReport(); m_pRTCPTran->m_bSendReport = FALSE; m_pRTCPTran->scheduleNextReport(); } } HX_RELEASE(pPacketBuf); return theErr;}HX_RESULTRTPTCPTransport::writePacket(IHXBuffer* pBuf){ if (!m_pTCPSocket) return HXR_FAIL; // need to put $\000[datalen] in front of packet data UINT32 dataLen = pBuf->GetSize(); if(dataLen > 0xffff) { return HXR_FAIL; } //XXXTDM: always true, m_tcpInteleave is signed (why?) //HX_ASSERT(0xFF != m_tcpInterleave); IHXBuffer* pHeader = NULL; m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, (void**)&pHeader); BYTE* pHeaderData; if(!pHeader) { return HXR_OUTOFMEMORY; } pHeader->SetSize(4); pHeaderData = pHeader->GetBuffer(); pHeaderData[0] = '$'; pHeaderData[1] = m_tcpInterleave; putshort(&pHeaderData[2], (UINT16)dataLen); HX_RESULT rc; rc = m_pTCPSocket->Write(pHeader); if (SUCCEEDED(rc)) { rc = m_pTCPSocket->Write(pBuf); } if(rc) { m_pResp->OnProtocolError(HXR_NET_SOCKET_INVALID); } pHeader->Release(); return rc;}/******************************************************************************* RTCP RTCP RTCP RTCP RTCP******************************************************************************/RTCPBaseTransport::RTCPBaseTransport(BOOL bIsSource): RTSPTransport(bIsSource), m_lRefCount(0), m_bCallbackPending(FALSE), m_pReportCallback(0), m_reportTimeoutID(0), m_bSchedulerStarted(FALSE), m_bSendRTCP(TRUE), m_bSendBye(FALSE), m_bSendReport(FALSE), m_pcCNAME(NULL), m_pReportHandler(NULL), m_pDataTransport(NULL), m_pTSConverter(NULL), m_streamNumber(0xffff), m_pSignalBus(NULL), m_pQoSSignal_RR(NULL), m_pQoSSignal_APP(NULL), m_pSessionId(NULL), m_bSSRCDetermined(FALSE), m_ulSSRCDetermined(0){}RTCPBaseTransport::~RTCPBaseTransport(){ HX_DELETE(m_pTSConverter);}STDMETHODIMPRTCPBaseTransport::QueryInterface(REFIID riid, void** ppvObj){ if (IsEqualIID(riid, IID_IUnknown)) { AddRef(); *ppvObj = this; return HXR_OK; } else if (IsEqualIID(riid, IID_IHXQoSSignalSourceResponse)) { AddRef(); *ppvObj = (IHXQoSSignalSourceResponse*)this; return HXR_OK; } *ppvObj = NULL; return HXR_NOINTERFACE;}STDMETHODIMP_(UINT32)RTCPBaseTransport::AddRef(){ return InterlockedIncrement(&m_lRefCount);}STDMETHODIMP_(UINT32)RTCPBaseTransport::Release(){ if(InterlockedDecrement(&m_lRefCount) > 0) { return m_lRefCount; } delete this; return 0;}voidRTCPBaseTransport::Done(){ stopScheduler(); HX_RELEASE(m_pPacketFilter); HX_VECTOR_DELETE(m_pcCNAME); HX_DELETE(m_pReportHandler); HX_RELEASE(m_pQoSSignal_RR); HX_RELEASE(m_pQoSSignal_APP); HX_RELEASE(m_pSignalBus); HX_RELEASE(m_pSessionId);}HX_RESULTRTCPBaseTransport::init(){ HX_ASSERT(!m_pReportCallback); HX_ASSERT(!m_pcCNAME); m_pReportCallback = new ReportCallback(this); if(!m_pReportCallback) { return HXR_OUTOFMEMORY; } m_pReportCallback->AddRef(); char cname[16] = {0}; /* Flawfinder: ignore */ itoa(random32(HX_GET_TICKCOUNT()), cname, 10); m_pcCNAME = (BYTE*)new_string(cname); HX_ASSERT(m_pcCNAME); return HXR_OK;}void RTCPBaseTransport::addStreamInfo (RTSPStreamInfo* pStreamInfo, UINT32 ulBufferDepth){ UINT32 ulInvalidRate = (UINT32)-1; UINT32 ulAvgBitRate = pStreamInfo->m_ulAvgBitRate; UINT32 ulRRBitRate = pStreamInfo->m_ulRtpRRBitRate; UINT32 ulRSBitRate = pStreamInfo->m_ulRtpRSBitRate; BOOL bUseRFC1889MinTime = FALSE; if (!ulAvgBitRate) { // We don't know the average bitrate. // Make something up ulAvgBitRate = 20000; } else { UINT32 ulRTCPBw = ulAvgBitRate / 20; // 5% of AvgBitRate if ((ulRRBitRate == ulInvalidRate) && (ulRSBitRate != ulInvalidRate) && (ulRTCPBw > ulRSBitRate)) { ulRRBitRate = ulRTCPBw - ulRSBitRate; } else if ((ulRRBitRate != ulInvalidRate) && (ulRSBitRate == ulInvalidRate) && (ulRTCPBw > ulRRBitRate)) { ulRSBitRate = ulRTCPBw - ulRRBitRate; } } if ((ulRRBitRate == ulInvalidRate) || (ulRSBitRate == ulInvalidRate)) { // If one of the bitrates is still // invalid at this point we just // default to the RFC 1889 behavior. // RS = 1.25% of the average bitrate // RR = 3.75% of the average bitrate bUseRFC1889MinTime = TRUE; m_bSendRTCP = TRUE; ulRSBitRate = ulAvgBitRate / 80; // 1.25% ulRRBitRate = ((ulAvgBitRate / 80) * 3 + ((ulAvgBitRate % 80) * 3) / 80); // 3.75% } else if (ulRRBitRate == 0) { // We have been told not // to send RTCP reports m_bSendRTCP = FALSE; } if (m_pReportHandler) { // Get the minimum RTCP report interval UINT32 ulMinIntervalMs = (bUseRFC1889MinTime) ? 5000 : 1; // Pass the report interval parameters to // the report handler m_pReportHandler->SetRTCPIntervalParams(ulRSBitRate, ulRRBitRate, ulMinIntervalMs); }}voidRTCPBaseTransport::setSSRC(UINT32 ulSSRC){ m_bSSRCDetermined = TRUE; m_ulSSRCDetermined = ulSSRC;}voidRTCPBaseTransport::setSessionID(const char* pSessionID){ /* cache the session id for use in retrieving signal bus*/ if(pSessionID && (SUCCEEDED(m_pCommonClassFactory->CreateInstance(CLSID_IHXBuffer, (void**)&m_pSessionId)))) { m_pSessionId->Set((UCHAR*)pSessionID, strlen(pSessionID)+1); IHXQoSSignalSource* pSignalSrc = NULL; if (m_pSessionId && SUCCEEDED(m_pContext->QueryInterface(IID_IHXQoSSignalSource, (void**) &pSignalSrc))) { pSignalSrc->GetSignalBus(m_pSessionId, (IHXQoSSignalSourceResponse*)this); HX_RELEASE(pSignalSrc); } else { m_pSignalBus = NULL; } }}STDMETHODIMPRTCPBaseTransport::SignalBusReady (HX_RESULT hResult, IHXQoSSignalBus* pBus, IHXBuffer* pSessionId){ if (FAILED(hResult)) { HX_ASSERT(0); return HXR_OK; } m_pSignalBus = pBus; m_pSignalBus->AddRef(); if (m_pDataTransport) { if (FAILED(m_pSignalBus->QueryInterface(IID_IHXQoSTransportAdaptationInfo, (void**)&m_pDataTransport->m_pQoSInfo))) { m_pDataTransport->m_pQoSInfo = NULL; } } else { HX_ASSERT(0); } if (SUCCEEDED(m_pCommonClassFactory->CreateInstance(CLSID_IHXQoSSignal, (void**)&m_pQoSSignal_RR))) { m_pQoSSignal_RR->SetId(MAKE_HX_QOS_SIGNAL_ID(HX_QOS_SIGNAL_LAYER_FRAMING_TRANSPORT, HX_QOS_SIGNAL_RELEVANCE_METRIC, HX_QOS_SIGNAL_RTCP_RR)); } else { HX_ASSERT(0); m_pQoSSignal_RR = NULL; } if (SUCCEEDED(m_pCommonClassFactory->CreateInstance(CLSID_IHXQoSSignal, (void**)&m_pQoSSignal_APP))) { m_pQoSSignal_APP->SetId(MAKE_HX_QOS_SIGNAL_ID(HX_QOS_SIGNAL_LAYER_FRAMING_TRANSPORT, HX_QOS_SIGNAL_RELEVANCE_METRIC, HX_QOS_SIGNAL_COMMON_BUFSTATE)); } else { HX_ASSERT(0); m_pQoSSignal_APP = NULL; } return HXR_OK;}HX_RESULT RTCPBaseTransport::SetTSConverter(CHXTimestampConverter::ConversionFactors conversionFactors){ HX_DELETE(m_pTSConverter); m_pTSConverter = new CHXTimestampConverter(conversionFactors); return m_pTSConverter ? HXR_OK : HXR_OUTOFMEMORY;}HX_RESULTRTCPBaseTransport::startScheduler(){ if(!m_bSchedulerStarted && m_bSendRTCP) { HX_ASSERT(!m_bCallbackPending); m_bSchedulerStarted = TRUE; if (!m_bMulticast) { // we wanna send the report right away! m_bSendReport = TRUE; } else { if (!m_bCallbackPending) { scheduleNextReport(); } } } return HXR_OK;}HX_RESULTRTCPBaseTransport::stopScheduler(){ if(m_bCallbackPending) { HX_ASSERT(m_pScheduler); m_pScheduler->Remove(m_reportTimeoutID); m_bCallbackPending = FALSE; } HX_RELEASE(m_pReportCallback); return HXR_OK;}voidRTCPBaseTransport::scheduleNextReport(){ if (m_bSendRTCP) { HX_ASSERT(!m_bSendReport); HX_ASSERT(!m_bCallbackPending); HX_ASSERT(m_pReportCallback); HXTimeval rmatv = m_pScheduler->GetCurrentSchedulerTime(); Timeval tvNow((INT32)rmatv.tv_sec, (INT32)rmatv.tv_usec); tvNow += Timeval(m_pReportHandler->GetRTCPInterval()); rmatv.tv_sec = tvNow.tv_sec; rmatv.tv_usec = tvNow.tv_usec; m_reportTimeoutID = m_pScheduler->AbsoluteEnter(m_pReportCallback, rmatv); m_bCallbackPending = TRUE; }}/** we don't have a table of sender or receivers because we don't yet * support multicast. i.e. only one sender, one receiver*/HX_RESULTRTCPBaseTransport::handlePacket(IHXBuffer* pBuffer){ // we need to deal with a compund packet RTCPUnPacker unpacker;//{FILE* f1 = ::fopen("c:\\temp\\all.txt", "a+"); ::fprintf(f1, "this: %p RTCPTransport::handlePacket(): ", this);::fclose(f1);} if (HXR_OK != unpacker.UnPack(pBuffer)) { // failed...don't do anything more...still ok to return HXR_OK; return HXR_OK; } /* update */ m_pReportHandler->UpdateAvgRTCPSize(pBuffer->GetSize()); HX_RESULT theErr = HXR_OK; RTCPPacket* pPkt = NULL; HXTimeval rmatv = m_pScheduler->GetCurrentSchedulerTime(); UINT32 ulNow = rmatv.tv_sec * 1000 + rmatv.tv_usec / 1000; // for EOS support BOOL bBye = FALSE; APPItem* pAppPkt = NULL; while (HXR_OK == unpacker.Get(pPkt)) {//{FILE* f1 = ::fopen("c:\\temp\\all.txt", "a+"); ::fprintf(f1, "%u\n", pPkt->packet_type);::fclose(f1);} if (m_bIsSource || (m_bSSRCDetermined && m_ulSSRCDetermined == pPkt->sr_ssrc)) { // deal with it! switch(pPkt->packet_type) { case RTCP_SR: { DPRINTF(D_INFO, ("RTCP: SenderReport received\n")); m_pReportHandler->OnRTCPReceive(pPkt, ulNow); m_pDataTransport->handleRTCPSync(NTPTime(pPkt->ntp_sec, pPkt->ntp_frac), pPkt->rtp_ts); } break; case RTCP_RR: { DPRINTF(D_INFO, ("RTCP: ReceiverReport received\n")); m_pReportHandler->OnRTCPReceive(pPkt, ulNow); IHXBuffer* pTmp = NULL; if((m_pSignalBus) && SUCCEEDED(m_pCommonClassFactory-> CreateInstance(CLSID_IHXBuffer,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -