📄 port.cc
字号:
// binding request yet. LOG_J(LS_WARNING, this) << "Received non-STUN packet from an unreadable connection."; } } else if (!msg) { // The packet was STUN, but was already handled } else if (remote_username != remote_candidate_.username()) { // Not destined this connection LOG_J(LS_ERROR, this) << "Received STUN packet on wrong address."; if (msg->type() == STUN_BINDING_REQUEST) { port_->SendBindingErrorResponse(msg, addr, STUN_ERROR_BAD_REQUEST, STUN_ERROR_REASON_BAD_REQUEST); } delete msg; } else { // The packet is STUN, with the current username // If this is a STUN request, then update the readable bit and respond. // If this is a STUN response, then update the writable bit. switch (msg->type()) { case STUN_BINDING_REQUEST: // Incoming, validated stun request from remote peer. // This call will also set the connection readable. port_->SendBindingResponse(msg, addr); // If timed out sending writability checks, start up again if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) set_write_state(STATE_WRITE_CONNECT); break; case STUN_BINDING_RESPONSE: case STUN_BINDING_ERROR_RESPONSE: // Response from remote peer. Does it match request sent? // This doesn't just check, it makes callbacks if transaction // id's match requests_.CheckResponse(msg); break; default: assert(false); break; } // Done with the message; delete delete msg; }}void Connection::Prune() { if (!pruned_) { LOG_J(LS_VERBOSE, this) << "Connection pruned"; pruned_ = true; requests_.Clear(); set_write_state(STATE_WRITE_TIMEOUT); }}void Connection::Destroy() { LOG_J(LS_VERBOSE, this) << "Connection destroyed"; set_read_state(STATE_READ_TIMEOUT); set_write_state(STATE_WRITE_TIMEOUT);}void Connection::UpdateState(uint32 now) { // Check the readable state. // // Since we don't know how many pings the other side has attempted, the best // test we can do is a simple window. if ((read_state_ == STATE_READABLE) && (last_ping_received_ + CONNECTION_READ_TIMEOUT <= now)) { set_read_state(STATE_READ_TIMEOUT); } // Check the writable state. (The order of these checks is important.) // // Before becoming unwritable, we allow for a fixed number of pings to fail // (i.e., receive no response). We also have to give the response time to // get back, so we include a conservative estimate of this. // // Before timing out writability, we give a fixed amount of time. This is to // allow for changes in network conditions. uint32 rtt = ConservativeRTTEstimate(rtt_, rtt_data_points_); if ((write_state_ == STATE_WRITABLE) && TooManyFailures(pings_since_last_response_, CONNECTION_WRITE_CONNECT_FAILURES, rtt, now) && TooLongWithoutResponse(pings_since_last_response_, CONNECTION_WRITE_CONNECT_TIMEOUT, now)) { set_write_state(STATE_WRITE_CONNECT); } if ((write_state_ == STATE_WRITE_CONNECT) && TooLongWithoutResponse(pings_since_last_response_, CONNECTION_WRITE_TIMEOUT, now)) { set_write_state(STATE_WRITE_TIMEOUT); }}void Connection::Ping(uint32 now) { assert(connected_); last_ping_sent_ = now; pings_since_last_response_.push_back(now); requests_.Send(new ConnectionRequest(this));}void Connection::ReceivedPing() { last_ping_received_ = talk_base::Time(); set_read_state(STATE_READABLE);}std::string Connection::ToString() const { const char CONNECT_STATE_ABBREV[2] = { '-', // not connected (false) 'C', // connected (true) }; const char READ_STATE_ABBREV[2] = { 'R', // STATE_READABLE '-', // STATE_READ_TIMEOUT }; const char WRITE_STATE_ABBREV[3] = { 'W', // STATE_WRITABLE 'w', // STATE_WRITE_CONNECT '-', // STATE_WRITE_TIMEOUT }; const Candidate& local = local_candidate(); const Candidate& remote = remote_candidate(); std::stringstream ss; ss << "Conn[" << local.generation() << ":" << local.name() << ":" << local.type() << ":" << local.address().ToString() << "->" << remote.name() << ":" << remote.type() << ":" << remote.address().ToString() << "|" << CONNECT_STATE_ABBREV[connected()] << READ_STATE_ABBREV[read_state()] << WRITE_STATE_ABBREV[write_state()] << "]"; return ss.str();}void Connection::OnConnectionRequestResponse(StunMessage *response, uint32 rtt) { // We have a potentially valid reply from the remote address. // The packet must include a username that ends with our fragment, // since it is a response. // Check exact message type bool valid = true; if (response->type() != STUN_BINDING_RESPONSE) valid = false; // Must have username attribute const StunByteStringAttribute* username_attr = response->GetByteString(STUN_ATTR_USERNAME); if (valid) { if (!username_attr) { LOG_J(LS_ERROR, this) << "Received likely STUN packet with no username"; valid = false; } } // Length must be at least the size of our fragment (actually, should // be bigger since our fragment is at the end!) if (valid) { if (username_attr->length() <= port_->username_fragment().size()) { LOG_J(LS_ERROR, this) << "Received likely STUN packet with short username"; valid = false; } } // Compare our fragment with the end of the username - must be exact match if (valid) { std::string username_fragment = port_->username_fragment(); int offset = (int)(username_attr->length() - username_fragment.size()); if (std::memcmp(username_attr->bytes() + offset, username_fragment.c_str(), username_fragment.size()) != 0) { LOG_J(LS_ERROR, this) << "Received STUN response with bad username"; valid = false; } } if (valid) { // Valid response. If we're not already, become writable. We may be // bringing a pruned connection back to life, but if we don't really want // it, we can always prune it again. set_write_state(STATE_WRITABLE); pings_since_last_response_.clear(); rtt_ = (RTT_RATIO * rtt_ + rtt) / (RTT_RATIO + 1); rtt_data_points_ += 1; }}void Connection::OnConnectionRequestErrorResponse(StunMessage *response, uint32 rtt) { const StunErrorCodeAttribute* error = response->GetErrorCode(); uint32 error_code = error ? error->error_code() : STUN_ERROR_GLOBAL_FAILURE; if ((error_code == STUN_ERROR_UNKNOWN_ATTRIBUTE) || (error_code == STUN_ERROR_SERVER_ERROR) || (error_code == STUN_ERROR_UNAUTHORIZED)) { // Recoverable error, retry } else if (error_code == STUN_ERROR_STALE_CREDENTIALS) { // Race failure, retry } else { // This is not a valid connection. set_write_state(STATE_WRITE_TIMEOUT); }}void Connection::CheckTimeout() { // If both read and write have timed out, then this connection can contribute // no more to p2p socket unless at some later date readability were to come // back. However, we gave readability a long time to timeout, so at this // point, it seems fair to get rid of this connectoin. if ((read_state_ == STATE_READ_TIMEOUT) && (write_state_ == STATE_WRITE_TIMEOUT)) { port_->thread()->Post(this, MSG_DELETE); }}void Connection::OnMessage(talk_base::Message *pmsg) { assert(pmsg->message_id == MSG_DELETE); LOG_J(LS_INFO, this) << "Connection deleted"; SignalDestroyed(this); delete this;}size_t Connection::recv_bytes_second() { // Snapshot bytes / second calculator uint32 current_time = talk_base::Time(); if (last_recv_bytes_second_time_ != (uint32)-1) { int delta = talk_base::TimeDiff(current_time, last_recv_bytes_second_time_); if (delta >= 1000) { int fraction_time = delta % 1000; int seconds_time = delta - fraction_time; int fraction_bytes = (int)(recv_total_bytes_ - last_recv_bytes_second_calc_) * fraction_time / delta; recv_bytes_second_ = (recv_total_bytes_ - last_recv_bytes_second_calc_ - fraction_bytes) * seconds_time / delta; last_recv_bytes_second_time_ = current_time - fraction_time; last_recv_bytes_second_calc_ = recv_total_bytes_ - fraction_bytes; } } if (last_recv_bytes_second_time_ == (uint32)-1) { last_recv_bytes_second_time_ = current_time; last_recv_bytes_second_calc_ = recv_total_bytes_; } return recv_bytes_second_;}size_t Connection::recv_total_bytes() { return recv_total_bytes_;}size_t Connection::sent_bytes_second() { // Snapshot bytes / second calculator uint32 current_time = talk_base::Time(); if (last_sent_bytes_second_time_ != (uint32)-1) { int delta = talk_base::TimeDiff(current_time, last_sent_bytes_second_time_); if (delta >= 1000) { int fraction_time = delta % 1000; int seconds_time = delta - fraction_time; int fraction_bytes = (int)(sent_total_bytes_ - last_sent_bytes_second_calc_) * fraction_time / delta; sent_bytes_second_ = (sent_total_bytes_ - last_sent_bytes_second_calc_ - fraction_bytes) * seconds_time / delta; last_sent_bytes_second_time_ = current_time - fraction_time; last_sent_bytes_second_calc_ = sent_total_bytes_ - fraction_bytes; } } if (last_sent_bytes_second_time_ == (uint32)-1) { last_sent_bytes_second_time_ = current_time; last_sent_bytes_second_calc_ = sent_total_bytes_; } return sent_bytes_second_;}size_t Connection::sent_total_bytes() { return sent_total_bytes_;}ProxyConnection::ProxyConnection(Port* port, size_t index, const Candidate& candidate) : Connection(port, index, candidate), error_(0) {}int ProxyConnection::Send(const void* data, size_t size) { if (write_state() != STATE_WRITABLE) { error_ = EWOULDBLOCK; return SOCKET_ERROR; } int sent = port_->SendTo(data, size, remote_candidate_.address(), true); if (sent <= 0) { assert(sent < 0); error_ = port_->GetError(); } else { sent_total_bytes_ += sent; } return sent;}} // namespace cricket
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -