📄 rtp.cxx
字号:
}void RTP_Session::OnRxSourceDescription(const SourceDescriptionArray & PTRACE_description){#if PTRACING for (PINDEX i = 0; i < PTRACE_description.GetSize(); i++) PTRACE(3, "RTP\tOnSourceDescription: " << PTRACE_description[i]);#endif}void RTP_Session::OnRxGoodbye(const PDWORDArray & PTRACE_src, const PString & PTRACE_reason){ PTRACE(3, "RTP\tOnGoodbye: \"" << PTRACE_reason << "\" srcs=" << PTRACE_src);}void RTP_Session::OnRxApplDefined(const PString & PTRACE_type, unsigned PTRACE_subtype, DWORD PTRACE_src, const BYTE * /*data*/, PINDEX PTRACE_size){ PTRACE(3, "RTP\tOnApplDefined: \"" << PTRACE_type << "\"-" << PTRACE_subtype << " " << PTRACE_src << " [" << PTRACE_size << ']');}void RTP_Session::ReceiverReport::PrintOn(ostream & strm) const{ strm << "ssrc=" << sourceIdentifier << " fraction=" << fractionLost << " lost=" << totalLost << " last_seq=" << lastSequenceNumber << " jitter=" << jitter << " lsr=" << lastTimestamp << " dlsr=" << delay;}void RTP_Session::SenderReport::PrintOn(ostream & strm) const{ strm << "ssrc=" << sourceIdentifier << " ntp=" << realTimestamp.AsString("yyyy/M/d-h:m:s.uuuu") << " rtp=" << rtpTimestamp << " psent=" << packetsSent << " osent=" << octetsSent;}void RTP_Session::SourceDescription::PrintOn(ostream & strm) const{ static const char * const DescriptionNames[RTP_ControlFrame::NumDescriptionTypes] = { "END", "CNAME", "NAME", "EMAIL", "PHONE", "LOC", "TOOL", "NOTE", "PRIV" }; strm << "ssrc=" << sourceIdentifier; for (PINDEX i = 0; i < items.GetSize(); i++) { strm << "\n item[" << i << "]: type="; unsigned typeNum = items.GetKeyAt(i); if (typeNum < PARRAYSIZE(DescriptionNames)) strm << DescriptionNames[typeNum]; else strm << typeNum; strm << " data=\"" << items.GetDataAt(i) << '"'; }}DWORD RTP_Session::GetPacketsTooLate() const{ return jitter != NULL ? jitter->GetPacketsTooLate() : 0;}/////////////////////////////////////////////////////////////////////////////RTP_Session * RTP_SessionManager::UseSession(unsigned sessionID){ mutex.Wait(); RTP_Session * session = sessions.GetAt(sessionID); if (session == NULL) return NULL; PTRACE(3, "RTP\tFound existing session " << sessionID); session->IncrementReference(); mutex.Signal(); return session;}void RTP_SessionManager::AddSession(RTP_Session * session){ PAssertNULL(session); PTRACE(2, "RTP\tAdding session " << *session); sessions.SetAt(session->GetSessionID(), session); mutex.Signal();}void RTP_SessionManager::ReleaseSession(unsigned sessionID){ PTRACE(2, "RTP\tReleasing session " << sessionID); mutex.Wait(); if (sessions.Contains(sessionID)) { if (sessions[sessionID].DecrementReference()) { PTRACE(3, "RTP\tDeleting session " << sessionID); sessions.SetAt(sessionID, NULL); } } mutex.Signal();}RTP_Session * RTP_SessionManager::GetSession(unsigned sessionID) const{ PWaitAndSignal wait(((RTP_SessionManager*)this)->mutex); if (!sessions.Contains(sessionID)) return NULL; PTRACE(3, "RTP\tFound existing session " << sessionID); return &sessions[sessionID];}/////////////////////////////////////////////////////////////////////////////static void SetMinBufferSize(PUDPSocket & sock, int buftype){ int sz = 0; if (sock.GetOption(buftype, sz)) { if (sz >= UDP_BUFFER_SIZE) return; } if (!sock.SetOption(buftype, UDP_BUFFER_SIZE)) { PTRACE(1, "RTP_UDP\tSetOption(" << buftype << ") failed: " << sock.GetErrorText()); }}RTP_UDP::RTP_UDP(unsigned id) : RTP_Session(id){ remoteDataPort = 0; remoteControlPort = 0; shutdownRead = FALSE; shutdownWrite = FALSE;}RTP_UDP::~RTP_UDP(){ Close(TRUE); Close(FALSE);}BOOL RTP_UDP::Open(PIPSocket::Address localAddress, WORD portBase, BYTE tos){ WORD nextPort = (WORD)(portBase&0xfffe); while (!dataSocket.Listen(localAddress, 1, nextPort) || !controlSocket.Listen(localAddress, 1, (WORD)(nextPort+1))) { dataSocket.Close(); controlSocket.Close(); if (nextPort > 0xfffd) return FALSE; // If it ever gets to here the OS has some SERIOUS problems! nextPort += 2; }#ifndef __BEOS__ // Set the IP Type Of Service field for prioritisation of media UDP packets // through some Cisco routers and Linux boxes if (!dataSocket.SetOption(IP_TOS, tos, IPPROTO_IP)) { PTRACE(1, "RTP_UDP\tCould not set TOS field in IP header: " << dataSocket.GetErrorText()); } // Increase internal buffer size on media UDP sockets SetMinBufferSize(dataSocket, SO_RCVBUF); SetMinBufferSize(dataSocket, SO_SNDBUF); SetMinBufferSize(controlSocket, SO_RCVBUF); SetMinBufferSize(controlSocket, SO_SNDBUF);#endif shutdownRead = FALSE; shutdownWrite = FALSE; PTRACE(2, "RTP_UDP\tSession " << sessionID << " created: data=" << dataSocket.GetPort() << " control=" << controlSocket.GetPort() << " ssrc=" << syncSourceOut); return TRUE;}void RTP_UDP::Close(BOOL reading){ if (reading) { if (!shutdownRead) { PTRACE(3, "RTP_UDP\tSession " << sessionID << ", Shutting down read."); shutdownRead = TRUE; PIPSocket::Address addr; controlSocket.GetLocalAddress(addr); dataSocket.WriteTo("", 1, addr, controlSocket.GetPort()); } } else { PTRACE(3, "RTP_UDP\tSession " << sessionID << ", Shutting down write."); shutdownWrite = TRUE; }}PString RTP_UDP::GetLocalHostName(){ return dataSocket.GetLocalHostName();}BOOL RTP_UDP::SetRemoteSocketInfo(PIPSocket::Address address, WORD port, BOOL isDataPort){ remoteAddress = address; if (isDataPort) { if (remoteControlPort == 0) remoteControlPort = (WORD)(port + 1); remoteDataPort = port; } else { if (remoteDataPort == 0) remoteDataPort = (WORD)(port - 1); remoteControlPort = port; } return remoteAddress != 0 && port != 0;}BOOL RTP_UDP::ReadData(RTP_DataFrame & frame){ for (;;) { int selectStatus = PSocket::Select(dataSocket, controlSocket); if (shutdownRead) { PTRACE(3, "RTP_UDP\tSession " << sessionID << ", Read shutdown."); shutdownRead = FALSE; return FALSE; } switch (selectStatus) { case -2 : if (ReadControlPDU() == e_AbortTransport) return FALSE; break; case -3 : if (ReadControlPDU() == e_AbortTransport) return FALSE; // Then do -1 case case -1 : switch (ReadDataPDU(frame)) { case e_ProcessPacket : return TRUE; case e_IgnorePacket : break; case e_AbortTransport : return FALSE; } break; case PSocket::Interrupted: PTRACE(3, "RTP_UDP\tSession " << sessionID << ", Interrupted."); return FALSE; default : PTRACE(1, "RTP_UDP\tSession " << sessionID << ", Select error: " << PChannel::GetErrorText((PChannel::Errors)selectStatus)); return FALSE; } }}static RTP_Session::SendReceiveStatus ReadSocket(PUDPSocket & socket, PBYTEArray & frame, const char * PTRACE_name, PIPSocket::Address remoteAddress, WORD PTRACE_port){ PIPSocket::Address addr; WORD port; if (!socket.ReadFrom(frame.GetPointer(), frame.GetSize(), addr, port)) { switch (socket.GetErrorNumber()) { case ECONNRESET : case ECONNREFUSED : PTRACE(2, "RTP_UDP\t" << PTRACE_name << " port on remote not ready."); return RTP_Session::e_IgnorePacket; default: PTRACE(1, "RTP_UDP\t" << PTRACE_name << " read error: " << socket.GetErrorText()); return RTP_Session::e_AbortTransport; } } if (addr != remoteAddress) { PTRACE(1, "RTP_UDP\t" << PTRACE_name << " PDU not from correct host or port: is " << addr << ':' << port << ", should be " << remoteAddress << ':' << PTRACE_port); return RTP_Session::e_IgnorePacket; } return RTP_Session::e_ProcessPacket;}RTP_Session::SendReceiveStatus RTP_UDP::ReadDataPDU(RTP_DataFrame & frame){ RTP_Session::SendReceiveStatus status = ReadSocket(dataSocket, frame, "Data", remoteAddress, remoteDataPort); if (status != e_ProcessPacket) return status; // Check received PDU is big enough PINDEX pduSize = dataSocket.GetLastReadCount(); if (pduSize < RTP_DataFrame::MinHeaderSize || pduSize < frame.GetHeaderSize()) { PTRACE(2, "RTP_UDP\tSession " << sessionID << ", Received data packet too small: " << pduSize << " bytes"); return e_IgnorePacket; } frame.SetPayloadSize(pduSize - frame.GetHeaderSize()); return OnReceiveData(frame);}RTP_Session::SendReceiveStatus RTP_UDP::ReadControlPDU(){ RTP_ControlFrame frame(2048); RTP_Session::SendReceiveStatus status = ReadSocket(controlSocket, frame, "Control", remoteAddress, remoteControlPort); if (status != e_ProcessPacket) return status; PINDEX pduSize = controlSocket.GetLastReadCount(); if (pduSize < 4 || pduSize < 4+frame.GetPayloadSize()) { PTRACE(2, "RTP_UDP\tSession " << sessionID << ", Received control packet too small: " << pduSize << " bytes"); return e_IgnorePacket; } return OnReceiveControl(frame);}BOOL RTP_UDP::WriteData(RTP_DataFrame & frame){ if (shutdownWrite) { PTRACE(3, "RTP_UDP\tSession " << sessionID << ", Write shutdown."); shutdownWrite = FALSE; return FALSE; } switch (OnSendData(frame)) { case e_ProcessPacket : break; case e_IgnorePacket : return TRUE; case e_AbortTransport : return FALSE; } if (dataSocket.WriteTo(frame.GetPointer(), frame.GetHeaderSize()+frame.GetPayloadSize(), remoteAddress, remoteDataPort)) return TRUE; PTRACE(1, "RTP_UDP\tSession " << sessionID << ", Write error on data port: " << dataSocket.GetErrorText()); return FALSE;}BOOL RTP_UDP::WriteControl(RTP_ControlFrame & frame){ if (controlSocket.WriteTo(frame.GetPointer(), frame.GetPayloadSize()+4, remoteAddress, remoteControlPort)) return TRUE; PTRACE(1, "RTP_UDP\tSession " << sessionID << ", Write error on control port: " << controlSocket.GetErrorText()); return FALSE;}/////////////////////////////////////////////////////////////////////////////
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -