📄 rtp.cxx
字号:
syncSourceIn = 0; senderReportInterval = 100; // Number of data packets between tx reports receiverReportInterval = 100; // Number of data packets between rx reports packetsSent = 0; octetsSent = 0; packetsReceived = 0; octetsReceived = 0; packetsLost = 0; packetsOutOfOrder = 0; averageSendTime = 0; maximumSendTime = 0; minimumSendTime = 0; averageReceiveTime = 0; maximumReceiveTime = 0; minimumReceiveTime = 0; senderReportCount = 0; receiverReportCount = 0; averageSendTimeAccum = 0; maximumSendTimeAccum = 0; minimumSendTimeAccum = 0xffffffff; averageReceiveTimeAccum = 0; maximumReceiveTimeAccum = 0; minimumReceiveTimeAccum = 0xffffffff; jitterLevel = 0;}RTP_Session::~RTP_Session(){ delete userData; delete jitter;}void RTP_Session::SetUserData(RTP_UserData * data){ delete userData; userData = data;}void RTP_Session::SetJitterBufferSize(unsigned jitterDelay){ PAssert(jitterDelay > 0, PInvalidParameter); if (jitter != NULL) jitter->SetDelay(jitterDelay); else { SetIgnoreOutOfOrderPackets(FALSE); jitter = new RTP_JitterBuffer(*this, jitterDelay); }}BOOL RTP_Session::ReadBufferedData(DWORD timestamp, RTP_DataFrame & frame){ if (jitter != NULL) return jitter->ReadData(timestamp, frame); else return ReadData(frame);}void RTP_Session::SetSenderReportInterval(unsigned packets){ senderReportInterval = PMAX(packets, 2); senderReportCount = 0; averageSendTimeAccum = 0; maximumSendTimeAccum = 0; minimumSendTimeAccum = 0xffffffff;}void RTP_Session::SetReceiverReportInterval(unsigned packets){ receiverReportInterval = PMAX(packets, 2); receiverReportCount = 0; averageReceiveTimeAccum = 0; maximumReceiveTimeAccum = 0; minimumReceiveTimeAccum = 0xffffffff;}void RTP_Session::SetReceiverReport(RTP_ControlFrame::ReceiverReport & receiver){ receiver.ssrc = syncSourceIn; receiver.fraction = (BYTE)(256*packetsLost/packetsReceived); receiver.SetLostPackets(packetsLost); receiver.last_seq = lastRRSequenceNumber; lastRRSequenceNumber = expectedSequenceNumber; // The following have not been calculated yet. receiver.jitter = 0; receiver.lsr = 0; receiver.dlsr = 0; PTRACE(3, "RTP\tSentReceiverReport:" " ssrc=" << receiver.ssrc << " fraction=" << (unsigned)receiver.fraction << " lost=" << receiver.GetLostPackets() << " last_seq=" << receiver.last_seq << " jitter=" << receiver.jitter << " lsr=" << receiver.lsr << " dlsr=" << receiver.dlsr);}RTP_Session::SendReceiveStatus RTP_Session::OnSendData(RTP_DataFrame & frame){ PTimeInterval tick = PTimer::Tick(); // Timestamp set now frame.SetSequenceNumber(++lastSentSequenceNumber); frame.SetSyncSource(syncSourceOut); if (packetsSent == 0) { // First packet sent PString description = PProcess::Current().GetUserName() + '@' + GetLocalHostName(); PTRACE(2, "RTP\tSending SDES: " << description); RTP_ControlFrame pdu; RTP_ControlFrame::SourceDescription & sdes = pdu.AddSourceDescription(syncSourceOut); pdu.AddSourceDescriptionItem(sdes, RTP_ControlFrame::e_CNAME, description); pdu.AddSourceDescriptionItem(sdes, RTP_ControlFrame::e_TOOL, PProcess::Current().GetName()); if (!WriteControl(pdu)) return e_AbortTransport; } else if (!frame.GetMarker()) { // Only do statistics on subsequent packets DWORD diff = (tick - lastSentPacketTime).GetInterval(); averageSendTimeAccum += diff; if (diff > maximumSendTimeAccum) maximumSendTimeAccum = diff; if (diff < minimumSendTimeAccum) minimumSendTimeAccum = diff; senderReportCount++; } lastSentPacketTime = tick; octetsSent += frame.GetPayloadSize(); packetsSent++; if (senderReportCount < senderReportInterval) return e_ProcessPacket; senderReportCount = 0; averageSendTime = averageSendTimeAccum/senderReportInterval; maximumSendTime = maximumSendTimeAccum; minimumSendTime = minimumSendTimeAccum; averageSendTimeAccum = 0; maximumSendTimeAccum = 0; minimumSendTimeAccum = 0xffffffff; if (userData != NULL) userData->OnTxStatistics(*this); RTP_ControlFrame report; report.SetPayloadType(RTP_ControlFrame::e_SenderReport); report.SetPayloadSize(sizeof(RTP_ControlFrame::SenderReport)); RTP_ControlFrame::SenderReport * sender = (RTP_ControlFrame::SenderReport *)report.GetPayloadPtr(); sender->ssrc = syncSourceOut; PTime now; sender->ntp_sec = now.GetTimeInSeconds()+SecondsFrom1900to1970; // Convert from 1970 to 1900 sender->ntp_frac = now.GetMicrosecond()*4294; // Scale microseconds to "fraction" from 0 to 2^32 sender->rtp_ts = lastSentSequenceNumber; sender->psent = packetsSent; sender->osent = octetsSent; PTRACE(3, "RTP\tSentSenderReport: " " ssrc=" << sender->ssrc << " ntp=" << sender->ntp_sec << '.' << sender->ntp_frac << " rtp=" << sender->rtp_ts << " psent=" << sender->psent << " osent=" << sender->osent); if (syncSourceIn != 0) { report.SetPayloadSize(sizeof(RTP_ControlFrame::SenderReport) + sizeof(RTP_ControlFrame::ReceiverReport)); report.SetCount(1); SetReceiverReport(*(RTP_ControlFrame::ReceiverReport *)&sender[1]); } return WriteControl(report) ? e_ProcessPacket : e_AbortTransport;}RTP_Session::SendReceiveStatus RTP_Session::OnReceiveData(const RTP_DataFrame & frame){ // Check that the PDU is the right version if (frame.GetVersion() != RTP_DataFrame::ProtocolVersion) return e_IgnorePacket; // Non fatal error, just ignore // Check for if a control packet rather than data packet. if (frame.GetPayloadType() > RTP_DataFrame::MaxPayloadType) return e_IgnorePacket; // Non fatal error, just ignore PTimeInterval tick = PTimer::Tick(); // Get timestamp now // Check packet sequence numbers if (expectedSequenceNumber == 0 || frame.GetSyncSource() != syncSourceIn) { expectedSequenceNumber = (WORD)(frame.GetSequenceNumber() + 1); syncSourceIn = frame.GetSyncSource(); PTRACE(3, "RTP\tFirst data:" " ver=" << frame.GetVersion() << " pt=" << frame.GetPayloadType() << " psz=" << frame.GetPayloadSize() << " m=" << frame.GetMarker() << " x=" << frame.GetExtension() << " seq=" << frame.GetSequenceNumber() << " ts=" << frame.GetTimestamp() << " src=" << frame.GetSyncSource() << " ccnt=" << frame.GetContribSrcCount()); } else { WORD sequenceNumber = frame.GetSequenceNumber(); if (sequenceNumber == expectedSequenceNumber) expectedSequenceNumber++; else if (sequenceNumber < expectedSequenceNumber) { PTRACE(3, "RTP\tOut of order packet, received " << sequenceNumber << " expected " << expectedSequenceNumber << " ssrc=" << syncSourceIn); packetsOutOfOrder++; if (ignoreOutOfOrderPackets) return e_IgnorePacket; // Non fatal error, just ignore } else { unsigned dropped = sequenceNumber - expectedSequenceNumber; packetsLost += dropped; PTRACE(3, "RTP\t" << dropped << " packets dropped, ssrc=" << syncSourceIn); expectedSequenceNumber = (WORD)(sequenceNumber + 1); } // Only do statistics on packets after first received in talk burst if (!frame.GetMarker()) { DWORD diff = (tick - lastReceivedPacketTime).GetInterval(); averageReceiveTimeAccum += diff; if (diff > maximumReceiveTimeAccum) maximumReceiveTimeAccum = diff; if (diff < minimumReceiveTimeAccum) minimumReceiveTimeAccum = diff; receiverReportCount++; } } lastReceivedPacketTime = tick; octetsReceived += frame.GetPayloadSize(); packetsReceived++; if (receiverReportCount < receiverReportInterval) return e_ProcessPacket; receiverReportCount = 0; averageReceiveTime = averageReceiveTimeAccum/receiverReportInterval; maximumReceiveTime = maximumReceiveTimeAccum; minimumReceiveTime = minimumReceiveTimeAccum; jitterLevel = maximumReceiveTime - minimumReceiveTime; averageReceiveTimeAccum = 0; maximumReceiveTimeAccum = 0; minimumReceiveTimeAccum = 0xffffffff; if (userData != NULL) userData->OnRxStatistics(*this); // If sending data, RR is sent in an SR if (packetsSent != 0) return e_ProcessPacket; // Send RR as we are not transmitting RTP_ControlFrame report; report.SetPayloadType(RTP_ControlFrame::e_ReceiverReport); report.SetPayloadSize(4+sizeof(RTP_ControlFrame::ReceiverReport)); report.SetCount(1); PUInt32b * payload = (PUInt32b *)report.GetPayloadPtr(); *payload = syncSourceOut; SetReceiverReport(*(RTP_ControlFrame::ReceiverReport *)&payload[1]); return WriteControl(report) ? e_ProcessPacket : e_AbortTransport;}static RTP_Session::ReceiverReportArray BuildReceiverReportArray(const RTP_ControlFrame & frame, PINDEX offset){ RTP_Session::ReceiverReportArray reports; const RTP_ControlFrame::ReceiverReport * rr = (const RTP_ControlFrame::ReceiverReport *)(frame.GetPayloadPtr()+offset); for (PINDEX repIdx = 0; repIdx < (PINDEX)frame.GetCount(); repIdx++) { RTP_Session::ReceiverReport * report = new RTP_Session::ReceiverReport; report->sourceIdentifier = rr->ssrc; report->fractionLost = rr->fraction; report->totalLost = rr->GetLostPackets(); report->lastSequenceNumber = rr->last_seq; report->jitter = rr->jitter; report->lastTimestamp = (PInt64)(DWORD)rr->lsr; report->delay = ((PInt64)rr->dlsr << 16)/1000; reports.SetAt(repIdx, report); rr++; } return reports;}RTP_Session::SendReceiveStatus RTP_Session::OnReceiveControl(const RTP_ControlFrame & frame){ BYTE * payload = frame.GetPayloadPtr(); unsigned size = frame.GetPayloadSize(); switch (frame.GetPayloadType()) { case RTP_ControlFrame::e_SenderReport : if (size < sizeof(RTP_ControlFrame::SenderReport)) PTRACE(2, "RTP\tSenderReport packet truncated"); else { SenderReport sender; const RTP_ControlFrame::SenderReport & sr = *(const RTP_ControlFrame::SenderReport *)payload; sender.sourceIdentifier = sr.ssrc; sender.realTimestamp = PTime(sr.ntp_sec-SecondsFrom1900to1970, sr.ntp_frac/4294); sender.rtpTimestamp = sr.rtp_ts; sender.packetsSent = sr.psent; sender.octetsSent = sr.osent; OnRxSenderReport(sender, BuildReceiverReportArray(frame, sizeof(RTP_ControlFrame::SenderReport))); } break; case RTP_ControlFrame::e_ReceiverReport : if (size < 4) PTRACE(2, "RTP\tReceiverReport packet truncated"); else OnRxReceiverReport(*(const PUInt32b *)payload, BuildReceiverReportArray(frame, sizeof(PUInt32b))); break; case RTP_ControlFrame::e_SourceDescription : if (size < frame.GetCount()*sizeof(RTP_ControlFrame::SourceDescription)) PTRACE(2, "RTP\tSourceDescription packet truncated"); else { SourceDescriptionArray descriptions; const RTP_ControlFrame::SourceDescription * sdes = (const RTP_ControlFrame::SourceDescription *)payload; for (PINDEX srcIdx = 0; srcIdx < (PINDEX)frame.GetCount(); srcIdx++) { descriptions.SetAt(srcIdx, new SourceDescription(sdes->src)); const RTP_ControlFrame::SourceDescription::Item * item = sdes->item; while (item->type != RTP_ControlFrame::e_END) { descriptions[srcIdx].items.SetAt(item->type, PString(item->data, item->length)); item = item->GetNextItem(); } sdes = (const RTP_ControlFrame::SourceDescription *)item->GetNextItem(); } OnRxSourceDescription(descriptions); } break; case RTP_ControlFrame::e_Goodbye : if (size < 4) PTRACE(2, "RTP\tGoodbyte packet truncated"); else { PString str; unsigned count = frame.GetCount()*4; if (size > count) str = PString((const char *)(payload+count+4), *payload); PDWORDArray sources(count); for (PINDEX i = 0; i < (PINDEX)count; i++) sources[i] = ((const PUInt32b *)payload)[i]; OnRxGoodbye(sources, str); } break; case RTP_ControlFrame::e_ApplDefined : if (size < 4) PTRACE(2, "RTP\tApplDefined packet truncated"); else { PString str((const char *)(payload+4), 4); OnRxApplDefined(str, frame.GetCount(), *(const PUInt32b *)payload, payload+8, frame.GetPayloadSize()-8); } break; default : PTRACE(2, "RTP\tUnknown control payload type: " << frame.GetPayloadType()); } return e_ProcessPacket;}void RTP_Session::OnRxSenderReport(const SenderReport & PTRACE_sender, const ReceiverReportArray & PTRACE_reports){#if PTRACING PTRACE(3, "RTP\tOnRxSenderReport: " << PTRACE_sender); for (PINDEX i = 0; i < PTRACE_reports.GetSize(); i++) PTRACE(3, "RTP\tOnRxSenderReport RR: " << PTRACE_reports[i]);#endif}void RTP_Session::OnRxReceiverReport(DWORD PTRACE_src, const ReceiverReportArray & PTRACE_reports){#if PTRACING PTRACE(3, "RTP\tOnReceiverReport: ssrc=" << PTRACE_src); for (PINDEX i = 0; i < PTRACE_reports.GetSize(); i++) PTRACE(3, "RTP\tOnReceiverReport RR: " << PTRACE_reports[i]);#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -