📄 pseudotcpchannel.cc
字号:
/* * libjingle * Copyright 2004--2006, Google Inc. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */#include "talk/base/basictypes.h"#include "talk/base/common.h"#include "talk/base/logging.h"#include "talk/base/stringutils.h"#include "talk/p2p/base/transportchannel.h"#include "pseudotcpchannel.h"using namespace talk_base;namespace cricket {extern const talk_base::ConstantLabel SESSION_STATES[];// MSG_WK_* - worker thread messages// MSG_ST_* - stream thread messages// MSG_SI_* - signal thread messagesenum { MSG_WK_CLOCK = 1, MSG_WK_PURGE, MSG_ST_EVENT, MSG_SI_DESTROYCHANNEL, MSG_SI_DESTROY,};struct EventData : public MessageData { int event, error; EventData(int ev, int err = 0) : event(ev), error(err) { }};///////////////////////////////////////////////////////////////////////////////// PseudoTcpChannel::InternalStream///////////////////////////////////////////////////////////////////////////////class PseudoTcpChannel::InternalStream : public StreamInterface {public: InternalStream(PseudoTcpChannel* parent); virtual ~InternalStream(); virtual StreamState GetState() const; virtual StreamResult Read(void* buffer, size_t buffer_len, size_t* read, int* error); virtual StreamResult Write(const void* data, size_t data_len, size_t* written, int* error); virtual void Close(); virtual bool GetSize(size_t* size) const { return false; } virtual bool ReserveSize(size_t size) { return true; } virtual bool Rewind() { return false; }private: // parent_ is accessed and modified exclusively on the event thread, to // avoid thread contention. This means that the PseudoTcpChannel cannot go // away until after it receives a Close() from TunnelStream. PseudoTcpChannel* parent_;};///////////////////////////////////////////////////////////////////////////////// PseudoTcpChannel// Member object lifetime summaries:// session_ - passed in constructor, cleared when channel_ goes away.// channel_ - created in Connect, destroyed when session_ or tcp_ goes away.// tcp_ - created in Connect, destroyed when channel_ goes away, or connection// closes.// worker_thread_ - created when channel_ is created, purged when channel_ is// destroyed.// stream_ - created in GetStream, destroyed by owner at arbitrary time.// this - created in constructor, destroyed when worker_thread_ and stream_// are both gone./////////////////////////////////////////////////////////////////////////////////// Signal thread methods//PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session) : signal_thread_(session->session_manager()->signaling_thread()), worker_thread_(NULL), stream_thread_(stream_thread), session_(session), channel_(NULL), tcp_(NULL), stream_(NULL), stream_readable_(false), pending_read_event_(false), ready_to_connect_(false) { ASSERT(signal_thread_->IsCurrent());}PseudoTcpChannel::~PseudoTcpChannel() { ASSERT(signal_thread_->IsCurrent()); ASSERT(worker_thread_ == NULL); ASSERT(session_ == NULL); ASSERT(channel_ == NULL); ASSERT(stream_ == NULL); ASSERT(tcp_ == NULL);}bool PseudoTcpChannel::Connect(const std::string& channel_name) { ASSERT(signal_thread_->IsCurrent()); CritScope lock(&cs_); if (channel_) return false; ASSERT(session_ != NULL); worker_thread_ = session_->session_manager()->worker_thread(); channel_ = session_->CreateChannel(channel_name); channel_name_ = channel_name; channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1); channel_->SignalDestroyed.connect(this, &PseudoTcpChannel::OnChannelDestroyed); channel_->SignalWritableState.connect(this, &PseudoTcpChannel::OnChannelWritableState); channel_->SignalReadPacket.connect(this, &PseudoTcpChannel::OnChannelRead); channel_->SignalRouteChange.connect(this, &PseudoTcpChannel::OnChannelConnectionChanged); ASSERT(tcp_ == NULL); tcp_ = new PseudoTcp(this, 0); if (session_->initiator()) { // Since we may try several protocols and network adapters that won't work, // waiting until we get our first writable notification before initiating // TCP negotiation. ready_to_connect_ = true; } return true;}StreamInterface* PseudoTcpChannel::GetStream() { ASSERT(signal_thread_->IsCurrent()); CritScope lock(&cs_); ASSERT(NULL != session_); if (!stream_) stream_ = new PseudoTcpChannel::InternalStream(this); //TODO("should we disallow creation of new stream at some point?"); return stream_;}void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) { LOG_F(LS_INFO) << "(" << channel->name() << ")"; ASSERT(signal_thread_->IsCurrent()); CritScope lock(&cs_); ASSERT(channel == channel_); signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL); // When MSG_WK_PURGE is received, we know there will be no more messages from // the worker thread. worker_thread_->Clear(this, MSG_WK_CLOCK); worker_thread_->Post(this, MSG_WK_PURGE); session_ = NULL; channel_ = NULL; if ((stream_ != NULL) && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED))) stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0)); if (tcp_) { tcp_->Close(true); AdjustClock(); }}//// Stream thread methods//StreamState PseudoTcpChannel::GetState() const { ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); CritScope lock(&cs_); if (!tcp_) return SS_OPENING; switch (tcp_->State()) { case PseudoTcp::TCP_LISTEN: case PseudoTcp::TCP_SYN_SENT: case PseudoTcp::TCP_SYN_RECEIVED: return SS_OPENING; case PseudoTcp::TCP_ESTABLISHED: return SS_OPEN; case PseudoTcp::TCP_CLOSED: default: return SS_CLOSED; }}StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len, size_t* read, int* error) { ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); CritScope lock(&cs_); if (!tcp_) return SR_BLOCK; stream_readable_ = false; int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len); //LOG_F(LS_VERBOSE) << "Recv returned: " << result; if (result > 0) { if (read) *read = result; // PseudoTcp doesn't currently support repeated Readable signals. Simulate // them here. stream_readable_ = true; if (!pending_read_event_) { pending_read_event_ = true; stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true); } return SR_SUCCESS; } else if (IsBlockingError(tcp_->GetError())) { return SR_BLOCK; } else { if (error) *error = tcp_->GetError(); return SR_ERROR; } AdjustClock();}StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len, size_t* written, int* error) { ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); CritScope lock(&cs_); if (!tcp_) return SR_BLOCK; int result = tcp_->Send(static_cast<const char*>(data), data_len); //LOG_F(LS_VERBOSE) << "Send returned: " << result; if (result > 0) { if (written) *written = result; return SR_SUCCESS; } else if (IsBlockingError(tcp_->GetError())) { return SR_BLOCK; } else { if (error) *error = tcp_->GetError(); return SR_ERROR; } AdjustClock();}void PseudoTcpChannel::Close() { ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); CritScope lock(&cs_); stream_ = NULL; // Clear out any pending event notifications stream_thread_->Clear(this, MSG_ST_EVENT); if (tcp_) { tcp_->Close(false); AdjustClock(); } else { CheckDestroy(); }}//// Worker thread methods//
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -