⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pseudotcpchannel.cc

📁 本人收集整理的一份c/c++跨平台网络库
💻 CC
📖 第 1 页 / 共 2 页
字号:
/* * libjingle * Copyright 2004--2006, Google Inc. * * Redistribution and use in source and binary forms, with or without  * modification, are permitted provided that the following conditions are met: * *  1. Redistributions of source code must retain the above copyright notice,  *     this list of conditions and the following disclaimer. *  2. Redistributions in binary form must reproduce the above copyright notice, *     this list of conditions and the following disclaimer in the documentation *     and/or other materials provided with the distribution. *  3. The name of the author may not be used to endorse or promote products  *     derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */#include "talk/base/basictypes.h"#include "talk/base/common.h"#include "talk/base/logging.h"#include "talk/base/stringutils.h"#include "talk/p2p/base/transportchannel.h"#include "pseudotcpchannel.h"using namespace talk_base;namespace cricket {extern const talk_base::ConstantLabel SESSION_STATES[];// MSG_WK_* - worker thread messages// MSG_ST_* - stream thread messages// MSG_SI_* - signal thread messagesenum {  MSG_WK_CLOCK = 1,  MSG_WK_PURGE,  MSG_ST_EVENT,  MSG_SI_DESTROYCHANNEL,  MSG_SI_DESTROY,};struct EventData : public MessageData {  int event, error;  EventData(int ev, int err = 0) : event(ev), error(err) { }};///////////////////////////////////////////////////////////////////////////////// PseudoTcpChannel::InternalStream///////////////////////////////////////////////////////////////////////////////class PseudoTcpChannel::InternalStream : public StreamInterface {public:  InternalStream(PseudoTcpChannel* parent);  virtual ~InternalStream();  virtual StreamState GetState() const;  virtual StreamResult Read(void* buffer, size_t buffer_len,                                       size_t* read, int* error);  virtual StreamResult Write(const void* data, size_t data_len,                                        size_t* written, int* error);  virtual void Close();  virtual bool GetSize(size_t* size) const { return false; }  virtual bool ReserveSize(size_t size) { return true; }  virtual bool Rewind() { return false; }private:  // parent_ is accessed and modified exclusively on the event thread, to  // avoid thread contention.  This means that the PseudoTcpChannel cannot go  // away until after it receives a Close() from TunnelStream.  PseudoTcpChannel* parent_;};///////////////////////////////////////////////////////////////////////////////// PseudoTcpChannel// Member object lifetime summaries://   session_ - passed in constructor, cleared when channel_ goes away.//   channel_ - created in Connect, destroyed when session_ or tcp_ goes away.//   tcp_ - created in Connect, destroyed when channel_ goes away, or connection//     closes.//   worker_thread_ - created when channel_ is created, purged when channel_ is//     destroyed.//   stream_ - created in GetStream, destroyed by owner at arbitrary time.//   this - created in constructor, destroyed when worker_thread_ and stream_//     are both gone./////////////////////////////////////////////////////////////////////////////////// Signal thread methods//PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session)  : signal_thread_(session->session_manager()->signaling_thread()),    worker_thread_(NULL),    stream_thread_(stream_thread),    session_(session), channel_(NULL), tcp_(NULL), stream_(NULL),    stream_readable_(false), pending_read_event_(false),    ready_to_connect_(false) {  ASSERT(signal_thread_->IsCurrent());}PseudoTcpChannel::~PseudoTcpChannel() {  ASSERT(signal_thread_->IsCurrent());  ASSERT(worker_thread_ == NULL);  ASSERT(session_ == NULL);  ASSERT(channel_ == NULL);  ASSERT(stream_ == NULL);  ASSERT(tcp_ == NULL);}bool PseudoTcpChannel::Connect(const std::string& channel_name) {  ASSERT(signal_thread_->IsCurrent());  CritScope lock(&cs_);  if (channel_)    return false;  ASSERT(session_ != NULL);  worker_thread_ = session_->session_manager()->worker_thread();  channel_ = session_->CreateChannel(channel_name);  channel_name_ = channel_name;  channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1);  channel_->SignalDestroyed.connect(this,    &PseudoTcpChannel::OnChannelDestroyed);  channel_->SignalWritableState.connect(this,    &PseudoTcpChannel::OnChannelWritableState);  channel_->SignalReadPacket.connect(this,    &PseudoTcpChannel::OnChannelRead);  channel_->SignalRouteChange.connect(this,    &PseudoTcpChannel::OnChannelConnectionChanged);  ASSERT(tcp_ == NULL);  tcp_ = new PseudoTcp(this, 0);  if (session_->initiator()) {    // Since we may try several protocols and network adapters that won't work,    // waiting until we get our first writable notification before initiating    // TCP negotiation.    ready_to_connect_ = true;  }  return true;}StreamInterface* PseudoTcpChannel::GetStream() {  ASSERT(signal_thread_->IsCurrent());  CritScope lock(&cs_);  ASSERT(NULL != session_);  if (!stream_)    stream_ = new PseudoTcpChannel::InternalStream(this);  //TODO("should we disallow creation of new stream at some point?");  return stream_;}void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) {  LOG_F(LS_INFO) << "(" << channel->name() << ")";  ASSERT(signal_thread_->IsCurrent());  CritScope lock(&cs_);  ASSERT(channel == channel_);  signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL);  // When MSG_WK_PURGE is received, we know there will be no more messages from  // the worker thread.  worker_thread_->Clear(this, MSG_WK_CLOCK);  worker_thread_->Post(this, MSG_WK_PURGE);  session_ = NULL;  channel_ = NULL;  if ((stream_ != NULL)      && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED)))    stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0));  if (tcp_) {    tcp_->Close(true);    AdjustClock();  }}//// Stream thread methods//StreamState PseudoTcpChannel::GetState() const {  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());  CritScope lock(&cs_);  if (!tcp_)    return SS_OPENING;  switch (tcp_->State()) {    case PseudoTcp::TCP_LISTEN:    case PseudoTcp::TCP_SYN_SENT:    case PseudoTcp::TCP_SYN_RECEIVED:      return SS_OPENING;    case PseudoTcp::TCP_ESTABLISHED:      return SS_OPEN;    case PseudoTcp::TCP_CLOSED:    default:      return SS_CLOSED;  }}StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len,                                    size_t* read, int* error) {  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());  CritScope lock(&cs_);  if (!tcp_)    return SR_BLOCK;  stream_readable_ = false;  int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len);  //LOG_F(LS_VERBOSE) << "Recv returned: " << result;  if (result > 0) {    if (read)      *read = result;    // PseudoTcp doesn't currently support repeated Readable signals.  Simulate    // them here.    stream_readable_ = true;    if (!pending_read_event_) {      pending_read_event_ = true;      stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true);    }    return SR_SUCCESS;  } else if (IsBlockingError(tcp_->GetError())) {    return SR_BLOCK;  } else {    if (error)      *error = tcp_->GetError();    return SR_ERROR;  }  AdjustClock();}StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len,                                     size_t* written, int* error) {  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());  CritScope lock(&cs_);  if (!tcp_)    return SR_BLOCK;  int result = tcp_->Send(static_cast<const char*>(data), data_len);  //LOG_F(LS_VERBOSE) << "Send returned: " << result;  if (result > 0) {    if (written)      *written = result;    return SR_SUCCESS;  } else if (IsBlockingError(tcp_->GetError())) {    return SR_BLOCK;  } else {    if (error)      *error = tcp_->GetError();    return SR_ERROR;  }  AdjustClock();}void PseudoTcpChannel::Close() {  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());  CritScope lock(&cs_);  stream_ = NULL;  // Clear out any pending event notifications  stream_thread_->Clear(this, MSG_ST_EVENT);  if (tcp_) {    tcp_->Close(false);    AdjustClock();  } else {    CheckDestroy();  }}//// Worker thread methods//

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -