📄 port.cc
字号:
if (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE) { if (const StunErrorCodeAttribute* error_code = stun_msg->GetErrorCode()) { LOG_J(LS_ERROR, this) << "Received STUN binding error:" << " class=" << error_code->error_class() << " number=" << error_code->number() << " reason='" << error_code->reason() << "'"; // Return message to allow error-specific processing } else { LOG_J(LS_ERROR, this) << "Received STUN error response with no error code"; // Drop corrupt message return true; } } } else { LOG_J(LS_ERROR, this) << "Received STUN packet with invalid type (" << stun_msg->type() << ")"; return true; } // Return the STUN message found. msg = stun_msg.release(); return true;}void Port::SendBindingResponse( StunMessage* request, const talk_base::SocketAddress& addr) { assert(request->type() == STUN_BINDING_REQUEST); // Retrieve the username from the request. const StunByteStringAttribute* username_attr = request->GetByteString(STUN_ATTR_USERNAME); assert(username_attr); // Fill in the response message. StunMessage response; response.SetType(STUN_BINDING_RESPONSE); response.SetTransactionID(request->transaction_id()); StunByteStringAttribute* username2_attr = StunAttribute::CreateByteString(STUN_ATTR_USERNAME); username2_attr->CopyBytes(username_attr->bytes(), username_attr->length()); response.AddAttribute(username2_attr); StunAddressAttribute* addr_attr = StunAttribute::CreateAddress(STUN_ATTR_MAPPED_ADDRESS); addr_attr->SetFamily(1); addr_attr->SetPort(addr.port()); addr_attr->SetIP(addr.ip()); response.AddAttribute(addr_attr); // Send the response message. // NOTE: If we wanted to, this is where we would add the HMAC. talk_base::ByteBuffer buf; response.Write(&buf); SendTo(buf.Data(), buf.Length(), addr, false); // The fact that we received a successful request means that this connection // (if one exists) should now be readable. Connection* conn = GetConnection(addr); assert(conn); if (conn) conn->ReceivedPing();}void Port::SendBindingErrorResponse( StunMessage* request, const talk_base::SocketAddress& addr, int error_code, const std::string& reason) { assert(request->type() == STUN_BINDING_REQUEST); // Retrieve the username from the request. If it didn't have one, we // shouldn't be responding at all. const StunByteStringAttribute* username_attr = request->GetByteString(STUN_ATTR_USERNAME); assert(username_attr); // Fill in the response message. StunMessage response; response.SetType(STUN_BINDING_ERROR_RESPONSE); response.SetTransactionID(request->transaction_id()); StunByteStringAttribute* username2_attr = StunAttribute::CreateByteString(STUN_ATTR_USERNAME); username2_attr->CopyBytes(username_attr->bytes(), username_attr->length()); response.AddAttribute(username2_attr); StunErrorCodeAttribute* error_attr = StunAttribute::CreateErrorCode(); error_attr->SetErrorCode(error_code); error_attr->SetReason(reason); response.AddAttribute(error_attr); // Send the response message. // NOTE: If we wanted to, this is where we would add the HMAC. talk_base::ByteBuffer buf; response.Write(&buf); SendTo(buf.Data(), buf.Length(), addr, false);}talk_base::AsyncPacketSocket* Port::CreatePacketSocket(ProtocolType proto) { if (proto == PROTO_UDP) { return new talk_base::AsyncUDPSocket( factory_->CreateAsyncSocket(SOCK_DGRAM)); } else if ((proto == PROTO_TCP) || (proto == PROTO_SSLTCP)) { talk_base::AsyncSocket * socket = factory_->CreateAsyncSocket(SOCK_STREAM); switch (proxy().type) { case talk_base::PROXY_NONE: break; case talk_base::PROXY_SOCKS5: socket = new talk_base::AsyncSocksProxySocket( socket, proxy().address, proxy().username, proxy().password); break; case talk_base::PROXY_HTTPS: default: socket = new talk_base::AsyncHttpsProxySocket( socket, user_agent(), proxy().address, proxy().username, proxy().password); break; } if (proto == PROTO_SSLTCP) { socket = new talk_base::AsyncSSLSocket(socket); } return new talk_base::AsyncTCPSocket(socket); } else { LOG_J(LS_INFO, this) << "Unknown protocol (" << proto << ")"; return 0; }}void Port::OnMessage(talk_base::Message *pmsg) { assert(pmsg->message_id == MSG_CHECKTIMEOUT); assert(lifetime_ == LT_PRETIMEOUT); lifetime_ = LT_POSTTIMEOUT; CheckTimeout();}std::string Port::ToString() const { std::stringstream ss; ss << "Port[" << name_ << ":" << type_ << ":" << network_->ToString() << "]"; return ss.str();}void Port::EnablePortPackets() { enable_port_packets_ = true;}void Port::Start() { // The port sticks around for a minimum lifetime, after which // we destroy it when it drops to zero connections. if (lifetime_ == LT_PRESTART) { lifetime_ = LT_PRETIMEOUT; thread_->PostDelayed(kPortTimeoutDelay, this, MSG_CHECKTIMEOUT); } else { LOG_J(LS_WARNING, this) << "Port restart attempted"; }}void Port::OnConnectionDestroyed(Connection* conn) { AddressMap::iterator iter = connections_.find(conn->remote_candidate().address()); assert(iter != connections_.end()); connections_.erase(iter); CheckTimeout();}void Port::Destroy() { assert(connections_.empty()); LOG_J(LS_INFO, this) << "Port deleted"; SignalDestroyed(this); delete this;}void Port::CheckTimeout() { // If this port has no connections, then there's no reason to keep it around. // When the connections time out (both read and write), they will delete // themselves, so if we have any connections, they are either readable or // writable (or still connecting). if ((lifetime_ == LT_POSTTIMEOUT) && connections_.empty()) { Destroy(); }}// A ConnectionRequest is a simple STUN ping used to determine writability.class ConnectionRequest : public StunRequest {public: ConnectionRequest(Connection* connection) : connection_(connection) { } virtual ~ConnectionRequest() { } virtual void Prepare(StunMessage* request) { request->SetType(STUN_BINDING_REQUEST); StunByteStringAttribute* username_attr = StunAttribute::CreateByteString(STUN_ATTR_USERNAME); std::string username = connection_->remote_candidate().username(); username.append(connection_->port()->username_fragment()); username_attr->CopyBytes(username.c_str(), (uint16)username.size()); request->AddAttribute(username_attr); } virtual void OnResponse(StunMessage* response) { connection_->OnConnectionRequestResponse(response, Elapsed()); } virtual void OnErrorResponse(StunMessage* response) { connection_->OnConnectionRequestErrorResponse(response, Elapsed()); } virtual void OnTimeout() { } virtual int GetNextDelay() { // Each request is sent only once. After a single delay , the request will // time out. timeout_ = true; return CONNECTION_RESPONSE_TIMEOUT; }private: Connection* connection_;};//// Connection//Connection::Connection(Port* port, size_t index, const Candidate& remote_candidate) : requests_(port->thread()), port_(port), local_candidate_index_(index), remote_candidate_(remote_candidate), read_state_(STATE_READ_TIMEOUT), write_state_(STATE_WRITE_CONNECT), connected_(true), pruned_(false), rtt_(0), rtt_data_points_(0), last_ping_sent_(0), last_ping_received_(0), recv_total_bytes_(0), recv_bytes_second_(0), last_recv_bytes_second_time_((uint32)-1), last_recv_bytes_second_calc_(0), sent_total_bytes_(0), sent_bytes_second_(0), last_sent_bytes_second_time_((uint32)-1), last_sent_bytes_second_calc_(0), reported_(false) { // Wire up to send stun packets requests_.SignalSendPacket.connect(this, &Connection::OnSendStunPacket);}Connection::~Connection() {}const Candidate& Connection::local_candidate() const { if (local_candidate_index_ < port_->candidates().size()) return port_->candidates()[local_candidate_index_]; assert(false); static Candidate foo; return foo;}void Connection::set_read_state(ReadState value) { ReadState old_value = read_state_; read_state_ = value; if (value != old_value) { LOG_J(LS_VERBOSE, this) << "set_read_state"; SignalStateChange(this); CheckTimeout(); }}void Connection::set_write_state(WriteState value) { WriteState old_value = write_state_; write_state_ = value; if (value != old_value) { LOG_J(LS_VERBOSE, this) << "set_write_state"; SignalStateChange(this); CheckTimeout(); }}void Connection::set_connected(bool value) { bool old_value = connected_; connected_ = value; if (value != old_value) { LOG_J(LS_VERBOSE, this) << "set_connected"; }}void Connection::OnSendStunPacket( const void* data, size_t size, StunRequest* req) { port_->SendTo(data, size, remote_candidate_.address(), false);}void Connection::OnReadPacket(const char* data, size_t size) { StunMessage* msg; std::string remote_username; const talk_base::SocketAddress& addr(remote_candidate_.address()); if (!port_->GetStunMessage(data, size, addr, msg, remote_username)) { // The packet did not parse as a valid STUN message // If this connection is readable, then pass along the packet. if (read_state_ == STATE_READABLE) { // readable means data from this address is acceptable // Send it on! recv_total_bytes_ += size; SignalReadPacket(this, data, size); // If timed out sending writability checks, start up again if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) set_write_state(STATE_WRITE_CONNECT); } else { // Not readable means the remote address hasn't send a valid
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -