📄 p2ptransportchannel.cc
字号:
/* * libjingle * Copyright 2004--2005, Google Inc. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */#if defined(_MSC_VER) && _MSC_VER < 1300#pragma warning(disable:4786)#endif#include <errno.h>#include <iostream>#include "talk/base/common.h" #include "talk/base/logging.h" #include "talk/p2p/base/common.h"#include "talk/p2p/base/p2ptransportchannel.h"namespace {// messages for queuing up work for ourselvesconst uint32 MSG_SORT = 1;const uint32 MSG_PING = 2;const uint32 MSG_ALLOCATE = 3;// When the socket is unwritable, we will use 10 Kbps (ignoring IP+UDP headers)// for pinging. When the socket is writable, we will use only 1 Kbps because// we don't want to degrade the quality on a modem. These numbers should work// well on a 28.8K modem, which is the slowest connection on which the voice// quality is reasonable at all.static const uint32 PING_PACKET_SIZE = 60 * 8;static const uint32 WRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 1000; // 480msstatic const uint32 UNWRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 10000;// 50ms// If there is a current writable connection, then we will also try hard to// make sure it is pinged at this rate.static const uint32 MAX_CURRENT_WRITABLE_DELAY = 900; // 2*WRITABLE_DELAY - bit// The minimum improvement in MOS that justifies a switch.static const double kMinImprovement = 10;// Amount of time that we wait when *losing* writability before we try doing// another allocation.static const int kAllocateDelay = 1 * 1000; // 1 second// We will try creating a new allocator from scratch after a delay of this// length without becoming writable (or timing out).static const int kAllocatePeriod = 20 * 1000; // 20 secondscricket::Port::CandidateOrigin GetOrigin(cricket::Port* port, cricket::Port* origin_port) { if (!origin_port) return cricket::Port::ORIGIN_MESSAGE; else if (port == origin_port) return cricket::Port::ORIGIN_THIS_PORT; else return cricket::Port::ORIGIN_OTHER_PORT;}// Compares two connections based only on static information about them.int CompareConnectionCandidates(cricket::Connection* a, cricket::Connection* b) { // Combine local and remote preferences ASSERT(a->local_candidate().preference() == a->port()->preference()); ASSERT(b->local_candidate().preference() == b->port()->preference()); double a_pref = a->local_candidate().preference() * a->remote_candidate().preference(); double b_pref = b->local_candidate().preference() * b->remote_candidate().preference(); // Now check combined preferences. Lower values get sorted last. if (a_pref > b_pref) return 1; if (a_pref < b_pref) return -1; return 0;}// Compare two connections based on their writability and static preferences.int CompareConnections(cricket::Connection *a, cricket::Connection *b) { // Sort based on write-state. Better states have lower values. if (a->write_state() < b->write_state()) return 1; if (a->write_state() > b->write_state()) return -1; // Compare the candidate information. return CompareConnectionCandidates(a, b);}// Wraps the comparison connection into a less than operator that puts higher// priority writable connections first.class ConnectionCompare {public: bool operator()(const cricket::Connection *ca, const cricket::Connection *cb) { cricket::Connection* a = const_cast<cricket::Connection*>(ca); cricket::Connection* b = const_cast<cricket::Connection*>(cb); // Compare first on writability and static preferences. int cmp = CompareConnections(a, b); if (cmp > 0) return true; if (cmp < 0) return false; // Otherwise, sort based on latency estimate. return a->rtt() < b->rtt(); // Should we bother checking for the last connection that last received // data? It would help rendezvous on the connection that is also receiving // packets. // // TODO: Yes we should definitely do this. The TCP protocol gains // efficiency by being used bidirectionally, as opposed to two separate // unidirectional streams. This test should probably occur before // comparison of local prefs (assuming combined prefs are the same). We // need to be careful though, not to bounce back and forth with both sides // trying to rendevous with the other. }};// Determines whether we should switch between two connections, based first on// static preferences and then (if those are equal) on latency estimates.bool ShouldSwitch(cricket::Connection* a_conn, cricket::Connection* b_conn) { if (a_conn == b_conn) return false; if ((a_conn == NULL) || (b_conn == NULL)) // don't think the latter should happen return true; int prefs_cmp = CompareConnections(a_conn, b_conn); if (prefs_cmp < 0) return true; if (prefs_cmp > 0) return false; return b_conn->rtt() <= a_conn->rtt() + kMinImprovement;}} // unnamed namespacenamespace cricket {P2PTransportChannel::P2PTransportChannel(const std::string &name, const std::string &session_type, P2PTransport* transport, PortAllocator *allocator): TransportChannelImpl(name, session_type), transport_(transport), allocator_(allocator), worker_thread_(talk_base::Thread::Current()), waiting_for_signaling_(false), error_(0), best_connection_(NULL), pinging_started_(false), sort_dirty_(false), was_writable_(false), was_timed_out_(true) {}P2PTransportChannel::~P2PTransportChannel() { ASSERT(worker_thread_ == talk_base::Thread::Current()); for (uint32 i = 0; i < allocator_sessions_.size(); ++i) delete allocator_sessions_[i];}// Add the allocator session to our list so that we know which sessions// are still active.void P2PTransportChannel::AddAllocatorSession(PortAllocatorSession* session) { session->set_generation(static_cast<uint32>(allocator_sessions_.size())); allocator_sessions_.push_back(session); // We now only want to apply new candidates that we receive to the ports // created by this new session because these are replacing those of the // previous sessions. ports_.clear(); session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady); session->SignalCandidatesReady.connect( this, &P2PTransportChannel::OnCandidatesReady); session->GetInitialPorts(); if (pinging_started_) session->StartGetAllPorts();}// Go into the state of processing candidates, and running in generalvoid P2PTransportChannel::Connect() { ASSERT(worker_thread_ == talk_base::Thread::Current()); // Kick off an allocator session OnAllocate(); // Start pinging as the ports come in. thread()->Post(this, MSG_PING);}// Reset the socket, clear up any previous allocations and start overvoid P2PTransportChannel::Reset() { ASSERT(worker_thread_ == talk_base::Thread::Current()); // Get rid of all the old allocators. This should clean up everything. for (uint32 i = 0; i < allocator_sessions_.size(); ++i) delete allocator_sessions_[i]; allocator_sessions_.clear(); ports_.clear(); connections_.clear(); best_connection_ = NULL; // Forget about all of the candidates we got before. remote_candidates_.clear(); // Revert to the initial state. set_readable(false); set_writable(false); // Reinitialize the rest of our state. waiting_for_signaling_ = false; pinging_started_ = false; sort_dirty_ = false; was_writable_ = false; was_timed_out_ = true; // If we allocated before, start a new one now. if (transport_->connect_requested()) OnAllocate(); // Start pinging as the ports come in. thread()->Clear(this); thread()->Post(this, MSG_PING);}// A new port is available, attempt to make connections for itvoid P2PTransportChannel::OnPortReady(PortAllocatorSession *session, Port* port) { ASSERT(worker_thread_ == talk_base::Thread::Current()); // Set in-effect options on the new port for (OptionMap::const_iterator it = options_.begin(); it != options_.end(); ++it) { int val = port->SetOption(it->first, it->second); if (val < 0) { LOG_J(LS_WARNING, port) << "SetOption(" << it->first << ", " << it->second << ") failed: " << port->GetError(); } } // Remember the ports and candidates, and signal that candidates are ready. // The session will handle this, and send an initiate/accept/modify message // if one is pending. ports_.push_back(port); port->SignalUnknownAddress.connect( this, &P2PTransportChannel::OnUnknownAddress); port->SignalDestroyed.connect(this, &P2PTransportChannel::OnPortDestroyed); // Attempt to create a connection from this new port to all of the remote // candidates that we were given so far. std::vector<RemoteCandidate>::iterator iter; for (iter = remote_candidates_.begin(); iter != remote_candidates_.end(); ++iter) CreateConnection(port, *iter, iter->origin_port(), false); SortConnections();}// A new candidate is available, let listeners knowvoid P2PTransportChannel::OnCandidatesReady( PortAllocatorSession *session, const std::vector<Candidate>& candidates) { for (size_t i = 0; i < candidates.size(); ++i) { buzz::XmlElement* msg = transport_->TranslateCandidate(candidates[i]); SignalChannelMessage(this, msg); }}// Handle stun packets void P2PTransportChannel::OnUnknownAddress( Port *port, const talk_base::SocketAddress &address, StunMessage *stun_msg, const std::string &remote_username) { ASSERT(worker_thread_ == talk_base::Thread::Current()); // Port has received a valid stun packet from an address that no Connection
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -