⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rtp.cxx

📁 sloedgy open sip stack source code
💻 CXX
📖 第 1 页 / 共 5 页
字号:
      AddReceiverReport(*(RTP_ControlFrame::ReceiverReport *)&sender[1]);
    }
  }

  // Add the SDES part to compound RTCP packet
  PTRACE(2, "RTP\tSending SDES: " << canonicalName);
  report.WriteNextCompound();

  RTP_ControlFrame::SourceDescription & sdes = report.AddSourceDescription(syncSourceOut);
  report.AddSourceDescriptionItem(sdes, RTP_ControlFrame::e_CNAME, canonicalName);
  report.AddSourceDescriptionItem(sdes, RTP_ControlFrame::e_TOOL, toolName);

  // Wait a fuzzy amount of time so things don't get into lock step
  int interval = (int)reportTimeInterval.GetMilliSeconds();
  int third = interval/3;
  interval += PRandom::Number()%(2*third);
  interval -= third;
  reportTimer = interval;

  return WriteControl(report);
}


// Decode the reception report blocks carried in an RTCP packet.
//
// frame  - control frame positioned on the packet being decoded.
// offset - byte offset from the start of the payload at which the first
//          reception report block begins.
//
// Returns an array holding one heap allocated RTP_Session::ReceiverReport
// per block, as many as frame.GetCount() announces. No check is made here
// that the payload is large enough for that many blocks; the caller has to
// make sure of that before calling.
static RTP_Session::ReceiverReportArray
BuildReceiverReportArray(const RTP_ControlFrame & frame, PINDEX offset)
{
  RTP_Session::ReceiverReportArray decodedReports;

  const RTP_ControlFrame::ReceiverReport * blocks =
        (const RTP_ControlFrame::ReceiverReport *)(frame.GetPayloadPtr()+offset);
  const PINDEX blockCount = (PINDEX)frame.GetCount();

  for (PINDEX idx = 0; idx < blockCount; idx++) {
    const RTP_ControlFrame::ReceiverReport & wire = blocks[idx];

    RTP_Session::ReceiverReport * decoded = new RTP_Session::ReceiverReport;
    decoded->sourceIdentifier   = wire.ssrc;
    decoded->fractionLost       = wire.fraction;
    decoded->totalLost          = wire.GetLostPackets();
    decoded->lastSequenceNumber = wire.last_seq;
    decoded->jitter             = wire.jitter;
    decoded->lastTimestamp      = (PInt64)(DWORD)wire.lsr;
    // NOTE(review): RFC 3550 expresses DLSR in units of 1/65536 second, so
    // this shift-then-divide looks suspicious - confirm the intended unit
    // of "delay" before relying on the value.
    decoded->delay              = ((PInt64)wire.dlsr << 16)/1000;

    decodedReports.SetAt(idx, decoded);
  }

  return decodedReports;
}


// Decode every sub-packet of a received (possibly compound) RTCP frame and
// hand the result to the matching OnRxXXX() callback.
//
// Each sub-packet is validated against its own payload size before any field
// is read, so a malformed or malicious packet cannot cause reads beyond the
// data that was actually received. Sub-packets that fail validation are
// traced and skipped; processing continues with the next sub-packet.
//
// frame - control frame positioned on its first sub-packet.
//
// Always returns e_ProcessPacket.
RTP_Session::SendReceiveStatus RTP_Session::OnReceiveControl(RTP_ControlFrame & frame)
{
  do {
    BYTE * payload = frame.GetPayloadPtr();
    unsigned size = frame.GetPayloadSize();
    // Meaning depends on the packet type: number of reception report
    // blocks, SDES chunks or BYE sources.
    unsigned itemCount = frame.GetCount();

    switch (frame.GetPayloadType()) {
    case RTP_ControlFrame::e_SenderReport :
      // The sender info must be followed by itemCount complete reception
      // report blocks, otherwise BuildReceiverReportArray() would read past
      // the end of the packet.
      if (size >= sizeof(RTP_ControlFrame::SenderReport) +
                  itemCount*sizeof(RTP_ControlFrame::ReceiverReport)) {
        SenderReport sender;
        const RTP_ControlFrame::SenderReport & sr = *(const RTP_ControlFrame::SenderReport *)payload;
        sender.sourceIdentifier = sr.ssrc;
        sender.realTimestamp = PTime(sr.ntp_sec-SecondsFrom1900to1970, sr.ntp_frac/4294);
        sender.rtpTimestamp = sr.rtp_ts;
        sender.packetsSent = sr.psent;
        sender.octetsSent = sr.osent;
        OnRxSenderReport(sender,
                         BuildReceiverReportArray(frame, sizeof(RTP_ControlFrame::SenderReport)));
      }
      else {
        PTRACE(2, "RTP\tSenderReport packet truncated");
      }
      break;

    case RTP_ControlFrame::e_ReceiverReport :
      // Reporter SSRC followed by itemCount complete reception report blocks.
      if (size >= sizeof(PUInt32b) +
                  itemCount*sizeof(RTP_ControlFrame::ReceiverReport)) {
        OnRxReceiverReport(*(const PUInt32b *)payload,
                           BuildReceiverReportArray(frame, sizeof(PUInt32b)));
      }
      else {
        PTRACE(2, "RTP\tReceiverReport packet truncated");
      }
      break;

    case RTP_ControlFrame::e_SourceDescription :
      if (size >= itemCount*sizeof(RTP_ControlFrame::SourceDescription)) {
        SourceDescriptionArray descriptions;
        const BYTE * const end = payload + size;
        unsigned chunkOffset = 0;
        BOOL malformed = FALSE;

        for (PINDEX srcIdx = 0; srcIdx < (PINDEX)itemCount; srcIdx++) {
          // A chunk needs its SSRC plus at least one item type octet.
          if (chunkOffset + sizeof(PUInt32b) + 1 > size) {
            malformed = TRUE;
            break;
          }

          const RTP_ControlFrame::SourceDescription * sdes =
                (const RTP_ControlFrame::SourceDescription *)(payload + chunkOffset);
          descriptions.SetAt(srcIdx, new SourceDescription(sdes->src));

          // Walk the item list, never touching an item that is not
          // completely contained in the payload. Running out of data
          // without seeing an END item simply ends the list.
          const RTP_ControlFrame::SourceDescription::Item * item = sdes->item;
          while ((const BYTE *)item < end && item->type != RTP_ControlFrame::e_END) {
            if (end - (const BYTE *)item < 2) {
              malformed = TRUE;   // length octet missing
              break;
            }
            const RTP_ControlFrame::SourceDescription::Item * next = item->GetNextItem();
            if ((const BYTE *)next > end) {
              malformed = TRUE;   // item data runs past end of packet
              break;
            }
            descriptions[srcIdx].items.SetAt(item->type, PString(item->data, item->length));
            item = next;
          }
          if (malformed)
            break;

          // The END item is a single null octet with no length field; the
          // next chunk starts on the following 32 bit boundary (RFC 3550
          // section 6.5).
          unsigned consumed = (unsigned)((const BYTE *)item - payload);
          if (consumed < size)
            consumed++;           // account for the END octet itself
          chunkOffset = (consumed + 3) & ~3u;
        }

        if (malformed) {
          PTRACE(2, "RTP\tSourceDescription packet truncated");
        }

        // Deliver whatever could be decoded safely.
        OnRxSourceDescription(descriptions);
      }
      else {
        PTRACE(2, "RTP\tSourceDescription packet truncated");
      }
      break;

    case RTP_ControlFrame::e_Goodbye :
      // itemCount SSRC identifiers, optionally followed by a length
      // prefixed reason string.
      if (size >= 4 && size >= itemCount*4) {
        unsigned ssrcBytes = itemCount*4;

        PString str;
        if (size > ssrcBytes) {
          // Never trust the length octet beyond what was actually received.
          unsigned reasonLength = payload[ssrcBytes];
          unsigned available = size - ssrcBytes - 1;
          if (reasonLength > available)
            reasonLength = available;
          str = PString((const char *)(payload+ssrcBytes+1), (PINDEX)reasonLength);
        }

        // One array entry per SSRC (not per byte of SSRC data).
        PDWORDArray sources(itemCount);
        for (PINDEX i = 0; i < (PINDEX)itemCount; i++)
          sources[i] = ((const PUInt32b *)payload)[i];
        OnRxGoodbye(sources, str);
      }
      else {
        PTRACE(2, "RTP\tGoodbye packet truncated");
      }
      break;

    case RTP_ControlFrame::e_ApplDefined :
      // SSRC (4 bytes) and name (4 bytes) are mandatory; anything after
      // that is application dependent data.
      if (size >= 8) {
        PString str((const char *)(payload+4), 4);
        OnRxApplDefined(str, itemCount, *(const PUInt32b *)payload,
                        payload+8, (PINDEX)(size-8));
      }
      else {
        PTRACE(2, "RTP\tApplDefined packet truncated");
      }
      break;

    default :
      PTRACE(2, "RTP\tUnknown control payload type: " << frame.GetPayloadType());
    }
  } while (frame.ReadNextCompound());

  return e_ProcessPacket;
}


// Callback for a decoded RTCP sender report. This implementation only
// traces the sender information and each accompanying reception report at
// level 3, and does nothing at all when PTRACING is disabled.
void RTP_Session::OnRxSenderReport(const SenderReport & PTRACE_sender,
                                   const ReceiverReportArray & PTRACE_reports)
{
#if PTRACING
  PTRACE(3, "RTP\tOnRxSenderReport: " << PTRACE_sender);

  const PINDEX reportCount = PTRACE_reports.GetSize();
  for (PINDEX idx = 0; idx < reportCount; idx++) {
    PTRACE(3, "RTP\tOnRxSenderReport RR: " << PTRACE_reports[idx]);
  }
#endif
}


// Callback for a decoded RTCP receiver report. This implementation only
// traces the reporting source and each reception report at level 3, and
// does nothing at all when PTRACING is disabled.
void RTP_Session::OnRxReceiverReport(DWORD PTRACE_src,
                                     const ReceiverReportArray & PTRACE_reports)
{
#if PTRACING
  PTRACE(3, "RTP\tOnReceiverReport: ssrc=" << PTRACE_src);

  const PINDEX reportCount = PTRACE_reports.GetSize();
  for (PINDEX idx = 0; idx < reportCount; idx++) {
    PTRACE(3, "RTP\tOnReceiverReport RR: " << PTRACE_reports[idx]);
  }
#endif
}


// Callback for decoded RTCP source descriptions. This implementation only
// traces each description at level 3, and does nothing at all when PTRACING
// is disabled.
void RTP_Session::OnRxSourceDescription(const SourceDescriptionArray & PTRACE_description)
{
#if PTRACING
  const PINDEX descriptionCount = PTRACE_description.GetSize();
  for (PINDEX idx = 0; idx < descriptionCount; idx++) {
    PTRACE(3, "RTP\tOnSourceDescription: " << PTRACE_description[idx]);
  }
#endif
}


// Callback for a decoded RTCP goodbye packet. This implementation only
// traces the reason text and the list of departing sources at level 3.
void RTP_Session::OnRxGoodbye(const PDWORDArray & PTRACE_src,
                              const PString & PTRACE_reason)
{
  PTRACE(3, "RTP\tOnGoodbye: \"" << PTRACE_reason
         << "\" srcs=" << PTRACE_src);
}


// Callback for a decoded application defined RTCP packet. This
// implementation only traces the name, subtype, source and data size at
// level 3; the data itself is ignored.
void RTP_Session::OnRxApplDefined(const PString & PTRACE_type,
                                  unsigned PTRACE_subtype,
                                  DWORD PTRACE_src,
                                  const BYTE * /*data*/,
                                  PINDEX PTRACE_size)
{
  PTRACE(3, "RTP\tOnApplDefined: \"" << PTRACE_type
         << "\"-" << PTRACE_subtype
         << " " << PTRACE_src
         << " [" << PTRACE_size << ']');
}


// Write all fields of the reception report to strm as space separated
// "name=value" pairs on a single line.
void RTP_Session::ReceiverReport::PrintOn(ostream & strm) const
{
  strm << "ssrc=" << sourceIdentifier;
  strm << " fraction=" << fractionLost;
  strm << " lost=" << totalLost;
  strm << " last_seq=" << lastSequenceNumber;
  strm << " jitter=" << jitter;
  strm << " lsr=" << lastTimestamp;
  strm << " dlsr=" << delay;
}


// Write all fields of the sender report to strm as space separated
// "name=value" pairs on a single line. The wall clock timestamp is
// formatted with the pattern "yyyy/M/d-h:m:s.uuuu".
void RTP_Session::SenderReport::PrintOn(ostream & strm) const
{
  strm << "ssrc=" << sourceIdentifier;
  strm << " ntp=" << realTimestamp.AsString("yyyy/M/d-h:m:s.uuuu");
  strm << " rtp=" << rtpTimestamp;
  strm << " psent=" << packetsSent;
  strm << " osent=" << octetsSent;
}


// Write the source identifier followed by one line per description item.
// Item types that have an entry in the name table are printed by name, any
// other type is printed as its numeric value.
void RTP_Session::SourceDescription::PrintOn(ostream & strm) const
{
  static const char * const DescriptionNames[RTP_ControlFrame::NumDescriptionTypes] = {
    "END", "CNAME", "NAME", "EMAIL", "PHONE", "LOC", "TOOL", "NOTE", "PRIV"
  };

  strm << "ssrc=" << sourceIdentifier;

  const PINDEX itemCount = items.GetSize();
  for (PINDEX idx = 0; idx < itemCount; idx++) {
    const unsigned typeNum = items.GetKeyAt(idx);

    strm << "\n  item[" << idx << "]: type=";
    if (typeNum >= PARRAYSIZE(DescriptionNames))
      strm << typeNum;
    else
      strm << DescriptionNames[typeNum];

    strm << " data=\"" << items.GetDataAt(idx) << '"';
  }
}


// Return the jitter buffer's "packets too late" counter, or zero when no
// jitter buffer is attached to this session.
DWORD RTP_Session::GetPacketsTooLate() const
{
  if (jitter == NULL)
    return 0;

  return jitter->GetPacketsTooLate();
}


/////////////////////////////////////////////////////////////////////////////

// Construct an empty session manager. The enumeration index is set to
// P_MAX_INDEX, the same value Exit() stores once an enumeration has ended.
RTP_SessionManager::RTP_SessionManager()
{
  enumerationIndex = P_MAX_INDEX;
}


// Copy constructor: copies the session dictionary from sm and starts with no
// enumeration in progress.
// NOTE(review): unlike operator=, sm.mutex is not held while sm.sessions is
// copied - confirm callers never copy a manager that is in concurrent use.
RTP_SessionManager::RTP_SessionManager(const RTP_SessionManager & sm)
: sessions(sm.sessions)
{
  enumerationIndex = P_MAX_INDEX;
}


// Assign the session dictionary of sm to this manager. Both managers'
// mutexes are held while the dictionary is copied.
//
// Returns a reference to this object.
RTP_SessionManager & RTP_SessionManager::operator=(const RTP_SessionManager & sm)
{
  // Self assignment has nothing to copy; returning early also avoids
  // acquiring the very same mutex twice.
  if (this == &sm)
    return *this;

  PWaitAndSignal m1(mutex);
  PWaitAndSignal m2(sm.mutex);
  sessions = sm.sessions;
  return *this;
}


// Look up the session registered under sessionID and, when one exists, add
// a reference to it before handing it back.
//
// Returns the session, or NULL when no session with that ID is registered.
RTP_Session * RTP_SessionManager::UseSession(unsigned sessionID)
{
  PWaitAndSignal m(mutex);

  RTP_Session * found = sessions.GetAt(sessionID);
  if (found != NULL) {
    PTRACE(3, "RTP\tFound existing session " << sessionID);
    found->IncrementReference();
  }

  return found;
}


// Register session in the dictionary under its own session ID. A NULL
// pointer is silently ignored.
void RTP_SessionManager::AddSession(RTP_Session * session)
{
  PWaitAndSignal m(mutex);

  if (session == NULL)
    return;

  PTRACE(2, "RTP\tAdding session " << *session);
  sessions.SetAt(session->GetSessionID(), session);
}


void RTP_SessionManager::ReleaseSession(unsigned sessionID)
{
  PTRACE(2, "RTP\tReleasing session " << sessionID);

  mutex.Wait();

  if (sessions.Contains(sessionID)) {
    if (sessions[sessionID].DecrementReference()) {
      PTRACE(3, "RTP\tDeleting session " << sessionID);
      sessions[sessionID].SetJitterBufferSize(0, 0);
      sessions.SetAt(sessionID, NULL);
    }
  }

  mutex.Signal();
}


// Look up the session registered under sessionID without touching its
// reference count.
//
// Returns the session, or NULL when no session with that ID is registered.
RTP_Session * RTP_SessionManager::GetSession(unsigned sessionID) const
{
  PWaitAndSignal wait(mutex);

  if (sessions.Contains(sessionID)) {
    PTRACE(3, "RTP\tFound existing session " << sessionID);
    return &sessions[sessionID];
  }

  return NULL;
}


// Begin enumerating the sessions and return the first one (or NULL if there
// are none).
//
// The mutex is acquired here and is NOT released on return; it is released
// by Exit(), which Next() calls itself once the enumeration is exhausted.
RTP_Session * RTP_SessionManager::First()
{
  mutex.Wait();

  enumerationIndex = 0;
  return Next();
}


// Return the next session of the current enumeration and advance the
// enumeration index. Once every session has been returned, Exit() is called
// to finish the enumeration and NULL is returned.
RTP_Session * RTP_SessionManager::Next()
{
  if (enumerationIndex >= sessions.GetSize()) {
    Exit();
    return NULL;
  }

  return &sessions.GetDataAt(enumerationIndex++);
}


// Finish an enumeration: reset the enumeration index to P_MAX_INDEX and
// signal the mutex that First() acquired.
void RTP_SessionManager::Exit()
{
  enumerationIndex = P_MAX_INDEX;
  mutex.Signal();
}


/////////////////////////////////////////////////////////////////////////////

// Make sure the socket buffer selected by buftype is at least
// UDP_BUFFER_SIZE. Nothing is changed when the current size can be read and
// is already large enough; a failure to set the option is traced at level 1.
static void SetMinBufferSize(PUDPSocket & sock, int buftype)
{
  int currentSize = 0;
  if (sock.GetOption(buftype, currentSize) && currentSize >= UDP_BUFFER_SIZE)
    return;

  if (sock.SetOption(buftype, UDP_BUFFER_SIZE))
    return;

  PTRACE(1, "RTP_UDP\tSetOption(" << buftype << ") failed: " << sock.GetErrorText());
}


// Construct an unopened UDP based RTP session with the given session ID.
// All remote and temporary addresses/ports are zeroed, both shutdown flags,
// the QoS-applied flag and the encrypted flag are cleared, and no sockets
// exist yet (both pointers are NULL).
RTP_UDP::RTP_UDP(unsigned id)
  : RTP_Session(id),
    remoteAddress(0),
    remoteTransmitAddress(0)
{
  remoteDataPort = 0;
  remoteControlPort = 0;
  tempRemoteDataPort = 0;
  tempRemoteControlPort = 0;
  tempRemoteAddress = 0;
  shutdownRead = FALSE;
  shutdownWrite = FALSE;
  dataSocket = NULL;
  controlSocket = NULL;
  appliedQOS = FALSE;
  encrypted = FALSE;
}


// Destructor: calls Close() once with TRUE and once with FALSE (presumably
// the read and write directions - confirm against Close()), then deletes
// the data and control sockets.
RTP_UDP::~RTP_UDP()
{
  Close(TRUE);
  Close(FALSE);

  delete dataSocket;
  delete controlSocket;
}


// Set addr, together with the remote control/data port, as the send address
// of whichever of the control and data sockets exist, then record that QoS
// has been applied.
void RTP_UDP::ApplyQOS(const PIPSocket::Address & addr)
{
  if (controlSocket != NULL)
    controlSocket->SetSendAddress(addr,GetRemoteControlPort());
  if (dataSocket != NULL)
    dataSocket->SetSendAddress(addr,GetRemoteDataPort());
  appliedQOS = TRUE;
}


// Apply the control and data QoS specifications in rtpqos to the respective
// sockets and clear the "QoS applied" flag.
//
// Returns FALSE immediately (leaving the flag untouched) when rtpqos is
// NULL; otherwise returns the combined result of the ModifyQoSSpec() calls.
// NOTE(review): the result starts out FALSE and the data socket result is
// AND-ed into it, so FALSE is returned whenever there is no control socket,
// even if the data socket succeeds - confirm this is intended.
BOOL RTP_UDP::ModifyQOS(RTP_QOS * rtpqos)
{
  if (rtpqos == NULL)
    return FALSE;

  BOOL modified = FALSE;

  if (controlSocket != NULL)
    modified = controlSocket->ModifyQoSSpec(&(rtpqos->ctrlQoS));

  if (dataSocket != NULL)
    modified &= dataSocket->ModifyQoSSpec(&(rtpqos->dataQoS));

  appliedQOS = FALSE;
  return modified;
}

BOOL RTP_UDP::Open(PIPSocket::Address _localAddress,
                   WORD portBase, WORD portMax,
                   BYTE tos,
                   PSTUNClient * stun,
                   RTP_QOS * rtpQos)
{
  // save local address 
  localAddress = _localAddress;

  localDataPort    = (WORD)(portBase&0xfffe);
  localControlPort = (WORD)(localDataPort + 1);

  delete dataSocket;
  delete controlSocket;
  dataSocket = NULL;
  controlSocket = NULL;

  PQoS * dataQos = NULL;
  PQoS * ctrlQos = NULL;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -