⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transport.cc

📁 本人收集整理的一份c/c++跨平台网络库
💻 CC
字号:
/* * libjingle * Copyright 2004--2005, Google Inc. * * Redistribution and use in source and binary forms, with or without  * modification, are permitted provided that the following conditions are met: * *  1. Redistributions of source code must retain the above copyright notice,  *     this list of conditions and the following disclaimer. *  2. Redistributions in binary form must reproduce the above copyright notice, *     this list of conditions and the following disclaimer in the documentation *     and/or other materials provided with the distribution. *  3. The name of the author may not be used to endorse or promote products  *     derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */#include "talk/base/common.h"#include "talk/p2p/base/transport.h"#include "talk/p2p/base/sessionmanager.h"#include "talk/p2p/base/transportchannelimpl.h"#include "talk/p2p/base/constants.h"#include "talk/xmllite/xmlelement.h"#include "talk/xmpp/constants.h"namespace {struct ChannelParams {  std::string name;  std::string session_type;  cricket::TransportChannelImpl* channel;  buzz::XmlElement* elem;  ChannelParams() : channel(NULL), elem(NULL) {}};typedef talk_base::TypedMessageData<ChannelParams*> ChannelMessage;const int MSG_CREATECHANNEL = 1;const int MSG_DESTROYCHANNEL = 2;const int MSG_DESTROYALLCHANNELS = 3;const int MSG_CONNECTCHANNELS = 4;const int MSG_RESETCHANNELS = 5;const int MSG_ONSIGNALINGREADY = 6;const int MSG_FORWARDCHANNELMESSAGE = 7;const int MSG_READSTATE = 8;const int MSG_WRITESTATE = 9;const int MSG_REQUESTSIGNALING = 10;const int MSG_ONCHANNELMESSAGE = 11;const int MSG_CONNECTING = 12;}  // namespacenamespace cricket {Transport::Transport(SessionManager* session_manager, const std::string& name)  : session_manager_(session_manager), name_(name), destroyed_(false),    readable_(false), writable_(false), connect_requested_(false),    allow_local_ips_(false) {  ASSERT(session_manager_->signaling_thread()->IsCurrent());}Transport::~Transport() {  ASSERT(session_manager_->signaling_thread()->IsCurrent());  ASSERT(destroyed_);}TransportChannelImpl* Transport::CreateChannel(const std::string& name, const std::string &session_type) {  ChannelParams params;  params.name = name;  params.session_type = session_type;  ChannelMessage msg(&params);  session_manager_->worker_thread()->Send(this, MSG_CREATECHANNEL, &msg);  return msg.data()->channel;}TransportChannelImpl* Transport::CreateChannel_w(const std::string& name, const std::string &session_type) {  ASSERT(session_manager_->worker_thread()->IsCurrent());  TransportChannelImpl* impl = CreateTransportChannel(name, session_type);  impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);  impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);  impl->SignalRequestSignaling.connect(      this, &Transport::OnChannelRequestSignaling);  impl->SignalChannelMessage.connect(this, &Transport::OnChannelMessage);  talk_base::CritScope cs(&crit_);  ASSERT(channels_.find(name) == channels_.end());  channels_[name] = impl;  destroyed_ = false;  if (connect_requested_) {    impl->Connect();    if (channels_.size() == 1) {      // If this is the first channel, then indicate that we have started      // connecting.      session_manager_->signaling_thread()->Post(this, MSG_CONNECTING, NULL);    }  }  return impl;}TransportChannelImpl* Transport::GetChannel(const std::string& name) {  talk_base::CritScope cs(&crit_);  ChannelMap::iterator iter = channels_.find(name);  return (iter != channels_.end()) ? iter->second : NULL;}bool Transport::HasChannels() {  talk_base::CritScope cs(&crit_);  return !channels_.empty();}void Transport::DestroyChannel(const std::string& name) {  ChannelParams params;  params.name = name;  ChannelMessage msg(&params);  session_manager_->worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg);}void Transport::DestroyChannel_w(const std::string& name) {  ASSERT(session_manager_->worker_thread()->IsCurrent());  TransportChannelImpl* impl = NULL;  {    talk_base::CritScope cs(&crit_);    ChannelMap::iterator iter = channels_.find(name);    ASSERT(iter != channels_.end());    impl = iter->second;    channels_.erase(iter);  }  if (connect_requested_ && channels_.empty()) {    // We're not longer attempting to connect.    session_manager_->signaling_thread()->Post(this, MSG_CONNECTING, NULL);  }  if (impl)    DestroyTransportChannel(impl);}void Transport::ConnectChannels() {  ASSERT(session_manager_->signaling_thread()->IsCurrent());  session_manager_->worker_thread()->Post(this, MSG_CONNECTCHANNELS, NULL);}void Transport::ConnectChannels_w() {  ASSERT(session_manager_->worker_thread()->IsCurrent());  if (connect_requested_)    return;  connect_requested_ = true;  session_manager_->signaling_thread()->Post(this, MSG_ONCHANNELMESSAGE, NULL);  CallChannels_w(&TransportChannelImpl::Connect);  if (!channels_.empty()) {    session_manager_->signaling_thread()->Post(this, MSG_CONNECTING, NULL);  }}void Transport::OnConnecting_s() {  ASSERT(session_manager_->signaling_thread()->IsCurrent());  SignalConnecting(this);} void Transport::DestroyAllChannels() {  ASSERT(session_manager_->signaling_thread()->IsCurrent());  session_manager_->worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL);  destroyed_ = true;}void Transport::DestroyAllChannels_w() {  ASSERT(session_manager_->worker_thread()->IsCurrent());  std::vector<TransportChannelImpl*> impls;  {    talk_base::CritScope cs(&crit_);    for (ChannelMap::iterator iter = channels_.begin();         iter != channels_.end();         ++iter) {      impls.push_back(iter->second);    }    channels_.clear();  }  for (size_t i = 0; i < impls.size(); ++i)    DestroyTransportChannel(impls[i]);}void Transport::ResetChannels() {  ASSERT(session_manager_->signaling_thread()->IsCurrent());  session_manager_->worker_thread()->Post(this, MSG_RESETCHANNELS, NULL);}void Transport::ResetChannels_w() {  ASSERT(session_manager_->worker_thread()->IsCurrent());  // We are no longer attempting to connect  connect_requested_ = false;  // Clear out the old messages, they aren't relevant  talk_base::CritScope cs(&crit_);  for (size_t i=0; i<messages_.size(); ++i) {    delete messages_[i];  }  messages_.clear();  // Reset all of the channels  CallChannels_w(&TransportChannelImpl::Reset);}void Transport::OnSignalingReady() {  ASSERT(session_manager_->signaling_thread()->IsCurrent());  session_manager_->worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);  // Notify the subclass.  OnTransportSignalingReady();}void Transport::CallChannels_w(TransportChannelFunc func) {  ASSERT(session_manager_->worker_thread()->IsCurrent());  talk_base::CritScope cs(&crit_);  for (ChannelMap::iterator iter = channels_.begin();       iter != channels_.end();       ++iter) {    ((iter->second)->*func)();  }}void Transport::ForwardChannelMessage(const std::string& name,                                      buzz::XmlElement* elem) {  ASSERT(session_manager_->signaling_thread()->IsCurrent());  ASSERT(HasChannel(name));  ChannelParams* params = new ChannelParams();  params->name = name;  params->elem = elem;  ChannelMessage* msg = new ChannelMessage(params);  session_manager_->worker_thread()->Post(this, MSG_FORWARDCHANNELMESSAGE, msg);}void Transport::ForwardChannelMessage_w(const std::string& name,                                        buzz::XmlElement* elem) {  ASSERT(session_manager_->worker_thread()->IsCurrent());  ChannelMap::iterator iter = channels_.find(name);  // It's ok for a channel to go away while this message is in transit.  if (iter != channels_.end()) {    iter->second->OnChannelMessage(elem);  }  delete elem;}void Transport::OnChannelReadableState(TransportChannel* channel) {  ASSERT(session_manager_->worker_thread()->IsCurrent());  session_manager_->signaling_thread()->Post(this, MSG_READSTATE, NULL);}void Transport::OnChannelReadableState_s() {  ASSERT(session_manager_->signaling_thread()->IsCurrent());  bool readable = GetTransportState_s(true);  if (readable_ != readable) {    readable_ = readable;    SignalReadableState(this);  }}void Transport::OnChannelWritableState(TransportChannel* channel) {  ASSERT(session_manager_->worker_thread()->IsCurrent());  session_manager_->signaling_thread()->Post(this, MSG_WRITESTATE, NULL);}void Transport::OnChannelWritableState_s() {  ASSERT(session_manager_->signaling_thread()->IsCurrent());  bool writable = GetTransportState_s(false);  if (writable_ != writable) {    writable_ = writable;    SignalWritableState(this);  }}bool Transport::GetTransportState_s(bool read) {  ASSERT(session_manager_->signaling_thread()->IsCurrent());  bool result = false;  talk_base::CritScope cs(&crit_);  for (ChannelMap::iterator iter = channels_.begin();       iter != channels_.end();       ++iter) {    bool b = (read ? iter->second->readable() : iter->second->writable());    result = result || b;  }  return result;}void Transport::OnChannelRequestSignaling() {  ASSERT(session_manager_->worker_thread()->IsCurrent());  session_manager_->signaling_thread()->Post(this, MSG_REQUESTSIGNALING, NULL);}void Transport::OnChannelRequestSignaling_s() {  ASSERT(session_manager_->signaling_thread()->IsCurrent());  SignalRequestSignaling(this);}void Transport::OnChannelMessage(TransportChannelImpl* impl,                                 buzz::XmlElement* elem) {  ASSERT(session_manager_->worker_thread()->IsCurrent());  talk_base::CritScope cs(&crit_);  messages_.push_back(elem);  // We hold any messages until the client lets us connect.  if (connect_requested_) {    session_manager_->signaling_thread()->Post(        this, MSG_ONCHANNELMESSAGE, NULL);  }}void Transport::OnChannelMessage_s() {  ASSERT(session_manager_->signaling_thread()->IsCurrent());  ASSERT(connect_requested_);  std::vector<buzz::XmlElement*> msgs;  {    talk_base::CritScope cs(&crit_);    msgs.swap(messages_);  }  if (!msgs.empty())    OnTransportChannelMessages(msgs);}void Transport::OnTransportChannelMessages(    const std::vector<buzz::XmlElement*>& msgs) {  std::vector<buzz::XmlElement*> elems;  for (size_t i = 0; i < msgs.size(); ++i) {    buzz::XmlElement* elem =        new buzz::XmlElement(buzz::QName(name(), "transport"));    elem->AddElement(msgs[i]);    elems.push_back(elem);  }  SignalTransportMessage(this, elems);}void Transport::OnMessage(talk_base::Message* msg) {  switch (msg->message_id) {  case MSG_CREATECHANNEL:    {      ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();      params->channel = CreateChannel_w(params->name, params->session_type);    }    break;  case MSG_DESTROYCHANNEL:    {      ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();      DestroyChannel_w(params->name);    }    break;  case MSG_CONNECTCHANNELS:    ConnectChannels_w();    break;  case MSG_RESETCHANNELS:    ResetChannels_w();    break;  case MSG_DESTROYALLCHANNELS:    DestroyAllChannels_w();    break;  case MSG_ONSIGNALINGREADY:    CallChannels_w(&TransportChannelImpl::OnSignalingReady);    break;  case MSG_FORWARDCHANNELMESSAGE:    {      ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();      ForwardChannelMessage_w(params->name, params->elem);      delete params;    }    break;  case MSG_CONNECTING:    OnConnecting_s();    break;  case MSG_READSTATE:    OnChannelReadableState_s();    break;  case MSG_WRITESTATE:    OnChannelWritableState_s();    break;  case MSG_REQUESTSIGNALING:    OnChannelRequestSignaling_s();    break;  case MSG_ONCHANNELMESSAGE:    OnChannelMessage_s();    break;  }}bool Transport::BadRequest(const buzz::XmlElement* stanza,                           const std::string& text,                           const buzz::XmlElement* extra_info) {  SignalTransportError(this, stanza, buzz::QN_STANZA_BAD_REQUEST, "modify",                       text, extra_info);  return false;}bool Transport::ParseAddress(const buzz::XmlElement* stanza,                             const buzz::XmlElement* elem,                             talk_base::SocketAddress* address) {  ASSERT(elem->HasAttr(QN_ADDRESS));  ASSERT(elem->HasAttr(QN_PORT));  // Record the parts of the address.  address->SetIP(elem->Attr(QN_ADDRESS));  std::istringstream ist(elem->Attr(QN_PORT));  int port;  ist >> port;  address->SetPort(port);   // No address zero.   if (address->IsAny())    return BadRequest(stanza, "candidate has address of zero", NULL);    // Always disallow addresses that refer to the local host.  if (address->IsLocalIP() && !allow_local_ips_)    return BadRequest(stanza, "candidate has local IP address", NULL);    // Disallow all ports below 1024, except for 80 and 443 on public addresses.  if (port < 1024) {     if ((port != 80) && (port != 443))      return BadRequest(stanza,                        "candidate has port below 1024, but not 80 or 443",                        NULL);    if (address->IsPrivateIP()) {      return BadRequest(stanza, "candidate has port of 80 or 443 with private "                                "IP address", NULL);    }  }  return true;}}  // namespace cricket

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -