📄 p2ptransportchannel.cc
字号:
SignalConnectionMonitor(this);}// Track the best connection, and let listeners knowvoid P2PTransportChannel::SwitchBestConnectionTo(Connection* conn) { // Note: the previous best_connection_ may be destroyed by now, so don't // use it. best_connection_ = conn; if (best_connection_) { LOG_J(LS_VERBOSE, this) << "New best connection: " << conn->ToString(); SignalRouteChange(this, best_connection_->remote_candidate().address()); }}void P2PTransportChannel::UpdateChannelState() { // The Handle* functions already set the writable state. We'll just double- // check it here. bool writable = (best_connection_ != NULL) && (best_connection_->write_state() == Connection::STATE_WRITABLE); ASSERT(writable == this->writable()); bool readable = false; for (uint32 i = 0; i < connections_.size(); ++i) { if (connections_[i]->read_state() == Connection::STATE_READABLE) readable = true; } set_readable(readable);}// We checked the status of our connections and we had at least one that// was writable, go into the writable state.void P2PTransportChannel::HandleWritable() { // // One or more connections writable! // if (!writable()) { for (uint32 i = 0; i < allocator_sessions_.size(); ++i) { if (allocator_sessions_[i]->IsGettingAllPorts()) { allocator_sessions_[i]->StopGetAllPorts(); } } // Stop further allocations. thread()->Clear(this, MSG_ALLOCATE); } // We're writable, obviously we aren't timed out was_writable_ = true; was_timed_out_ = false; set_writable(true);}// We checked the status of our connections and we didn't have any that// were writable, go into the connecting state (kick off a new allocator// session).void P2PTransportChannel::HandleNotWritable() { // // No connections are writable but not timed out! // if (was_writable_) { // If we were writable, let's kick off an allocator session immediately was_writable_ = false; OnAllocate(); } // We were connecting, obviously not ALL timed out. was_timed_out_ = false; set_writable(false);}// We checked the status of our connections and not only weren't they writable// but they were also timed out, we really need a new allocator.void P2PTransportChannel::HandleAllTimedOut() { // // No connections... all are timed out! // if (!was_timed_out_) { // We weren't timed out before, so kick off an allocator now (we'll still // be in the fully timed out state until the allocator actually gives back // new ports) OnAllocate(); } // NOTE: we start was_timed_out_ in the true state so that we don't get // another allocator created WHILE we are in the process of building up // our first allocator. was_timed_out_ = true; was_writable_ = false; set_writable(false);}// If we have a best connection, return it, otherwise return top one in the// list (later we will mark it best).Connection* P2PTransportChannel::GetBestConnectionOnNetwork( talk_base::Network* network) { // If the best connection is on this network, then it wins. if (best_connection_ && (best_connection_->port()->network() == network)) return best_connection_; // Otherwise, we return the top-most in sorted order. for (uint32 i = 0; i < connections_.size(); ++i) { if (connections_[i]->port()->network() == network) return connections_[i]; } return NULL;}// Handle any queued up requestsvoid P2PTransportChannel::OnMessage(talk_base::Message *pmsg) { if (pmsg->message_id == MSG_SORT) OnSort(); else if (pmsg->message_id == MSG_PING) OnPing(); else if (pmsg->message_id == MSG_ALLOCATE) OnAllocate(); else ASSERT(false);}// Handle queued up sort requestvoid P2PTransportChannel::OnSort() { // Resort the connections based on the new statistics. SortConnections();}// Handle queued up ping requestvoid P2PTransportChannel::OnPing() { // Make sure the states of the connections are up-to-date (since this affects // which ones are pingable). UpdateConnectionStates(); // Find the oldest pingable connection and have it do a ping. Connection* conn = FindNextPingableConnection(); if (conn) conn->Ping(talk_base::Time()); // Post ourselves a message to perform the next ping. uint32 delay = writable() ? WRITABLE_DELAY : UNWRITABLE_DELAY; thread()->PostDelayed(delay, this, MSG_PING);}// Is the connection in a state for us to even consider pinging the other side?bool P2PTransportChannel::IsPingable(Connection* conn) { // An unconnected connection cannot be written to at all, so pinging is out // of the question. if (!conn->connected()) return false; if (writable()) { // If we are writable, then we only want to ping connections that could be // better than this one, i.e., the ones that were not pruned. return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT); } else { // If we are not writable, then we need to try everything that might work. // This includes both connections that do not have write timeout as well as // ones that do not have read timeout. A connection could be readable but // be in write-timeout if we pruned it before. Since the other side is // still pinging it, it very well might still work. return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) || (conn->read_state() != Connection::STATE_READ_TIMEOUT); }}// Returns the next pingable connection to ping. This will be the oldest// pingable connection unless we have a writable connection that is past the// maximum acceptable ping delay.Connection* P2PTransportChannel::FindNextPingableConnection() { uint32 now = talk_base::Time(); if (best_connection_ && (best_connection_->write_state() == Connection::STATE_WRITABLE) && (best_connection_->last_ping_sent() + MAX_CURRENT_WRITABLE_DELAY <= now)) { return best_connection_; } Connection* oldest_conn = NULL; uint32 oldest_time = 0xFFFFFFFF; for (uint32 i = 0; i < connections_.size(); ++i) { if (IsPingable(connections_[i])) { if (connections_[i]->last_ping_sent() < oldest_time) { oldest_time = connections_[i]->last_ping_sent(); oldest_conn = connections_[i]; } } } return oldest_conn;}// return the number of "pingable" connectionsuint32 P2PTransportChannel::NumPingableConnections() { uint32 count = 0; for (uint32 i = 0; i < connections_.size(); ++i) { if (IsPingable(connections_[i])) count += 1; } return count;}// When a connection's state changes, we need to figure out who to use as// the best connection again. It could have become usable, or become unusable.void P2PTransportChannel::OnConnectionStateChange(Connection *connection) { ASSERT(worker_thread_ == talk_base::Thread::Current()); // We have to unroll the stack before doing this because we may be changing // the state of connections while sorting. RequestSort();}// When a connection is removed, edit it out, and then update our best// connection.void P2PTransportChannel::OnConnectionDestroyed(Connection *connection) { ASSERT(worker_thread_ == talk_base::Thread::Current()); // Note: the previous best_connection_ may be destroyed by now, so don't // use it. // Remove this connection from the list. std::vector<Connection*>::iterator iter = find(connections_.begin(), connections_.end(), connection); ASSERT(iter != connections_.end()); connections_.erase(iter); LOG_J(LS_INFO, this) << "Removed connection (" << static_cast<int>(connections_.size()) << " remaining)"; // If this is currently the best connection, then we need to pick a new one. // The call to SortConnections will pick a new one. It looks at the current // best connection in order to avoid switching between fairly similar ones. // Since this connection is no longer an option, we can just set best to NULL // and re-choose a best assuming that there was no best connection. if (best_connection_ == connection) { SwitchBestConnectionTo(NULL); RequestSort(); }}// When a port is destroyed remove it from our list of ports to use for// connection attempts.void P2PTransportChannel::OnPortDestroyed(Port* port) { ASSERT(worker_thread_ == talk_base::Thread::Current()); // Remove this port from the list (if we didn't drop it already). std::vector<Port*>::iterator iter = find(ports_.begin(), ports_.end(), port); if (iter != ports_.end()) ports_.erase(iter); LOG(INFO) << "Removed port from p2p socket: " << static_cast<int>(ports_.size()) << " remaining";}// We data is available, let listeners knowvoid P2PTransportChannel::OnReadPacket(Connection *connection, const char *data, size_t len) { ASSERT(worker_thread_ == talk_base::Thread::Current()); // Let the client know of an incoming packet SignalReadPacket(this, data, len);}// Set options on ourselves is simply setting options on all of our available// port objects.int P2PTransportChannel::SetOption(talk_base::Socket::Option opt, int value) { OptionMap::iterator it = options_.find(opt); if (it == options_.end()) { options_.insert(std::make_pair(opt, value)); } else if (it->second == value) { return 0; } else { it->second = value; } for (uint32 i = 0; i < ports_.size(); ++i) { int val = ports_[i]->SetOption(opt, value); if (val < 0) { // Because this also occurs deferred, probably no point in reporting an // error LOG(WARNING) << "SetOption(" << opt << ", " << value << ") failed: " << ports_[i]->GetError(); } } return 0;}// Time for a new allocator, lets make sure we have a signalling channel// to communicate candidates through first.void P2PTransportChannel::OnAllocate() { waiting_for_signaling_ = true; SignalRequestSignaling();}// When the signalling channel is ready, we can really kick off the allocatorvoid P2PTransportChannel::OnSignalingReady() { if (waiting_for_signaling_) { waiting_for_signaling_ = false; AddAllocatorSession(allocator_->CreateSession(name(), session_type())); thread()->PostDelayed(kAllocatePeriod, this, MSG_ALLOCATE); }}} // namespace cricket
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -