⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rtptran.cpp

📁 著名的 helix realplayer 基于手机 symbian 系统的 播放器全套源代码
💻 CPP
📖 第 1 页 / 共 5 页
字号:
	    // Npt time can be computed (ulHXTime)
	    ULONG32 ulHXTime = pStreamData->m_pTSConverter->rtp2hxa(ulRTPTime);
	    
	    retVal = HXR_OK;
	    
	    if ((!m_pSyncServer) && m_pResp)
	    {
		m_pResp->QueryInterface(IID_IHXTransportSyncServer, 
					(void**) &m_pSyncServer);
	    }
	    
	    if (m_bNTPtoHXOffsetSet)
	    {
		// We can sync - NTP to NPT offset is known
		ULONG32 ulExpectedHXTime = ulNtpHX + m_lNTPtoHXOffset;
		LONG32 lSyncOffsetHX = ulExpectedHXTime - ulHXTime;
		LONG32 lSyncOffsetChange = lSyncOffsetHX - m_lSyncOffsetHX;
		
		if ((lSyncOffsetChange > ACCEPTABLE_SYNC_NOISE) ||
		    (lSyncOffsetChange < (-ACCEPTABLE_SYNC_NOISE)))
		{
		    if (m_bIsSyncMaster && m_pSyncServer)
		    {
#ifdef RTP_MESSAGE_DEBUG
			messageFormatDebugFileOut("RTCP-SYNC: Distribute Master Sync NPTTime=%u SyncOffset=%d", 
					          ulHXTime, -lSyncOffsetHX);
#endif	// RTP_MESSAGE_DEBUG
			m_pSyncServer->DistributeSync(ulHXTime, -lSyncOffsetHX);
		    }
		    else
		    {			
			m_lSyncOffsetHX = lSyncOffsetHX;
			if (lSyncOffsetHX >= 0)
			{
			    m_lSyncOffsetRTP = (LONG32) 
				(pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) lSyncOffsetHX));
			}
			else
			{
			    m_lSyncOffsetRTP = (LONG32) 
				(-pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) (-lSyncOffsetHX)));
			}
#ifdef RTP_MESSAGE_DEBUG
			messageFormatDebugFileOut("RTCP-SYNC: Self-Sync SyncOffset=%d SyncOffsetRTP=%d", 
					          m_lSyncOffsetHX, m_lSyncOffsetRTP);
#endif	// RTP_MESSAGE_DEBUG
		    }
		}
	    }
	    else
	    {
		// This is the first RTCP sync across all streams, anchor sync
		if (m_pSyncServer)
		{
#ifdef RTP_MESSAGE_DEBUG
		    messageFormatDebugFileOut("RTCP-SYNC: Distribute NTP-NPT Mapping NTPTime=%u NPTTime=%u", 
					      ulNtpHX, ulHXTime);
#endif	// RTP_MESSAGE_DEBUG
		    m_pSyncServer->DistributeSyncAnchor(ulHXTime, ulNtpHX);
		}
	    }
	}
    }

    return retVal;
}

/*
 * Stores the NTP-to-Helix-time mapping for this transport.
 *
 * ulHXTime  - Helix (NPT) time of the anchor point
 * ulNTPTime - NTP-derived time of the same anchor point
 *
 * The difference (ulHXTime - ulNTPTime) is saved in m_lNTPtoHXOffset and
 * m_bNTPtoHXOffsetSet is raised to mark the offset as known.
 * Always returns HXR_OK.
 */
HX_RESULT
RTPBaseTransport::anchorSync(ULONG32 ulHXTime, ULONG32 ulNTPTime)
{
    // Remember how far Helix time is from NTP time and mark it as valid.
    m_bNTPtoHXOffsetSet = TRUE;
    m_lNTPtoHXOffset = ulHXTime - ulNTPTime;

#ifdef RTP_MESSAGE_DEBUG
    messageFormatDebugFileOut("RTCP-SYNC: Received NTP-NPT Mapping NTPTime=%u NPTTime=%u NTPtoNPTOffset=%d", 
                              ulNTPTime, ulHXTime, m_lNTPtoHXOffset);
#endif  // RTP_MESSAGE_DEBUG

    return HXR_OK;
}

/*
 * Records this stream's offset to the sync master.
 *
 * ulHXTime          - Helix time the sync applies to (only used for the
 *                     debug trace in this method)
 * lHXOffsetToMaster - signed offset to the master, in Helix time units
 *
 * Returns HXR_IGNORE when the stream data cannot be found or when this
 * transport is itself the sync master; HXR_OK otherwise.
 */
HX_RESULT 
RTPBaseTransport::handleMasterSync(ULONG32 ulHXTime, LONG32 lHXOffsetToMaster)
{
    RTSPStreamData* pStreamData = m_pStreamHandler->getStreamData(m_streamNumber);

    // Nothing to do without stream data, and the master never syncs to itself.
    if (!pStreamData || m_bIsSyncMaster)
    {
        return HXR_IGNORE;
    }

    m_lOffsetToMasterHX = lHXOffsetToMaster;

    // hxa2rtp_raw() takes an unsigned value, so convert the magnitude and
    // re-apply the sign afterwards.
    if (lHXOffsetToMaster < 0)
    {
        m_lOffsetToMasterRTP = (LONG32) 
            (-pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) (-lHXOffsetToMaster)));
    }
    else
    {
        m_lOffsetToMasterRTP = (LONG32) 
            (pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) lHXOffsetToMaster));
    }

#ifdef RTP_MESSAGE_DEBUG
    messageFormatDebugFileOut("RTCP-SYNC: Master-Sync NPTTime=%u MasterSyncOffset=%d MasterSyncOffsetRTP=%d", 
                              ulHXTime, m_lOffsetToMasterHX, m_lOffsetToMasterRTP);
#endif  // RTP_MESSAGE_DEBUG

    return HXR_OK;
}

void
RTPBaseTransport::resetStartInfoWaitQueue(void)
{
    IHXBuffer* pStoredBuffer;

    while (!m_StartInfoWaitQueue.IsEmpty())
    {
	pStoredBuffer = (IHXBuffer*) m_StartInfoWaitQueue.RemoveHead();
	
	HX_RELEASE(pStoredBuffer);
    }
}

/*
 * Signals the end of the stream handled by this transport.
 *
 * streamNumber - stream being finished; expected to equal m_streamNumber
 *
 * Inactive transports do nothing. A source forwards the call to the RTCP
 * transport and returns its result. A non-source forwards the call to the
 * RTCP transport (BYE packet) and then reports OnStreamDone() to the
 * response object, returning that result.
 *
 * Returns HXR_OK when there was nothing to notify.
 */
HX_RESULT
RTPBaseTransport::streamDone(UINT16 streamNumber)
{
    // Verify the RTCP transport exists *before* looking inside it; the
    // stream-number comparison below would otherwise dereference NULL.
    HX_ASSERT(m_pRTCPTran);
    HX_ASSERT(m_streamNumber == streamNumber);
    HX_ASSERT(!m_pRTCPTran || (m_streamNumber == m_pRTCPTran->m_streamNumber));

    HX_RESULT hresult = HXR_OK;

    if (!m_bActive)
    {
        // this stream is not active, don't do anything.
    }
    else if (m_bIsSource)
    {
        if (m_pRTCPTran)
        {
            hresult = m_pRTCPTran->streamDone(streamNumber);
        }
    }
    else
    {
        // send BYE pkt
        if (m_pRTCPTran)
        {
            m_pRTCPTran->streamDone(streamNumber);
        }

        // Release builds compile the asserts away, so guard the response
        // object as well instead of crashing during teardown.
        if (m_pResp)
        {
            hresult = m_pResp->OnStreamDone(HXR_OK, streamNumber);
        }
    }

    return hresult;
}

/*
 * Replaces the bandwidth manager input held by this transport.
 *
 * pBwMgr - new bandwidth manager input; NULL simply drops the current one
 *
 * Always returns HXR_OK.
 */
STDMETHODIMP
RTPBaseTransport::InitBw(IHXBandwidthManagerInput* pBwMgr)
{
    // Take the new reference before dropping the old one: if the caller
    // passes the object we already hold, releasing first could destroy it
    // before AddRef() is reached.
    if (pBwMgr)
    {
        pBwMgr->AddRef();
    }

    HX_RELEASE(m_pBwMgrInput);
    m_pBwMgrInput = pBwMgr;

    return HXR_OK;
}

/*
 * Accepts a transmit rate request. The value is not used by this
 * implementation; the call always succeeds with HXR_OK.
 */
STDMETHODIMP
RTPBaseTransport::SetTransmitRate(UINT32 ulBitRate)
{
    (void) ulBitRate;   // intentionally unused

    return HXR_OK;
}

/*
 * XXXMC
 * Special-case handling for PV clients:
 * stores the caller's flag in m_bEmulatePVSession.
 */
void
RTPBaseTransport::setPVEmulationMode(BOOL bPVSessionFlag)
{
    this->m_bEmulatePVSession = bPVSessionFlag;
}

/*
 * Attaches the RTCP transport that accompanies this RTP transport.
 *
 * pRTCPTran - RTCP transport to attach; a reference is taken on it.
 *
 * The RTCP transport is handed this transport's report handler pointer so
 * both sides use the same instance. Passing NULL is a programming error
 * (asserted) and leaves the current state untouched.
 */
void
RTPBaseTransport::setRTCPTransport(RTCPBaseTransport* pRTCPTran)
{
    HX_ASSERT(pRTCPTran);
    HX_ASSERT(!m_pRTCPTran);

    // Asserts vanish in release builds; do not dereference NULL there.
    if (!pRTCPTran)
    {
        return;
    }

    // AddRef the new object first so that re-attaching the same instance
    // cannot destroy it, then drop any previously attached transport
    // instead of leaking its reference.
    pRTCPTran->AddRef();
    HX_RELEASE(m_pRTCPTran);
    m_pRTCPTran = pRTCPTran;

    // pointing to the same instance
    HX_ASSERT(m_pReportHandler);
    m_pRTCPTran->m_pReportHandler = m_pReportHandler;
}

/*
 * Derives the marker bit from the packet's IHXRTPPacketInfo interface.
 *
 * bMBit    - out: TRUE only when the packet exposes IHXRTPPacketInfo and
 *            GetMarkerBit() succeeds with a set bit; FALSE otherwise
 * pPkt     - packet to inspect
 * unRuleNo - not used by this variant
 */
void 
RTPBaseTransport::MBitRTPPktInfo(REF(UINT8)bMBit, IHXPacket* pPkt, UINT16 unRuleNo)
{
    IHXRTPPacketInfo* pInfo = NULL;

    if (pPkt->QueryInterface(IID_IHXRTPPacketInfo, (void**) &pInfo) != HXR_OK)
    {
        // Packet carries no RTP info: no marker.
        bMBit = FALSE;
        return;
    }

    BOOL bMarker = FALSE;
    const BOOL bMarkerSet = (pInfo->GetMarkerBit(bMarker) == HXR_OK) && bMarker;

    bMBit = bMarkerSet ? TRUE : FALSE;
    pInfo->Release();
}

/*
 * Derives the marker bit from the ASM rule number.
 *
 * bMBit    - out: set when a marker rule exists (m_bHasMarkerRule) and the
 *            lowest bit of unRuleNo equals m_markerRuleNumber
 * pPkt     - not used by this variant
 * unRuleNo - ASM rule number of the packet
 */
void 
RTPBaseTransport::MBitASMRuleNo(REF(UINT8)bMBit, IHXPacket* pPkt, UINT16 unRuleNo)
{
    // Only the parity (lowest bit) of the rule number is compared.
    const bool bRuleMatches = ((unRuleNo & 0x1) == m_markerRuleNumber);

    bMBit = (m_bHasMarkerRule && bRuleMatches);
}

#ifdef RTP_MESSAGE_DEBUG
/*
 * Appends one printf-style formatted line to the per-stream debug file.
 *
 * fmt - printf-style format string, followed by its arguments
 *
 * The file name is "<m_messageDebugFileName>.<m_streamNumber>". Nothing is
 * written unless m_bMessageDebug is set. The file is opened in append mode
 * and closed again on every call.
 */
void
RTPBaseTransport::messageFormatDebugFileOut(const char* fmt, ...)
{
    if (!m_bMessageDebug || !fmt)
    {
        return;
    }

    char fileName[4096]; /* Flawfinder: ignore */

    SafeSprintf(fileName, sizeof(fileName), "%s.%d",
                (const char*) m_messageDebugFileName, m_streamNumber);

    FILE* fp = fopen(fileName, "a");
    if (fp)
    {
        // Format straight into the file. The previous vsprintf() into a
        // fixed stack buffer could overflow on long messages.
        va_list args;
        va_start(args, fmt);
        vfprintf(fp, fmt, args);
        va_end(args);

        fputc('\n', fp);
        fclose(fp);
    }
}
#endif  // RTP_MESSAGE_DEBUG

/*
 *   RTP UDP
 */
/*
 * Constructs a UDP transport with no sockets attached, no foreign
 * address, no multicast group, and a random initial keep-alive
 * sequence number (low 16 bits of random32(0)).
 */
RTPUDPTransport::RTPUDPTransport(BOOL bIsSource) :
    RTPBaseTransport(bIsSource),
    m_pUDPSocket(NULL),
    m_foreignAddr(0),
    m_foreignPort(0),
    m_keepAliveSeq((UINT16)(random32(0) & 0xffff)),
    m_ulCurrentMulticastAddress(0),
    m_ulCurrentMulticastPort(0),
    m_pMCastUDPSocket(NULL)
{
    // All state is set up in the initializer list.
}

/*
 * Destructor: runs Done() to tear the transport down.
 */
RTPUDPTransport::~RTPUDPTransport()
{
    this->Done();
}

/*
 * Identifies this transport type: always RTSP_TR_RTP_UDP.
 */
RTSPTransportTypeEnum
RTPUDPTransport::tag()
{
    const RTSPTransportTypeEnum transportTag = RTSP_TR_RTP_UDP;
    return transportTag;
}

/*
 * Tears the UDP transport down: resets the keep-alive helper, leaves the
 * current multicast group (if a multicast socket is held), releases both
 * sockets and finally runs the base-class Done().
 */
void
RTPUDPTransport::Done()
{
    m_keepAlive.reset();

    if (m_pMCastUDPSocket)
    {
        // Leave the group before giving up the socket reference.
        m_pMCastUDPSocket->LeaveMulticastGroup(m_ulCurrentMulticastAddress, HXR_INADDR_ANY);
        HX_RELEASE(m_pMCastUDPSocket);
    }

    HX_RELEASE(m_pUDPSocket);

    RTPBaseTransport::Done();
}

/*
 * Initializes the UDP transport.
 *
 * pContext - context used to look up the DiffServ configurator and passed
 *            on to Init()
 * pSocket  - UDP socket used for sending; a reference is taken
 * pResp    - transport response object; a reference is taken
 *
 * Returns the result of Init(pContext) when it fails, HXR_OK otherwise.
 */
HX_RESULT
RTPUDPTransport::init(IUnknown* pContext,
                       IHXUDPSocket* pSocket,
                       IHXRTSPTransportResponse* pResp)
{
    // Hold on to the response object and the socket.
    m_pResp = pResp;
    m_pResp->AddRef();

    m_pUDPSocket = pSocket;
    m_pUDPSocket->AddRef();

    /* Set DiffServ Code Point */
    IHXSetSocketOption* pSockOpt = NULL;
    if (SUCCEEDED(m_pUDPSocket->QueryInterface(IID_IHXSetSocketOption, (void**)&pSockOpt)))
    {
        IHXQoSDiffServConfigurator* pDiffServ = NULL;
        if (SUCCEEDED(pContext->QueryInterface(IID_IHXQoSDiffServConfigurator, (void**)&pDiffServ)))
        {
            pDiffServ->ConfigureSocket(pSockOpt, HX_QOS_DIFFSERV_CLASS_MEDIA);
            pDiffServ->Release();
            pDiffServ = NULL;
        }

        pSockOpt->Release();
        pSockOpt = NULL;
    }

    const HX_RESULT initResult = Init(pContext);
    if (HXR_OK == initResult)
    {
#ifdef DEBUG
        if (debug_func_level() & DF_DROP_PACKETS)
        {
            m_drop_packets = TRUE;
        }
#endif /* DEBUG */

        RTPBaseTransport::init();
    }

    // On success this is HXR_OK; on failure it is Init()'s error code.
    return initResult;
}

/*
 * Stores the peer address/port used by writePacket().
 *
 * When this transport is not a source and GetNATTimeout() reports a
 * non-zero timeout, the keep-alive helper is initialized with a new
 * KeepAliveCB and one keep-alive packet is sent immediately.
 */
void
RTPUDPTransport::setForeignAddress(UINT32 foreignAddr, UINT16 foreignPort)
{
    m_foreignAddr = foreignAddr;
    m_foreignPort = foreignPort;

    UINT32 ulNATTimeout = GetNATTimeout(m_pContext);

    // Sources, and setups without a NAT timeout, need no keep-alive.
    if (m_bIsSource || !ulNATTimeout)
    {
        return;
    }

    // Initialize keepalive object
    m_keepAlive.Init(m_pScheduler, ulNATTimeout, new KeepAliveCB(this));

    // Do initial "poke" through the NAT
    onNATKeepAlive();
}

/*
 * Notes activity on the keep-alive helper, then lets the base class
 * process the incoming buffer and returns its result.
 */
HX_RESULT RTPUDPTransport::handlePacket(IHXBuffer* pBuffer)
{
    m_keepAlive.OnActivity();

    const HX_RESULT baseResult = RTPBaseTransport::handlePacket(pBuffer);
    return baseResult;
}

/*
 * Joins a multicast group.
 *
 * ulAddress - multicast group address to join
 * ulPort    - multicast port (stored only)
 * pUDP      - socket to use for multicast; adopted (with a reference)
 *             when no group is currently joined on a held socket
 *
 * If a group is already joined on a held socket, that group is left and
 * the same socket joins the new one (pUDP is not used in that case).
 * Afterwards every stream's transport buffer is switched to multicast.
 * When no usable socket is available the call does nothing.
 */
void 
RTPUDPTransport::JoinMulticast(UINT32 ulAddress, UINT32 ulPort, IHXUDPSocket* pUDP)
{
    if (m_pMCastUDPSocket && m_ulCurrentMulticastAddress)
    {
        m_pMCastUDPSocket->LeaveMulticastGroup(m_ulCurrentMulticastAddress, HXR_INADDR_ANY);
    }
    else
    {
        // A stored address alone does not prove a socket is held: Done()
        // releases the socket but keeps the address. Adopt the caller's
        // socket here instead of dereferencing a NULL member.
        if (!pUDP)
        {
            return;
        }

        // AddRef before releasing so adopting the already-held socket is
        // safe, and so a previously held socket is not leaked.
        pUDP->AddRef();
        HX_RELEASE(m_pMCastUDPSocket);
        m_pMCastUDPSocket = pUDP;
    }

    m_pMCastUDPSocket->JoinMulticastGroup(ulAddress, HXR_INADDR_ANY);
    m_bMulticast = TRUE;
    m_ulCurrentMulticastAddress = ulAddress;
    m_ulCurrentMulticastPort = ulPort;

    if (m_pStreamHandler)
    {
        RTSPStreamData* pStreamData = m_pStreamHandler->firstStreamData();

        ASSERT(pStreamData);

        while (pStreamData)
        {
            // Skip streams that have no transport buffer yet.
            if (pStreamData->m_pTransportBuffer)
            {
                pStreamData->m_pTransportBuffer->SetMulticast();
            }
            pStreamData = m_pStreamHandler->nextStreamData();
        }
    }
}

/*
 * Sends a keep-alive: an RTP packet with payload type 0 and no payload,
 * carrying the report handler's SSRC and the next keep-alive sequence
 * number. Nothing is sent when no buffer can be created or sized.
 * Always returns HXR_OK.
 */
HX_RESULT RTPUDPTransport::onNATKeepAlive()
{
    DPRINTF(D_INFO, ("RTP : onNATKeepAlive()\n"));

    // Send an RTP packet with PT=0 and no payload

    IHXBuffer* pWireBuf = NULL;

    if (m_pCommonClassFactory)
    {
        if (HXR_OK == m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, (void**)&pWireBuf))
        {
            RTPPacket keepAlivePkt;

            // Fixed header flags
            keepAlivePkt.version_flag = 2;
            keepAlivePkt.padding_flag = 0;
            keepAlivePkt.csrc_len = 0;
            keepAlivePkt.marker_flag = 0;
            keepAlivePkt.extension_flag = 0;

            // Empty payload
            keepAlivePkt.data.data = 0;
            keepAlivePkt.data.len = 0;
            keepAlivePkt.ssrc = m_pReportHandler->GetSSRC();

            keepAlivePkt.payload = 0;
            keepAlivePkt.seq_no = m_keepAliveSeq++;
            keepAlivePkt.timestamp = HX_GET_TICKCOUNT() * 8; // Timestamp in 1/8000 sec

            UINT32 ulPacketLen = keepAlivePkt.static_size() + keepAlivePkt.data.len;

            if (HXR_OK == pWireBuf->SetSize(ulPacketLen))
            {
                // Pack the data into the buffer
                keepAlivePkt.pack(pWireBuf->GetBuffer(), ulPacketLen);

                // Update the packet size
                pWireBuf->SetSize(ulPacketLen);

                writePacket(pWireBuf);
            }
        }
    }

    HX_RELEASE(pWireBuf);

    return HXR_OK;
}

/*
 * Writes a buffer to the stored foreign address/port over the UDP socket.
 *
 * Returns HXR_FAIL when no socket is attached; otherwise notes activity
 * on the keep-alive helper and returns the result of WriteTo().
 */
HX_RESULT
RTPUDPTransport::writePacket(IHXBuffer* pSendBuffer)
{
    HX_RESULT writeResult = HXR_FAIL;

    if (m_pUDPSocket)
    {
        m_keepAlive.OnActivity();

        writeResult = m_pUDPSocket->WriteTo(m_foreignAddr, m_foreignPort, pSendBuffer);
    }

    return writeResult;
}

/*
 * XXXMC
 * Special-case handling for PV clients
 */
/*
 * Sends the response to a PV client's handshake ("poke") packet.
 *
 * pPktPayload - response payload; exactly 8 bytes are copied from it
 *
 * Best effort: when the payload is NULL, no class factory is available,
 * or the buffer cannot be created or filled, nothing is sent.
 * Always returns HXR_OK.
 */
HX_RESULT
RTPUDPTransport::sendPVHandshakeResponse(UINT8* pPktPayload)
{
    IHXBuffer* pPktPayloadBuff = NULL;

    // Guard the factory the same way onNATKeepAlive() does, and never
    // copy from a NULL payload pointer.
    if (pPktPayload && m_pCommonClassFactory)
    {
        m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, (void**) &pPktPayloadBuff);
    }

    if (pPktPayloadBuff)
    {
        DPRINTF(D_INFO, ("RTP: Sending POKE PKT RESPONSE\n")); 

        // Only transmit when the payload was actually copied into the buffer.
        if (HXR_OK == pPktPayloadBuff->Set((UCHAR*)pPktPayload, 8))
        {
            writePacket(pPktPayloadBuff);
        }
    }

    HX_RELEASE(pPktPayloadBuff);

    return HXR_OK;
}

/*
 * Sends one packet over UDP.
 *
 * When m_ulPayloadWirePacket is non-zero the packet is passed through
 * reflectPacket() and written; HXR_IGNORE from reflectPacket() is mapped
 * to HXR_OK, any other failure is returned as is.
 *
 * Otherwise the packet is built with makePacket() and written. After a
 * successful write, a sender report is sent and the next one scheduled
 * if the RTCP transport has both m_bSendReport and m_bSendRTCP set.
 */
HX_RESULT
RTPUDPTransport::sendPacket(BasePacket* pPacket)
{
    HX_ASSERT(m_bActive);

    if (m_ulPayloadWirePacket != 0)
    {
        IHXBuffer* pReflectedBuf = NULL;
        HX_RESULT reflectResult = reflectPacket(pPacket, pReflectedBuf);

        if (HXR_IGNORE == reflectResult)
        {
            // Packet deliberately dropped: not an error for the caller.
            return HXR_OK;
        }

        if (HXR_OK != reflectResult)
        {
            return reflectResult;
        }

        reflectResult = writePacket(pReflectedBuf);
        pReflectedBuf->Release();

        return reflectResult;
    }

    IHXBuffer* pWireBuf = NULL;
    HX_RESULT sendResult = makePacket(pPacket, pWireBuf);

    if (HXR_OK == sendResult)
    {
        sendResult = writePacket(pWireBuf);

        if (HXR_OK == sendResult)
        {
            /* send SR if necessary */
            if (m_pRTCPTran->m_bSendReport && m_pRTCPTran->m_bSendRTCP)
            {
                m_pRTCPTran->sendSenderReport();
                m_pRTCPTran->m_bSendReport = FALSE;
                m_pRTCPTran->scheduleNextReport();
            }
        }
    }

    HX_RELEASE(pWireBuf);

    return sendResult;
}

/*
 * Creates a keep-alive callback bound to a transport.
 *
 * pTransport - transport to call back; a reference is taken when non-NULL
 *
 * The callback's own reference count starts at zero.
 */
RTPUDPTransport::KeepAliveCB::KeepAliveCB(RTPUDPTransport* pTransport)
    : m_pTransport(pTransport)
    , m_lRefCount(0)
{
    if (pTransport)
    {
        pTransport->AddRef();
    }
}

/*
 * Drops the reference held on the bound transport, if any.
 */
RTPUDPTransport::KeepAliveCB::~KeepAliveCB()
{
    if (m_pTransport)
    {
        m_pTransport->Release();
        m_pTransport = NULL;
    }
}

STDMETHODIMP
RTPUDPTransport::KeepAliveCB::QueryInterface(REFIID riid, void** ppvObj)
{
    if (IsEqualIID(riid, IID_IUnknown))
    {
        AddRef();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -