📄 tsstreamer.cpp
字号:
/******************************************************************************** tsstreamer.cpp:*-------------------------------------------------------------------------------* (c)1999-2001 VideoLAN* $Id: tsstreamer.cpp,v 1.8 2002/08/09 13:42:32 tooney Exp $** Authors: Benoit Steiner <benny@via.ecp.fr>* Arnaud de Bossoreille de Ribou <bozo@via.ecp.fr>** This program is free software; you can redistribute it and/or* modify it under the terms of the GNU General Public License* as published by the Free Software Foundation; either version 2* of the License, or (at your option) any later version.** This program is distributed in the hope that it will be useful,* but WITHOUT ANY WARRANTY; without even the implied warranty of* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the* GNU General Public License for more details.** You should have received a copy of the GNU General Public License* along with this program; if not, write to the Free Software* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.**-------------------------------------------------------------------------------********************************************************************************///------------------------------------------------------------------------------// Preamble//------------------------------------------------------------------------------#include "../core/defs.h"#include "../core/core.h"#include "../mpeg/mpeg.h"#include "../mpeg/ts.h"#include "../mpeg/rtp.h"#include "program.h"#include "buffer.h"#include "output.h"#include "channel.h"#include "broadcast.h"#include "request.h"#include "input.h"#include "tsstreamer.h"#if defined(HAVE_NANOSLEEP) && !defined(HAVE_DECL_NANOSLEEP)extern "C" {int nanosleep(struct timespec *, struct timespec *);}#endif /************************************************************************************************************************************************************************************************************************************************///------------------------------------------------------------------------------////------------------------------------------------------------------------------// The thread is created in detached state to allow itself to stop its execution// before the end of the stream if an error occurs//------------------------------------------------------------------------------C_TsStreamer::C_TsStreamer(handle hLog, C_Broadcast* pBroadcast, C_NetList* pTsProvider, C_SyncFifo* pBuffer, C_EventHandler* pEventHandler, bool bOwnProvider, bool bUsePcr){ ASSERT(pBroadcast); ASSERT(pTsProvider); ASSERT(pBuffer); ASSERT(pEventHandler); ASSERT(pTsProvider->Capacity() > pBuffer->Size()); m_hLog = hLog; m_pBroadcast = pBroadcast; m_pTsProvider = pTsProvider; m_pBuffer = pBuffer; m_pEventHandler = pEventHandler; C_Channel* pChannel = m_pBroadcast->GetChannel(); ASSERT(pChannel); m_pOutput = pChannel->GetOutput(); m_bStop = 0; m_uiByteRead = 0; m_bFirstPCR = 1; // Set those values to 0 in order to send the packets that come before // the first one with a PCR as fast as possible (best behaviour) m_iLastTime = 0; m_iDeltaClock = 0; m_dSlope = 0; m_iHowMany = 0; m_bOwnProvider = bOwnProvider; m_bUsePcr = bUsePcr;}//------------------------------------------------------------------------------////------------------------------------------------------------------------------C_TsStreamer::~C_TsStreamer(){ // The output is no more needed C_Channel* pChannel = m_pBroadcast->GetChannel(); ASSERT(pChannel); pChannel->ReleaseOutput(); // The TsProvider is shared by the Reader and the Streamer's output: it will // therefore be destroyed by the Streamer since this is the last stopped if(m_bOwnProvider) delete m_pTsProvider; // The Buffer is shared by the Reader and the Streamer: it will be // destroyed by the Streamer since this is the last stopped delete m_pBuffer;}//------------------------------------------------------------------------------////------------------------------------------------------------------------------void C_TsStreamer::InitWork(){ try { m_pOutput->Init(m_pTsProvider); } catch(E_Exception e) { throw E_Exception(GEN_ERR, "Unable to init streamer", e); }}//------------------------------------------------------------------------------////------------------------------------------------------------------------------void C_TsStreamer::DoWork(){ int iRc = NO_ERR; // Get the packet from the buffer and send them to the output while(!m_bStop && iRc >= 0) { C_TsPacket* pPacket = m_pBuffer->Pop(); ASSERT(pPacket); if(m_bUsePcr) { // Increases the counters m_uiByteRead += TS_PACKET_LEN; m_iHowMany ++; // Read the PCR info if any if(pPacket->HasPCR()) { if(m_bFirstPCR) { InitClock(pPacket); m_bFirstPCR = 0; } else AdjustClock(pPacket); } // Wait until the date at which the packet must be sent is reached // For better perf, doesn't sleep for each TS but only when the output // buffer is ready to be send if(m_iHowMany == m_pOutput->GetBuffCapacity()) { WaitSendDate(); m_iHowMany = 0; } else m_iHowMany++; } // TS packet is ok, send it try { m_pOutput->Send(pPacket, (m_pBroadcast->GetOption("rtp") == "1") ); } catch(E_Exception e) { iRc = GEN_ERR; C_String strPgrmName = m_pBroadcast->GetProgram()->GetName(); Log(m_hLog, LOG_NOTE, C_String("Unable to send stream for pgrm ")+ strPgrmName + ": "/* + e.Dump()*/); } } // If a problem occurred in streaming and no stop signal is currently // pending, ask for program termination if(iRc <= 0 && !m_bStop) { C_String strPgrmName = m_pBroadcast->GetProgram()->GetName(); Log(m_hLog, LOG_NOTE, C_String("Error when streaming program ")+ strPgrmName+": aborting"); m_bStop = true; C_Event cEvent(m_pBroadcast->GetInput()->GetName()); cEvent.SetCode(CONNEXION_LOST_EVENT); cEvent.SetBroadcast(m_pBroadcast); m_pEventHandler->HandleEvent(cEvent); }}//------------------------------------------------------------------------------////------------------------------------------------------------------------------void C_TsStreamer::StopWork(){ // The thread can be waiting to pop a packet: we must cancel it to be sure // to avoid a dead thread // Pb: the input buffer is not flushed, so some video frames may be never // sent. Workaround: A little sleep // Another pb: the input may be unable to send the requested frames, so we must // check that the size of the buffer decreases at each step if(!m_bStop) { unsigned int uiBuffSize = m_pBuffer->Size(); while(uiBuffSize > 0) { C_String strPgrmName = m_pBroadcast->GetProgram()->GetName(); LogDbg(m_hLog, "Sleeping "+ (uiBuffSize/800+1) + " seconds before destroying TsStreamer for pgrm "+strPgrmName); Pause(uiBuffSize / 800 + 1); unsigned int uiNewBuffSize = m_pBuffer->Size(); if(uiNewBuffSize < uiBuffSize) uiBuffSize = uiNewBuffSize; else break; } m_bStop = true; // The streamer should be blocked at this time. That's why we force its // interruption instead of using the DelayedInterruption() method. Interrupt(); }}//------------------------------------------------------------------------------////------------------------------------------------------------------------------void C_TsStreamer::CleanWork(){ m_pOutput->Close();}//------------------------------------------------------------------------------////------------------------------------------------------------------------------// Init the clock: compute the DeltaClock and init LastTime variable// Must therefore be called instead of AdjustClock when the first PCR is// received//------------------------------------------------------------------------------inline void C_TsStreamer::InitClock(C_TsPacket* pPacket){ ASSERT(pPacket); ASSERT(pPacket->HasPCR()); s64 iPCRTime = pPacket->GetPCRTime(); // Compute the delta between the clock of the PC and the one of the encoder m_iDeltaClock = GetDate() - iPCRTime; // Update the data for the next PCR m_uiByteRead = 0; m_iLastTime = iPCRTime;}//------------------------------------------------------------------------------////------------------------------------------------------------------------------// The clock is adjusted for the time to go with the help of the current PCR// and of the previous one, not of the current one and the next one: this can// lead to some little pbs if the throughtput is not constant and if the buffer// of the client are to small //------------------------------------------------------------------------------inline void C_TsStreamer::AdjustClock(C_TsPacket* pPacket){ ASSERT(pPacket); ASSERT(pPacket->HasPCR()); s64 iPCRTime = pPacket->GetPCRTime(); if(pPacket->IsDiscontinuity()) { // Reajust the delta between the clock of the PC and the one of the encoder C_String strPgrmName = m_pBroadcast->GetProgram()->GetName(); LogDbg(m_hLog, "Adjusting timer discontinuity for pgrm "+strPgrmName); m_iDeltaClock = GetDate() - iPCRTime; } else { // (Re)evaluate the slope ASSERT(m_uiByteRead > 0);#ifdef _WIN32 m_dSlope = ((double)iPCRTime - m_iLastTime) / (s64)m_uiByteRead;#else m_dSlope = ((double)iPCRTime - m_iLastTime) / m_uiByteRead;#endif } // Update the data for the next PCR m_uiByteRead = 0; m_iLastTime = iPCRTime;}//------------------------------------------------------------------------------////------------------------------------------------------------------------------inline void C_TsStreamer::WaitSendDate(){ s64 iSendDate = m_iLastTime + m_iDeltaClock + (s64)m_dSlope*m_uiByteRead; s64 iWait = iSendDate - GetDate(); // Wait only if the delay becomes > 0.5 ms, for the select will make we late // (it usually takes more than 1 ms) if(iWait > 500) { // Sleep during the given time#ifdef WIN32 Sleep((int)(iWait / 1000));#elif defined(HAVE_NANOSLEEP) struct timespec tsDelay; tsDelay.tv_sec = iWait / 1000000; tsDelay.tv_nsec = 1000 * (iWait % 1000000); // Wait for the given delay nanosleep(&tsDelay, NULL);#else# error nanosleep not present !#endif }}//------------------------------------------------------------------------------// Returns the current date in microseconds//------------------------------------------------------------------------------inline s64 C_TsStreamer::GetDate(){#ifdef _WIN32 s64 freq, usec_time; if( QueryPerformanceFrequency( (LARGE_INTEGER *)&freq ) ) { // Microsecond resolution QueryPerformanceCounter( (LARGE_INTEGER *)&usec_time ); return ( usec_time * 1000000 ) / freq; } // Milisecond resolution return 1000 * GetTickCount();#else struct timeval tv; gettimeofday(&tv, NULL); return( (s64)tv.tv_sec * 1000000 + (s64)tv.tv_usec );#endif}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -