📄 ppffsrc.cpp
字号:
pValues->AddRef(); UINT8 pBW[128]; sprintf((char*)pBW, "%ld", m_ulBandwidth); /* Flawfinder: ignore */ IHXBuffer* pBandwidth = new CHXBuffer(); pBandwidth->AddRef(); pBandwidth->Set(pBW, strlen((char*)pBW) + 1); pValues->SetPropertyCString("Bandwidth", pBandwidth); rules.GetSubscription(bSubInfo, pValues); HX_RELEASE(pBandwidth); HX_RELEASE(pValues); /* * now get Dependencies and figure out TimeStampDelivery stream */ IHXValues* pRuleProps = NULL; IHXBuffer* pBuffer = NULL; UINT16 iRule; m_pRules = new RuleInfo[m_cRules]; for (iRule = 0; iRule < m_cRules; iRule++) { rules.GetProperties(iRule, pRuleProps); pBuffer = 0; if (SUCCEEDED(pRuleProps->GetPropertyCString("OnDepend", pBuffer))) { UINT16 *pOnDeps = new UINT16[pBuffer->GetSize()]; memset(pOnDeps, 0, sizeof(UINT16)*pBuffer->GetSize()); UINT16 ulNum = 0; const UINT8* pTemp = pBuffer->GetBuffer(); while (*pTemp) { if (*pTemp == ',') { ulNum++; pOnDeps[ulNum] = 0; } else { if ((*pTemp >= '0') && (*pTemp <= '9')) { pOnDeps[ulNum] *= 10; pOnDeps[ulNum] += *pTemp - '0'; } } pTemp++; } ulNum++; m_pRules[iRule].m_pOnDepends = new UINT16[ulNum + 1]; memcpy(m_pRules[iRule].m_pOnDepends, pOnDeps, /* Flawfinder: ignore */ (ulNum + 1) * sizeof(UINT16)); m_pRules[iRule].m_pOnDepends[ulNum] = PP_DEPEND_END; HX_RELEASE(pBuffer); HX_VECTOR_DELETE(pOnDeps); } if (TRUE == bSubInfo[iRule] && !m_bTimeStampDelivery) { // don't have to do this twice! HX_ASSERT(pRuleProps); if (SUCCEEDED(pRuleProps->GetPropertyCString("TimeStampDelivery", pBuffer))) { if ((pBuffer->GetBuffer()[0] == 'T') || (pBuffer->GetBuffer()[0] == 't')) { m_bTimeStampDelivery = TRUE; } HX_RELEASE(pBuffer); } } HX_RELEASE(pRuleProps); } pRuleBuf->Release(); } if (!m_bIsRMSource) { // Treat it as time stamp delivered m_bTimeStampDelivery = TRUE; // add 6 seconds preroll if it is not in a header already because // SDPs from non-rmserver will not have this. UINT32 ulVal = 0; if (HXR_OK != m_pHeader->GetPropertyULONG32("Preroll", ulVal)) { m_pHeader->SetPropertyULONG32("Preroll", PP_SYNC_TIMEOUT_MS); } if (HXR_OK != m_pHeader->GetPropertyULONG32("HasOutOfOrderTS", ulVal)) { m_pHeader->SetPropertyULONG32("HasOutOfOrderTS", 1); } } /* * The core must be forced to buffer until at least one * packet from each stream is received since the core * does not generate properly offset OnTimeSync() calls * to the renderers until such time. */ ULONG32 ulVal = 0; // We want to use predata to force buffering - see if it is already set m_pHeader->GetPropertyULONG32("PreDataAtStart", ulVal); if (ulVal) { // If PreDataAtStart is already used, make sure Predata is not 0 m_pHeader->GetPropertyULONG32("Predata", ulVal); } if (!ulVal) { // Predata is not used for buffering, use it as long as any preroll // is wanted on this stream m_pHeader->GetPropertyULONG32("Preroll", ulVal); if (ulVal) { m_pHeader->SetPropertyULONG32("PreDataAtStart", 1); m_pHeader->SetPropertyULONG32("PreDataAfterSeek", 1); m_pHeader->SetPropertyULONG32("PrerollAtStart", 1); m_pHeader->SetPropertyULONG32("PrerollAfterSeek", 1); ulVal = 0; m_pHeader->GetPropertyULONG32("Predata", ulVal); if (ulVal == 0) { ulVal = 1; m_pHeader->SetPropertyULONG32("Predata", ulVal); } } } /* * Get the strings to send in RTCP receiver reports from the plugin */ m_pPlugin->GetCName(m_pCName); m_pPlugin->GetUserName(m_pName); m_pPlugin->GetTool(m_pTool); m_pPlugin->GetEmailName(m_pEmail); m_pNet->CreateResolver(&m_pResolver); HX_ASSERT(m_pResolver != NULL); m_uMultiPort = uMultiPort; m_ulInterfaceAddr = ulInterfaceAddr; // initialize the bin // 5 RTCP interval + 1 for the ones to be removed m_MemberTimeoutBins.Init(6); m_MemberTimeoutBins.UpdateBins(); /* * Finish the Initialization in GetHostByNameDone */ m_pResolver->Init(this); m_pResolver->GetHostByName(pMultiAddr); return HXR_OK;} STDMETHODIMP CPurePlaySource::GetHostByNameDone ( HX_RESULT status, UINT32 ulAddr ) { HX_RESULT theErr = HXR_OK; BOOL bRTPSockOK = FALSE; IHXSetSocketOption* pSockOpt = NULL; /* * Continue where we left off with the initialization * see CPurePlaySource::Init */ if (status != HXR_OK) { m_pPlugin->StreamDone(status, m_iStream); theErr = status; goto bail; } else { /* * Create RTP socket for RTP messages on the port passed */ m_ulMultiAddr = ulAddr; m_pNet->CreateUDPSocket(&m_pRTPSock); HX_ASSERT(m_pRTPSock != NULL); m_pRTPResp = new CRTPResponseHandler(this, RTP_PORT); m_pRTPResp->AddRef(); theErr = m_pRTPSock->Init(m_ulInterfaceAddr, m_uMultiPort, m_pRTPResp); if (theErr != HXR_OK) goto bail; // set option before it binds theErr = m_pRTPSock->QueryInterface(IID_IHXSetSocketOption, (void**)&pSockOpt); if (pSockOpt) { pSockOpt->SetOption(HX_SOCKOPT_REUSE_ADDR, TRUE); pSockOpt->SetOption(HX_SOCKOPT_REUSE_PORT, TRUE); }#ifdef _UNIX theErr = m_pRTPSock->Bind(HX_INADDR_ANY, m_uMultiPort);#else theErr = m_pRTPSock->Bind(m_ulInterfaceAddr, m_uMultiPort);#endif if (theErr != HXR_OK) goto bail; theErr = m_pRTPSock->JoinMulticastGroup(m_ulMultiAddr, m_ulInterfaceAddr); if (theErr != HXR_OK) goto bail; if (pSockOpt) { pSockOpt->SetOption(HX_SOCKOPT_MULTICAST_IF, m_ulInterfaceAddr); HX_RELEASE(pSockOpt); } bRTPSockOK = TRUE; #ifdef XXXGo_DEBUG if (m_pFFLog) { fprintf(m_pFFLog, "RTP IGMP join %u on %u\n", m_ulMultiAddr, m_ulInterfaceAddr); }#endif /* * RTCP messages come in on the next port higher than the RTP * port. We need to send messages on this so get the TTL and * set it on this socket only. */ m_pNet->CreateUDPSocket(&m_pRTCPSock); HX_ASSERT(m_pRTCPSock != NULL); m_pRTCPResp = new CRTPResponseHandler(this, RTCP_PORT); m_pRTCPResp->AddRef(); theErr = m_pRTCPSock->Init(m_ulInterfaceAddr, m_uMultiPort + 1, m_pRTCPResp); if (theErr != HXR_OK) goto bail; // set option before it binds theErr = m_pRTCPSock->QueryInterface(IID_IHXSetSocketOption, (void**)&pSockOpt); if (pSockOpt) { pSockOpt->SetOption(HX_SOCKOPT_REUSE_ADDR, TRUE); pSockOpt->SetOption(HX_SOCKOPT_REUSE_PORT, TRUE); }#ifdef _UNIX theErr = m_pRTCPSock->Bind(HX_INADDR_ANY, m_uMultiPort + 1);#else theErr = m_pRTCPSock->Bind(m_ulInterfaceAddr, m_uMultiPort + 1);#endif if (theErr != HXR_OK) goto bail; theErr = m_pRTCPSock->JoinMulticastGroup(m_ulMultiAddr, m_ulInterfaceAddr); if (theErr != HXR_OK) goto bail; if (pSockOpt) { // set multicast interface so we will send RTCP pkts on the right // interface. pSockOpt->SetOption(HX_SOCKOPT_MULTICAST_IF, m_ulInterfaceAddr); HX_RELEASE(pSockOpt); } #ifdef XXXGo_DEBUG if (m_pFFLog) { fprintf(m_pFFLog, "RTCP IGMP join %u on %u\n", m_ulMultiAddr, m_ulInterfaceAddr); }#endif } Begin(); return HXR_OK;bail: /* * XXXGo - If one of interfaces succeed, we don't want to end this whole * presentation....Need to have a special clean up sequence... */ if (!bRTPSockOK) { m_pPlugin->m_strLastErr.Format("%d.%d.%d.%d/%d on %d.%d.%d.%d", (ulAddr >> 24) & 0xFF, (ulAddr >> 16) & 0xFF, (ulAddr >> 8) & 0xFF, (ulAddr ) & 0xFF, m_uMultiPort, (m_ulInterfaceAddr >> 24) & 0xFF, (m_ulInterfaceAddr >> 16) & 0xFF, (m_ulInterfaceAddr >> 8) & 0xFF, (m_ulInterfaceAddr ) & 0xFF); } else { m_pPlugin->m_strLastErr.Format("%d.%d.%d.%d/%d on %d.%d.%d.%d", (ulAddr >> 24) & 0xFF, (ulAddr >> 16) & 0xFF, (ulAddr >> 8) & 0xFF, (ulAddr ) & 0xFF, m_uMultiPort + 1, (m_ulInterfaceAddr >> 24) & 0xFF, (m_ulInterfaceAddr >> 16) & 0xFF, (m_ulInterfaceAddr >> 8) & 0xFF, (m_ulInterfaceAddr ) & 0xFF); } HX_RELEASE(pSockOpt); HX_RELEASE(m_pRTPSock); HX_RELEASE(m_pRTCPSock); HX_RELEASE(m_pRTPResp); HX_RELEASE(m_pRTCPResp); // we want to switch to unicast if possible. m_pPlugin->m_tryUnicast = SOCKET_FAIL; m_pPlugin->Close(); return theErr; } void CPurePlaySource::Begin() { /* * We may be called during the resolution of the multicast address * in Init, and are not ready to start reading. This is called * by GetHostByName when it completes */ if (m_bBeginPending) { m_ulBeginClockTick = HX_GET_TICKCOUNT(); m_bBeginPending = FALSE; m_pRTPSock->Read(1024); m_pRTCPSock->Read(1024); } else { m_bBeginPending = TRUE; } }// most cleaning up is done herevoidCPurePlaySource::Cleanup(void){ /* * Release the callback if it exists */ if (m_hStatusCallbackID != 0) { if (m_pScheduler) { m_pScheduler->Remove(m_hStatusCallbackID); m_hStatusCallbackID = 0; } } HX_RELEASE(m_pHeader); HX_RELEASE(m_pNet); HX_RELEASE(m_pResolver); /* * Flush out any pending reads */ if (m_pRTPSock) { m_pRTPSock->LeaveMulticastGroup(m_ulMultiAddr, m_ulInterfaceAddr);#ifdef XXXGo_DEBUG if (m_pFFLog) { fprintf(m_pLogFile, "RTP IGMP leave %u on %u\n", m_ulMultiAddr, m_ulInterfaceAddr); fflush(m_pLogFile); }#endif } if (m_pRTCPSock) { m_pRTCPSock->LeaveMulticastGroup(m_ulMultiAddr, m_ulInterfaceAddr);#ifdef XXXGo_DEBUG if (m_pFFLog) { fprintf(m_pLogFile, "RTCP IGMP leave %u on %u\n", m_ulMultiAddr, m_ulInterfaceAddr); fflush(m_pLogFile); }#endif } HX_RELEASE(m_pRTPSock); HX_RELEASE(m_pRTPResp); HX_RELEASE(m_pRTCPSock); HX_RELEASE(m_pRTCPResp); HX_RELEASE(m_pCName); HX_RELEASE(m_pName); HX_RELEASE(m_pTool); HX_RELEASE(m_pEmail); HX_RELEASE(m_pScheduler); HX_RELEASE(m_pPlugin); // need to clean up Sources // First clean up ID map. m_SsrcToMember.RemoveKey() may return FALSE if // this strm obj is RA and it has been removed from SSRC map in // RemoveSender(). if (!m_StrmIdToStrm.IsEmpty()) { CHXMapLongToObj::Iterator i; for (i = m_StrmIdToStrm.Begin(); i != m_StrmIdToStrm.End(); ++i) { HX_ASSERT(((CStream*)(*i))->m_bMadeCut); m_SsrcToStream.RemoveKey(((CStream*)(*i))->m_ulSSRC); delete (CStream*)(*i); } m_StrmIdToStrm.RemoveAll(); } if (!m_SsrcToStream.IsEmpty()) { // we have more source obj's in SSRC map. They are the ones which // didn't make the source admission timeout cut! CHXMapLongToObj::Iterator i; for (i = m_SsrcToStream.Begin(); i != m_SsrcToStream.End(); ++i) { delete (CStream*)(*i); } m_SsrcToStream.RemoveAll(); } if (!m_SsrcToMember.IsEmpty()) { CHXMapLongToObj::Iterator i; for (i = m_SsrcToMember.Begin(); i != m_SsrcToMember.End(); ++i) { delete (CMember*)(*i); } m_SsrcToMember.RemoveAll(); } // all the obj have been deleted above... m_MemberTimeoutBins.DeleteAllBins(); HX_RELEASE(m_pByePkt); // this Release will cause it to get deleted Release();} HX_RESULT CPurePlaySource::Close(){#ifdef XXXGo_DEBUG if (m_pLogFile) { fprintf(m_pLogFile, "Source::Close()\n"); fflush(m_pLogFile); }#endif if (m_bClosed) { // don't do this twice!!! return HXR_OK; } m_bClosed = TRUE; AddRef(); // if addmission had not been closed, we have never sent any // RTCP yet, so there is no reason to send BYE msg if (m_pPlugin->IsSourceAdmissionClosed()) { if (!ScheduleBye()) { // ByePkt will not be sent, so need to clean up here Cleanup(); } } else { Cleanup(); } return HXR_OK;} void CPurePlaySource::GetHeader(REF(IHXValues*) pHeader, UINT16 unStreamNumber) { // it's all the same, except stream number HX_ASSERT(m_pHeader != NULL); pHeader = new CHXHeader; pHeader->AddRef(); const char* pCh; ULONG32 ul; // first, ULONG32 if (m_pHeader->GetFirstPropertyULONG32(pCh, ul) == HXR_OK) { pHeader->SetPropertyULONG32(pCh, ul); while (HXR_OK == m_pHeader->GetNextPropertyULONG32(pCh, ul)) { pHeader->SetPropertyULONG32(pCh, ul); } } // then, CString IHXBuffer* pCString; if (m_pHeader->GetFirstPropertyCString(pCh, pCString) == HXR_OK) { pHeader->SetPropertyCString(pCh, pCString); HX_RELEASE(pCString); while (HXR_OK == m_pHeader->GetNextPropertyCString(pCh, pCString)) { pHeader->SetPropertyCString(pCh, pCString); HX_RELEASE(pCString); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -