📄 p2ptransportchannel.cc
字号:
// is currently available for. See if the remote user name is in the remote // candidate list. If it isn't return error to the stun request. const Candidate *candidate = NULL; std::vector<RemoteCandidate>::iterator it; for (it = remote_candidates_.begin(); it != remote_candidates_.end(); ++it) { if ((*it).username() == remote_username) { candidate = &(*it); break; } } if (candidate == NULL) { // Don't know about this username, the request is bogus // This sometimes happens if a binding response comes in before the ACCEPT // message. It is totally valid; the retry state machine will try again. port->SendBindingErrorResponse(stun_msg, address, STUN_ERROR_STALE_CREDENTIALS, STUN_ERROR_REASON_STALE_CREDENTIALS); delete stun_msg; return; } // Check for connectivity to this address. Create connections // to this address across all local ports. First, add this as a new remote // address Candidate new_remote_candidate = *candidate; new_remote_candidate.set_address(address); //new_remote_candidate.set_protocol(port->protocol()); // This remote username exists. Now create connections using this candidate, // and resort if (CreateConnections(new_remote_candidate, port, true)) { // Send the pinger a successful stun response. port->SendBindingResponse(stun_msg, address); // Update the list of connections since we just added another. We do this // after sending the response since it could (in principle) delete the // connection in question. SortConnections(); } else { // Hopefully this won't occur, because changing a destination address // shouldn't cause a new connection to fail ASSERT(false); port->SendBindingErrorResponse(stun_msg, address, STUN_ERROR_SERVER_ERROR, STUN_ERROR_REASON_SERVER_ERROR); } delete stun_msg;}// We received a candidate from the other side, make connections so we// can try to use these remote candidates with our local candidates. void P2PTransportChannel::OnChannelMessage(const buzz::XmlElement* msg) { ASSERT(worker_thread_ == talk_base::Thread::Current()); Candidate remote_candidate; bool valid = transport_->ParseCandidate(NULL, msg, &remote_candidate); ASSERT(valid); // Create connections to this remote candidate. CreateConnections(remote_candidate, NULL, false); // Resort the connections list, which may have new elements. SortConnections();}// Creates connections from all of the ports that we care about to the given// remote candidate. The return value is true iff we created a connection from// the origin port.bool P2PTransportChannel::CreateConnections(const Candidate &remote_candidate, Port* origin_port, bool readable) { ASSERT(worker_thread_ == talk_base::Thread::Current()); // Add a new connection for this candidate to every port that allows such a // connection (i.e., if they have compatible protocols) and that does not // already have a connection to an equivalent candidate. We must be careful // to make sure that the origin port is included, even if it was pruned, // since that may be the only port that can create this connection. bool created = false; std::vector<Port *>::reverse_iterator it; for (it = ports_.rbegin(); it != ports_.rend(); ++it) { if (CreateConnection(*it, remote_candidate, origin_port, readable)) { if (*it == origin_port) created = true; } } if ((origin_port != NULL) && find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) { if (CreateConnection(origin_port, remote_candidate, origin_port, readable)) created = true; } // Remember this remote candidate so that we can add it to future ports. RememberRemoteCandidate(remote_candidate, origin_port); return created;}// Setup a connection object for the local and remote candidate combination.// And then listen to connection object for changes.bool P2PTransportChannel::CreateConnection(Port* port, const Candidate& remote_candidate, Port* origin_port, bool readable) { // Look for an existing connection with this remote address. If one is not // found, then we can create a new connection for this address. Connection* connection = port->GetConnection(remote_candidate.address()); if (connection != NULL) { // It is not legal to try to change any of the parameters of an existing // connection; however, the other side can send a duplicate candidate. if (!remote_candidate.IsEquivalent(connection->remote_candidate())) { LOG(INFO) << "Attempt to change a remote candidate"; return false; } } else { Port::CandidateOrigin origin = GetOrigin(port, origin_port); connection = port->CreateConnection(remote_candidate, origin); if (!connection) return false; connections_.push_back(connection); connection->SignalReadPacket.connect( this, &P2PTransportChannel::OnReadPacket); connection->SignalStateChange.connect( this, &P2PTransportChannel::OnConnectionStateChange); connection->SignalDestroyed.connect( this, &P2PTransportChannel::OnConnectionDestroyed); } // If we are readable, it is because we are creating this in response to a // ping from the other side. This will cause the state to become readable. if (readable) connection->ReceivedPing(); return true;}// Maintain our remote candidate list, adding this new remote one.void P2PTransportChannel::RememberRemoteCandidate( const Candidate& remote_candidate, Port* origin_port) { // Remove any candidates whose generation is older than this one. The // presence of a new generation indicates that the old ones are not useful. uint32 i = 0; while (i < remote_candidates_.size()) { if (remote_candidates_[i].generation() < remote_candidate.generation()) { LOG(INFO) << "Pruning candidate from old generation: " << remote_candidates_[i].address().ToString(); remote_candidates_.erase(remote_candidates_.begin() + i); } else { i += 1; } } // Make sure this candidate is not a duplicate. for (uint32 i = 0; i < remote_candidates_.size(); ++i) { if (remote_candidates_[i].IsEquivalent(remote_candidate)) { LOG(INFO) << "Duplicate candidate: " << remote_candidate.address().ToString(); return; } } // Try this candidate for all future ports. remote_candidates_.push_back(RemoteCandidate(remote_candidate, origin_port)); // We have some candidates from the other side, we are now serious about // this connection. Let's do the StartGetAllPorts thing. if (!pinging_started_) { pinging_started_ = true; for (size_t i = 0; i < allocator_sessions_.size(); ++i) { if (!allocator_sessions_[i]->IsGettingAllPorts()) allocator_sessions_[i]->StartGetAllPorts(); } }}// Send data to the other side, using our best connectionint P2PTransportChannel::SendPacket(const char *data, size_t len) { // This can get called on any thread that is convenient to write from! if (best_connection_ == NULL) { error_ = EWOULDBLOCK; return SOCKET_ERROR; } int sent = best_connection_->Send(data, len); if (sent <= 0) { ASSERT(sent < 0); error_ = best_connection_->GetError(); } return sent;}// Monitor connection statesvoid P2PTransportChannel::UpdateConnectionStates() { uint32 now = talk_base::Time(); // We need to copy the list of connections since some may delete themselves // when we call UpdateState. for (uint32 i = 0; i < connections_.size(); ++i) connections_[i]->UpdateState(now);}// Prepare for best candidate sortingvoid P2PTransportChannel::RequestSort() { if (!sort_dirty_) { worker_thread_->Post(this, MSG_SORT); sort_dirty_ = true; }}// Sort the available connections to find the best one. We also monitor// the number of available connections and the current state so that we // can possibly kick off more allocators (for more connections).void P2PTransportChannel::SortConnections() { ASSERT(worker_thread_ == talk_base::Thread::Current()); // Make sure the connection states are up-to-date since this affects how they // will be sorted. UpdateConnectionStates(); // Any changes after this point will require a re-sort. sort_dirty_ = false; // Get a list of the networks that we are using. std::set<talk_base::Network*> networks; for (uint32 i = 0; i < connections_.size(); ++i) networks.insert(connections_[i]->port()->network()); // Find the best alternative connection by sorting. It is important to note // that amongst equal preference, writable connections, this will choose the // one whose estimated latency is lowest. So it is the only one that we // need to consider switching to. ConnectionCompare cmp; std::stable_sort(connections_.begin(), connections_.end(), cmp); Connection* top_connection = NULL; if (connections_.size() > 0) top_connection = connections_[0]; // If necessary, switch to the new choice. if (ShouldSwitch(best_connection_, top_connection)) SwitchBestConnectionTo(top_connection); // We can prune any connection for which there is a writable connection on // the same network with better or equal prefences. We leave those with // better preference just in case they become writable later (at which point, // we would prune out the current best connection). We leave connections on // other networks because they may not be using the same resources and they // may represent very distinct paths over which we can switch. std::set<talk_base::Network*>::iterator network; for (network = networks.begin(); network != networks.end(); ++network) { Connection* primier = GetBestConnectionOnNetwork(*network); if (!primier || (primier->write_state() != Connection::STATE_WRITABLE)) continue; for (uint32 i = 0; i < connections_.size(); ++i) { if ((connections_[i] != primier) && (connections_[i]->port()->network() == *network) && (CompareConnectionCandidates(primier, connections_[i]) >= 0)) { connections_[i]->Prune(); } } } // Count the number of connections in the various states. int writable = 0; int write_connect = 0; int write_timeout = 0; for (uint32 i = 0; i < connections_.size(); ++i) { switch (connections_[i]->write_state()) { case Connection::STATE_WRITABLE: ++writable; break; case Connection::STATE_WRITE_CONNECT: ++write_connect; break; case Connection::STATE_WRITE_TIMEOUT: ++write_timeout; break; default: ASSERT(false); } } if (writable > 0) { HandleWritable(); } else if (write_connect > 0) { HandleNotWritable(); } else { HandleAllTimedOut(); } // Update the state of this channel. This method is called whenever the // state of any connection changes, so this is a good place to do this. UpdateChannelState(); // Notify of connection state change
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -