📄 rtp.cxx
字号:
if (rtpQos != NULL) {
dataQos = &(rtpQos->dataQoS);
ctrlQos = &(rtpQos->ctrlQoS);
}
if (stun != NULL) {
if (stun->CreateSocketPair(dataSocket, controlSocket)) {
dataSocket->GetLocalAddress(localAddress, localDataPort);
controlSocket->GetLocalAddress(localAddress, localControlPort);
}
else {
PTRACE(1, "RTP\tSTUN could not create RTP/RTCP socket pair; trying to create RTP socket anyway.");
if (stun->CreateSocket(dataSocket)) {
dataSocket->GetLocalAddress(localAddress, localDataPort);
}
else {
PTRACE(1, "RTP\tSTUN could not create RTP socket either.");
return FALSE;
}
if (stun->CreateSocket(controlSocket)) {
controlSocket->GetLocalAddress(localAddress, localControlPort);
}
}
}
if (dataSocket == NULL || controlSocket == NULL) {
dataSocket = new PUDPSocket(dataQos);
controlSocket = new PUDPSocket(ctrlQos);
while (!dataSocket->Listen(localAddress, 1, localDataPort) ||
!controlSocket->Listen(localAddress, 1, localControlPort)) {
dataSocket->Close();
controlSocket->Close();
if ((localDataPort > portMax) || (localDataPort > 0xfffd))
return FALSE; // If it ever gets to here the OS has some SERIOUS problems!
localDataPort += 2;
localControlPort += 2;
}
}
#ifndef __BEOS__
// Set the IP Type Of Service field for prioritisation of media UDP packets
// through some Cisco routers and Linux boxes
if (!dataSocket->SetOption(IP_TOS, tos, IPPROTO_IP)) {
PTRACE(1, "RTP_UDP\tCould not set TOS field in IP header: " << dataSocket->GetErrorText());
}
// Increase internal buffer size on media UDP sockets
SetMinBufferSize(*dataSocket, SO_RCVBUF);
SetMinBufferSize(*dataSocket, SO_SNDBUF);
SetMinBufferSize(*controlSocket, SO_RCVBUF);
SetMinBufferSize(*controlSocket, SO_SNDBUF);
#endif
shutdownRead = FALSE;
shutdownWrite = FALSE;
if (canonicalName.Find('@') == P_MAX_INDEX)
canonicalName += '@' + GetLocalHostName();
PTRACE(2, "RTP_UDP\tSession " << sessionID << " created: "
<< localAddress << ':' << localDataPort << '-' << localControlPort
<< " ssrc=" << syncSourceOut);
return TRUE;
}
// Clears the shutdown flag for one direction of the session:
// the read flag when `reading` is set, the write flag otherwise.
void RTP_UDP::Reopen(BOOL reading)
{
  if (!reading) {
    shutdownWrite = FALSE;
    return;
  }

  shutdownRead = FALSE;
}
// Flags one direction of the session as shut down.
//
// Write side: just sets shutdownWrite.
// Read side: if not already shut down, resets syncSourceIn, sets shutdownRead
// and, when both sockets exist, sends a single byte from the data socket to
// the control socket's own local address/port.
// NOTE(review): that self-addressed byte presumably wakes a reader blocked
// on the sockets so it can see shutdownRead - confirm against ReadData.
void RTP_UDP::Close(BOOL reading)
{
  if (!reading) {
    PTRACE(3, "RTP_UDP\tSession " << sessionID << ", Shutting down write.");
    shutdownWrite = TRUE;
    return;
  }

  // Read side already shut down: nothing more to do.
  if (shutdownRead)
    return;

  PTRACE(3, "RTP_UDP\tSession " << sessionID << ", Shutting down read.");
  syncSourceIn = 0;
  shutdownRead = TRUE;

  if (dataSocket == NULL || controlSocket == NULL)
    return;

  PIPSocket::Address ownAddress;
  controlSocket->GetLocalAddress(ownAddress);
  if (ownAddress.IsAny()) {
    // Bound to the wildcard address; use the host's own address as target.
    PIPSocket::GetHostAddress(ownAddress);
  }

  // One byte (the terminating NUL of the empty literal) is written.
  dataSocket->WriteTo("", 1, ownAddress, controlSocket->GetPort());
}
// Returns the host name as reported by PIPSocket::GetHostName().
PString RTP_UDP::GetLocalHostName()
{
  PString hostName = PIPSocket::GetHostName();
  return hostName;
}
// Records the remote address and port for either the data or control channel.
//
// address    - remote address to store.
// port       - remote port for the channel selected by isDataPort.
// isDataPort - TRUE when `port` is the data port, FALSE for the control port.
//
// Returns TRUE immediately, changing nothing, when the local and the current
// remote address both equal `address` and `port` equals the matching local
// port. Otherwise stores the new values, re-arms the three "allow change"
// flags, applies QoS if not yet applied, and returns TRUE only when both the
// stored address and `port` are non-zero.
BOOL RTP_UDP::SetRemoteSocketInfo(PIPSocket::Address address, WORD port, BOOL isDataPort)
{
  PTRACE(3, "RTP_UDP\tSetRemoteSocketInfo: session=" << sessionID << ' '
         << (isDataPort ? "data" : "control") << " channel, "
            "new=" << address << ':' << port << ", "
            "local=" << localAddress << ':' << localDataPort << '-' << localControlPort << ", "
            "remote=" << remoteAddress << ':' << remoteDataPort << '-' << remoteControlPort);

  if (localAddress == address &&
      remoteAddress == address &&
      (isDataPort ? localDataPort : localControlPort) == port) {
    return TRUE;
  }

  remoteAddress = address;

  allowSyncSourceInChange = TRUE;
  allowRemoteTransmitAddressChange = TRUE;
  allowSequenceChange = TRUE;

  // The companion port is derived as port+1 (control) or port-1 (data).
  // The flag tested below was set just above, so the companion port is
  // always rewritten here.
  if (!isDataPort) {
    remoteControlPort = port;
    if (remoteDataPort == 0 || allowRemoteTransmitAddressChange) {
      remoteDataPort = static_cast<WORD>(port - 1);
    }
  }
  else {
    remoteDataPort = port;
    if (remoteControlPort == 0 || allowRemoteTransmitAddressChange) {
      remoteControlPort = static_cast<WORD>(port + 1);
    }
  }

  if (!appliedQOS) {
    ApplyQOS(remoteAddress);
  }

  return remoteAddress != 0 && port != 0;
}
// Records a temporary remote address/port pair in the tempRemote* members.
// The three "allow change" flags are re-armed, and the companion port is
// derived as port+1 (control) or port-1 (data).
//
// Returns TRUE only when both the stored temporary address and `port` are
// non-zero.
BOOL RTP_UDP::SetTemporaryRemoteSocketInfo(
  PIPSocket::Address address,   ///< Address of remote
  WORD port,                    ///< Port on remote
  BOOL isDataPort               ///< Flag for data or control channel
)
{
  PTRACE(3, "RTP_UDP\tSetTemporaryRemoteSocketInfo: session=" << sessionID << ' '
         << (isDataPort ? "data" : "control") << " channel, "
            "new=" << address << ':' << port << ", "
            "local=" << localAddress << ':' << localDataPort << '-' << localControlPort << ", "
            "remote=" << tempRemoteAddress << ':' << tempRemoteDataPort << '-' << tempRemoteControlPort);

  tempRemoteAddress = address;

  allowSyncSourceInChange = TRUE;
  allowRemoteTransmitAddressChange = TRUE;
  allowSequenceChange = TRUE;

  // The flag tested below was set just above, so the companion port is
  // always rewritten here.
  if (!isDataPort) {
    tempRemoteControlPort = port;
    if (tempRemoteDataPort == 0 || allowRemoteTransmitAddressChange) {
      tempRemoteDataPort = static_cast<WORD>(port - 1);
    }
  }
  else {
    tempRemoteDataPort = port;
    if (tempRemoteControlPort == 0 || allowRemoteTransmitAddressChange) {
      tempRemoteControlPort = static_cast<WORD>(port + 1);
    }
  }

  return tempRemoteAddress != 0 && port != 0;
}
// Waits on the data and control sockets (with reportTimer as timeout) until a
// data frame is ready to hand to the caller.
//
// Returns TRUE when ReadDataPDU() reports a frame to process and the read
// side has not been shut down in the meantime. Returns FALSE when the read
// side is shut down, either PDU reader or SendReport() asks to abort, the
// select is interrupted, or the select fails.
BOOL RTP_UDP::ReadData(RTP_DataFrame & frame)
{
  for (;;) {
    const int selectStatus = PSocket::Select(*dataSocket, *controlSocket, reportTimer);

    if (shutdownRead) {
      PTRACE(3, "RTP_UDP\tSession " << sessionID << ", Read shutdown.");
      shutdownRead = FALSE;
      return FALSE;
    }

    // Status codes as handled here: -1 reads the data socket, -2 reads the
    // control socket, -3 reads control first and then data.
    const BOOL controlReady = (selectStatus == -2 || selectStatus == -3);
    const BOOL dataReady    = (selectStatus == -1 || selectStatus == -3);

    if (controlReady || dataReady) {
      if (controlReady) {
        if (ReadControlPDU() == e_AbortTransport) {
          return FALSE;
        }
      }

      if (dataReady) {
        const SendReceiveStatus dataStatus = ReadDataPDU(frame);
        if (dataStatus == e_AbortTransport) {
          return FALSE;
        }
        // A processable packet is only delivered if a shutdown was not
        // requested while reading; otherwise it is treated as ignored.
        if (dataStatus == e_ProcessPacket && !shutdownRead) {
          return TRUE;
        }
      }

      continue;
    }

    if (selectStatus == 0) {
      // Timeout expired without socket activity.
      PTRACE(5, "RTP_UDP\tSession " << sessionID << ", check for sending report.");
      if (!SendReport()) {
        return FALSE;
      }
      continue;
    }

    if (selectStatus == PSocket::Interrupted) {
      PTRACE(3, "RTP_UDP\tSession " << sessionID << ", Interrupted.");
      return FALSE;
    }

    PTRACE(1, "RTP_UDP\tSession " << sessionID << ", Select error: "
           << PChannel::GetErrorText((PChannel::Errors)selectStatus));
    return FALSE;
  }
}
RTP_Session::SendReceiveStatus RTP_UDP::ReadDataOrControlPDU(PUDPSocket & socket,
PBYTEArray & frame,
BOOL fromDataChannel)
{
#if PTRACING
const char * channelName = fromDataChannel ? "Data" : "Control";
#endif
PIPSocket::Address addr;
WORD port;
if (socket.ReadFrom(frame.GetPointer(), frame.GetSize(), addr, port)) {
PBYTEArray f(frame.GetSize());
::memcpy( f.GetPointer(), frame.GetPointer(), frame.GetSize() );
if( encrypted )
{
PBYTEArray out;
Encryption::Engine::Decrypt( f, out );
::memcpy( frame.GetPointer(), out.GetPointer(), out.GetSize() );
}else if( ((f[0]>>6)&3) != 2 )
{
/// check if this is hashed
PBYTEArray out;
Encryption::Engine::Decrypt( f, out );
if( ((out[0]>>6)&3) == 2 )
{
encrypted = TRUE;
::memcpy( frame.GetPointer(), out.GetPointer(), out.GetSize() );
}
}
if (ignoreOtherSources) {
// If remote address never set from higher levels, then try and figure
// it out from the first packet received.
if (!remoteAddress.IsValid()) {
remoteAddress = addr;
PTRACE(3, "RTP\tSet remote address from first " << channelName
<< " PDU from " << addr << ':' << port << " enc=" << encrypted );
}
if (fromDataChannel) {
if (remoteDataPort == 0)
remoteDataPort = port;
}
else {
if (remoteControlPort == 0)
remoteControlPort = port;
}
if (!remoteTransmitAddress.IsValid()) {
remoteTransmitAddress = addr;
}
else if (allowRemoteTransmitAddressChange && remoteAddress == addr) {
remoteTransmitAddress = addr;
allowRemoteTransmitAddressChange = FALSE;
}
else if (remoteTransmitAddress != addr && !allowRemoteTransmitAddressChange && !ignoreOtherSources) {
PTRACE(1, "RTP_UDP\tSession " << sessionID << ", "
<< channelName << " PDU from incorrect host, "
" is " << addr << " should be " << remoteTransmitAddress);
return RTP_Session::e_IgnorePacket;
}
}
if (remoteAddress.IsValid() && !appliedQOS)
ApplyQOS(remoteAddress);
return RTP_Session::e_ProcessPacket;
}
switch (socket.GetErrorNumber()) {
case ECONNRESET :
case ECONNREFUSED :
PTRACE(2, "RTP_UDP\tSession " << sessionID << ", "
<< channelName << " port on remote not ready.");
return RTP_Session::e_IgnorePacket;
case EAGAIN :
// Shouldn't happen, but it does.
return RTP_Session::e_IgnorePacket;
default:
PTRACE(1, "RTP_UDP\t" << channelName << " read error ("
<< socket.GetErrorNumber(PChannel::LastReadError) << "): "
<< socket.GetErrorText(PChannel::LastReadError));
return RTP_Session::e_AbortTransport;
}
}
// Reads one PDU from the data socket into `frame`, validates its length
// against the RTP header size, sets the payload size and passes the frame to
// OnReceiveData().
//
// Returns the status from the low-level read when that is not
// e_ProcessPacket, e_IgnorePacket for an undersized packet, and otherwise
// whatever OnReceiveData() returns.
RTP_Session::SendReceiveStatus RTP_UDP::ReadDataPDU(RTP_DataFrame & frame)
{
  const SendReceiveStatus readResult = ReadDataOrControlPDU(*dataSocket, frame, TRUE);
  if (readResult != e_ProcessPacket) {
    return readResult;
  }

  // The PDU must hold at least the minimum header, and the full header as
  // described by the frame itself.
  const PINDEX received = dataSocket->GetLastReadCount();
  if (received >= RTP_DataFrame::MinHeaderSize && received >= frame.GetHeaderSize()) {
    frame.SetPayloadSize(received - frame.GetHeaderSize());
    return OnReceiveData(frame);
  }

  PTRACE(2, "RTP_UDP\tSession " << sessionID
         << ", Received data packet too small: " << received << " bytes");
  return e_IgnorePacket;
}
// Reads one PDU from the control socket into a local 2048-byte control frame,
// checks its length, trims the frame to the received size and passes it to
// OnReceiveControl().
//
// Returns the status from the low-level read when that is not
// e_ProcessPacket, e_IgnorePacket for an undersized packet, and otherwise
// whatever OnReceiveControl() returns.
RTP_Session::SendReceiveStatus RTP_UDP::ReadControlPDU()
{
  RTP_ControlFrame controlFrame(2048);

  const SendReceiveStatus readResult = ReadDataOrControlPDU(*controlSocket, controlFrame, FALSE);
  if (readResult != e_ProcessPacket) {
    return readResult;
  }

  // At least 4 bytes, plus the payload size the frame itself declares.
  const PINDEX received = controlSocket->GetLastReadCount();
  if (received >= 4 && received >= 4 + controlFrame.GetPayloadSize()) {
    controlFrame.SetSize(received);
    return OnReceiveControl(controlFrame);
  }

  PTRACE(2, "RTP_UDP\tSession " << sessionID
         << ", Received control packet too small: " << received << " bytes");
  return e_IgnorePacket;
}
// Sends one RTP data frame to the remote data port.
//
// The destination is remoteAddress/remoteDataPort, falling back to the
// temporary remote address/port when either of those is unset. When the
// session is flagged encrypted and the engine type is XOR, the encrypted
// copy of the frame is sent instead of the frame itself.
//
// Returns FALSE when the write side is shut down, OnSendData() asks to abort,
// or the socket write fails with an error other than ECONNRESET/ECONNREFUSED.
// Returns TRUE otherwise, including when there is no usable destination yet
// or OnSendData() asks to ignore the packet.
BOOL RTP_UDP::WriteData(RTP_DataFrame & frame)
{
  if (shutdownWrite) {
    PTRACE(3, "RTP_UDP\tSession " << sessionID << ", Write shutdown.");
    shutdownWrite = FALSE;
    return FALSE;
  }

  PIPSocket::Address destination = remoteAddress;
  WORD destinationPort = remoteDataPort;
  if (!destination.IsValid() || destinationPort == 0) {
    destination = tempRemoteAddress;
    destinationPort = tempRemoteDataPort;
  }

  // Trying to send a PDU before we are set up!
  if (!destination.IsValid() || destinationPort == 0) {
    return TRUE;
  }

  const SendReceiveStatus sendStatus = OnSendData(frame);
  if (sendStatus == e_IgnorePacket) {
    return TRUE;
  }
  if (sendStatus == e_AbortTransport) {
    return FALSE;
  }

  const PINDEX headerSize = frame.GetHeaderSize();
  const PINDEX payloadSize = frame.GetPayloadSize();
  void * bytesToWrite = frame.GetPointer();

  // Declared at function scope so the encrypted buffer outlives the write.
  PBYTEArray out;
  if (encrypted && Encryption::Engine::m_Type == Encryption::Engine::Type_XOR) {
    Encryption::Engine::Encrypt( frame, out );
    bytesToWrite = out.GetPointer();
  }

  // Retry for as long as the failure is "remote port not ready"; any other
  // error aborts the write.
  for (;;) {
    if (dataSocket->WriteTo( bytesToWrite,
                             headerSize + payloadSize,
                             destination, destinationPort)) {
      return TRUE;
    }

    const int writeError = dataSocket->GetErrorNumber();
    if (writeError != ECONNRESET && writeError != ECONNREFUSED) {
      PTRACE(1, "RTP_UDP\tSession " << sessionID
             << ", Write error on data port ("
             << dataSocket->GetErrorNumber(PChannel::LastWriteError) << "): "
             << dataSocket->GetErrorText(PChannel::LastWriteError));
      return FALSE;
    }

    PTRACE(2, "RTP_UDP\tSession " << sessionID << ", data port on remote not ready.");
  }
}
BOOL RTP_UDP::WriteControl(RTP_ControlFr
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -