📄 relayport.cc
字号:
} options_.push_back(OptionValue(opt, value)); return result;}int RelayPort::GetError() { return error_;}void RelayPort::OnReadPacket( const char* data, size_t size, const talk_base::SocketAddress& remote_addr) { if (Connection* conn = GetConnection(remote_addr)) { conn->OnReadPacket(data, size); } else { Port::OnReadPacket(data, size, remote_addr); }}void RelayPort::DisposeSocket(talk_base::AsyncPacketSocket * socket) { thread_->Dispose(socket);}RelayEntry::RelayEntry(RelayPort* port, const talk_base::SocketAddress& ext_addr, const talk_base::SocketAddress& local_addr) : port_(port), ext_addr_(ext_addr), local_addr_(local_addr), server_index_(0), socket_(0), connected_(false), locked_(false), requests_(port->thread()) { requests_.SignalSendPacket.connect(this, &RelayEntry::OnSendPacket);}RelayEntry::~RelayEntry() { delete socket_;}void RelayEntry::Connect() { assert(socket_ == 0); const ProtocolAddress * ra = port()->ServerAddress(server_index_); if (!ra) { LOG(INFO) << "Out of relay server connections"; return; } LOG(INFO) << "Connecting to relay via " << ProtoToString(ra->proto) << " @ " << ra->address.ToString(); socket_ = port_->CreatePacketSocket(ra->proto); assert(socket_ != 0); socket_->SignalReadPacket.connect(this, &RelayEntry::OnReadPacket); if (socket_->Bind(local_addr_) < 0) LOG(INFO) << "bind: " << std::strerror(socket_->GetError()); for (unsigned i = 0; i < port_->options().size(); ++i) socket_->SetOption(port_->options()[i].first, port_->options()[i].second); if ((ra->proto == PROTO_TCP) || (ra->proto == PROTO_SSLTCP)) { talk_base::AsyncTCPSocket * tcp = static_cast<talk_base::AsyncTCPSocket *>(socket_); tcp->SignalClose.connect(this, &RelayEntry::OnSocketClose); tcp->SignalConnect.connect(this, &RelayEntry::OnSocketConnect); tcp->Connect(ra->address); } else { requests_.Send(new AllocateRequest(this)); }}void RelayEntry::OnConnect(const talk_base::SocketAddress& mapped_addr) { ProtocolType proto = PROTO_UDP; LOG(INFO) << "Relay allocate succeeded: " << ProtoToString(proto) << " @ " << mapped_addr.ToString(); connected_ = true; port_->AddExternalAddress(ProtocolAddress(mapped_addr, proto)); port_->SetReady();}int RelayEntry::SendTo(const void* data, size_t size, const talk_base::SocketAddress& addr) { // If this connection is locked to the address given, then we can send the // packet with no wrapper. if (locked_ && (ext_addr_ == addr)) return SendPacket(data, size); // Otherwise, we must wrap the given data in a STUN SEND request so that we // can communicate the destination address to the server. // // Note that we do not use a StunRequest here. This is because there is // likely no reason to resend this packet. If it is late, we just drop it. // The next send to this address will try again. StunMessage request; request.SetType(STUN_SEND_REQUEST); request.SetTransactionID(CreateRandomString(16)); StunByteStringAttribute* magic_cookie_attr = StunAttribute::CreateByteString(STUN_ATTR_MAGIC_COOKIE); magic_cookie_attr->CopyBytes(port_->magic_cookie().c_str(), (uint16)port_->magic_cookie().size()); request.AddAttribute(magic_cookie_attr); StunByteStringAttribute* username_attr = StunAttribute::CreateByteString(STUN_ATTR_USERNAME); username_attr->CopyBytes(port_->username_fragment().c_str(), (uint16)port_->username_fragment().size()); request.AddAttribute(username_attr); StunAddressAttribute* addr_attr = StunAttribute::CreateAddress(STUN_ATTR_DESTINATION_ADDRESS); addr_attr->SetFamily(1); addr_attr->SetIP(addr.ip()); addr_attr->SetPort(addr.port()); request.AddAttribute(addr_attr); // Attempt to lock if (ext_addr_ == addr) { StunUInt32Attribute* options_attr = StunAttribute::CreateUInt32(STUN_ATTR_OPTIONS); options_attr->SetValue(0x1); request.AddAttribute(options_attr); } StunByteStringAttribute* data_attr = StunAttribute::CreateByteString(STUN_ATTR_DATA); data_attr->CopyBytes(data, (uint16)size); request.AddAttribute(data_attr); // TODO: compute the HMAC. talk_base::ByteBuffer buf; request.Write(&buf); return SendPacket(buf.Data(), buf.Length());}void RelayEntry::ScheduleKeepAlive() { requests_.SendDelayed(new AllocateRequest(this), KEEPALIVE_DELAY);}void RelayEntry::HandleConnectFailure() { //if (GetMillisecondCount() - start_time_ > RETRY_TIMEOUT) // return; //ScheduleKeepAlive(); connected_ = false; port()->DisposeSocket(socket_); socket_ = 0; requests_.Clear(); server_index_ += 1; Connect();}void RelayEntry::OnSocketConnect(talk_base::AsyncTCPSocket* socket) { assert(socket == socket_); LOG(INFO) << "relay tcp connected to " << socket->GetRemoteAddress().ToString(); requests_.Send(new AllocateRequest(this));}void RelayEntry::OnSocketClose(talk_base::AsyncTCPSocket* socket, int error) { assert(socket == socket_); PLOG(LERROR, error) << "relay tcp connect failed"; HandleConnectFailure();}void RelayEntry::OnReadPacket(const char* data, size_t size, const talk_base::SocketAddress& remote_addr, talk_base::AsyncPacketSocket* socket) { assert(socket == socket_); //assert(remote_addr == port_->server_addr()); TODO: are we worried about this? // If the magic cookie is not present, then this is an unwrapped packet sent // by the server, The actual remote address is the one we recorded. if (!port_->HasMagicCookie(data, size)) { if (locked_) { port_->OnReadPacket(data, size, ext_addr_); } else { LOG(WARNING) << "Dropping packet: entry not locked"; } return; } talk_base::ByteBuffer buf(data, size); StunMessage msg; if (!msg.Read(&buf)) { LOG(INFO) << "Incoming packet was not STUN"; return; } // The incoming packet should be a STUN ALLOCATE response, SEND response, or // DATA indication. if (requests_.CheckResponse(&msg)) { return; } else if (msg.type() == STUN_SEND_RESPONSE) { if (const StunUInt32Attribute* options_attr = msg.GetUInt32(STUN_ATTR_OPTIONS)) { if (options_attr->value() & 0x1) { locked_ = true; } } return; } else if (msg.type() != STUN_DATA_INDICATION) { LOG(INFO) << "Received BAD stun type from server: " << msg.type() ; return; } // This must be a data indication. const StunAddressAttribute* addr_attr = msg.GetAddress(STUN_ATTR_SOURCE_ADDRESS2); if (!addr_attr) { LOG(INFO) << "Data indication has no source address"; return; } else if (addr_attr->family() != 1) { LOG(INFO) << "Source address has bad family"; return; } talk_base::SocketAddress remote_addr2(addr_attr->ip(), addr_attr->port()); const StunByteStringAttribute* data_attr = msg.GetByteString(STUN_ATTR_DATA); if (!data_attr) { LOG(INFO) << "Data indication has no data"; return; } // Process the actual data and remote address in the normal manner. port_->OnReadPacket(data_attr->bytes(), data_attr->length(), remote_addr2);}void RelayEntry::OnSendPacket(const void* data, size_t size, StunRequest* req) { SendPacket(data, size);}int RelayEntry::SendPacket(const void* data, size_t size) { const ProtocolAddress * ra = port_->ServerAddress(server_index_); if (!ra) { if (socket_) socket_->SetError(ENOTCONN); return SOCKET_ERROR; } int sent = socket_->SendTo(data, size, ra->address); if (sent <= 0) { LOG(LS_VERBOSE) << "sendto: " << std::strerror(socket_->GetError()); assert(sent < 0); } return sent;}AllocateRequest::AllocateRequest(RelayEntry* entry) : entry_(entry) { start_time_ = talk_base::GetMillisecondCount();}void AllocateRequest::Prepare(StunMessage* request) { request->SetType(STUN_ALLOCATE_REQUEST); StunByteStringAttribute* magic_cookie_attr = StunAttribute::CreateByteString(STUN_ATTR_MAGIC_COOKIE); magic_cookie_attr->CopyBytes( entry_->port()->magic_cookie().c_str(), (uint16)entry_->port()->magic_cookie().size()); request->AddAttribute(magic_cookie_attr); StunByteStringAttribute* username_attr = StunAttribute::CreateByteString(STUN_ATTR_USERNAME); username_attr->CopyBytes( entry_->port()->username_fragment().c_str(), (uint16)entry_->port()->username_fragment().size()); request->AddAttribute(username_attr);}int AllocateRequest::GetNextDelay() { int delay = 100 * talk_base::_max(1 << count_, 2); count_ += 1; if (count_ == 5) timeout_ = true; return delay;}void AllocateRequest::OnResponse(StunMessage* response) { const StunAddressAttribute* addr_attr = response->GetAddress(STUN_ATTR_MAPPED_ADDRESS); if (!addr_attr) { LOG(INFO) << "Allocate response missing mapped address."; } else if (addr_attr->family() != 1) { LOG(INFO) << "Mapped address has bad family"; } else { talk_base::SocketAddress addr(addr_attr->ip(), addr_attr->port()); entry_->OnConnect(addr); } // We will do a keep-alive regardless of whether this request suceeds. // This should have almost no impact on network usage. entry_->ScheduleKeepAlive();}void AllocateRequest::OnErrorResponse(StunMessage* response) { const StunErrorCodeAttribute* attr = response->GetErrorCode(); if (!attr) { LOG(INFO) << "Bad allocate response error code"; } else { LOG(INFO) << "Allocate error response:" << " code=" << static_cast<int>(attr->error_code()) << " reason='" << attr->reason() << "'"; } if (talk_base::GetMillisecondCount() - start_time_ <= RETRY_TIMEOUT) entry_->ScheduleKeepAlive();}void AllocateRequest::OnTimeout() { LOG(INFO) << "Allocate request timed out"; entry_->HandleConnectFailure();}} // namespace cricket
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -