⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pseudotcpchannel.cc

📁 本人收集整理的一份c/c++跨平台网络库
💻 CC
📖 第 1 页 / 共 2 页
字号:
void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) {  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";  ASSERT(worker_thread_->IsCurrent());  CritScope lock(&cs_);  if (!channel_) {    LOG_F(LS_WARNING) << "NULL channel";    return;  }  ASSERT(channel == channel_);  if (!tcp_) {    LOG_F(LS_WARNING) << "NULL tcp";    return;  }  if (!ready_to_connect_ || !channel->writable())    return;  ready_to_connect_ = false;  tcp_->Connect();  AdjustClock();}void PseudoTcpChannel::OnChannelRead(TransportChannel* channel,                                     const char* data, size_t size) {  //LOG_F(LS_VERBOSE) << "(" << size << ")";  ASSERT(worker_thread_->IsCurrent());  CritScope lock(&cs_);  if (!channel_) {    LOG_F(LS_WARNING) << "NULL channel";    return;  }  ASSERT(channel == channel_);  if (!tcp_) {    LOG_F(LS_WARNING) << "NULL tcp";    return;  }  tcp_->NotifyPacket(data, size);  AdjustClock();}void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel,                                                  const SocketAddress& addr) {  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";  ASSERT(worker_thread_->IsCurrent());  CritScope lock(&cs_);  if (!channel_) {    LOG_F(LS_WARNING) << "NULL channel";    return;  }  ASSERT(channel == channel_);  if (!tcp_) {    LOG_F(LS_WARNING) << "NULL tcp";    return;  }  scoped_ptr<Socket> mtu_socket(    worker_thread_->socketserver()           ->CreateSocket(SOCK_DGRAM));  uint16 mtu = 65535;  if (mtu_socket->Connect(addr) < 0) {    LOG_F(LS_ERROR) << "Socket::Connect: " << mtu_socket->GetError();  } else if (mtu_socket->EstimateMTU(&mtu) < 0) {    LOG_F(LS_ERROR) << "Socket::EstimateMTU: " << mtu_socket->GetError();  } else {    tcp_->NotifyMTU(mtu);    AdjustClock();  }}void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) {  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";  ASSERT(cs_.CurrentThreadIsOwner());  ASSERT(worker_thread_->IsCurrent());  ASSERT(tcp == tcp_);  if (stream_) {    stream_readable_ = true;    pending_read_event_ = true;    stream_thread_->Post(this, MSG_ST_EVENT,                         new EventData(SE_OPEN | SE_READ | SE_WRITE));  }}void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) {  //LOG_F(LS_VERBOSE);  ASSERT(cs_.CurrentThreadIsOwner());  ASSERT(worker_thread_->IsCurrent());  ASSERT(tcp == tcp_);  if (stream_) {    stream_readable_ = true;    if (!pending_read_event_) {      pending_read_event_ = true;      stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ));    }  }}void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) {  //LOG_F(LS_VERBOSE);  ASSERT(cs_.CurrentThreadIsOwner());  ASSERT(worker_thread_->IsCurrent());  ASSERT(tcp == tcp_);  if (stream_)    stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE));}void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) {  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";  ASSERT(cs_.CurrentThreadIsOwner());  ASSERT(worker_thread_->IsCurrent());  ASSERT(tcp == tcp_);  if (stream_)    stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError));}//// Multi-thread methods//void PseudoTcpChannel::OnMessage(Message* pmsg) {  if (pmsg->message_id == MSG_WK_CLOCK) {    ASSERT(worker_thread_->IsCurrent());    //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)";    CritScope lock(&cs_);    if (tcp_) {      tcp_->NotifyClock(PseudoTcp::Now());      AdjustClock(false);    }  } else if (pmsg->message_id == MSG_WK_PURGE) {    ASSERT(worker_thread_->IsCurrent());    LOG_F(LS_INFO) << "(MSG_WK_PURGE)";    // At this point, we know there are no additional worker thread messages.    CritScope lock(&cs_);    ASSERT(NULL == session_);    ASSERT(NULL == channel_);    worker_thread_ = NULL;    CheckDestroy();  } else if (pmsg->message_id == MSG_ST_EVENT) {    ASSERT(stream_thread_->IsCurrent());    //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, "    //             << data->event << ", " << data->error << ")";    ASSERT(stream_ != NULL);    EventData* data = static_cast<EventData*>(pmsg->pdata);    if (data->event & SE_READ) {      CritScope lock(&cs_);      pending_read_event_ = false;    }    stream_->SignalEvent(stream_, data->event, data->error);    delete data;  } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) {    ASSERT(signal_thread_->IsCurrent());    LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)";    ASSERT(session_ != NULL);    ASSERT(channel_ != NULL);    session_->DestroyChannel(channel_);  } else if (pmsg->message_id == MSG_SI_DESTROY) {    ASSERT(signal_thread_->IsCurrent());    LOG_F(LS_INFO) << "(MSG_SI_DESTROY)";    // The message queue is empty, so it is safe to destroy ourselves.    delete this;  } else {    ASSERT(false);  }}IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket(    PseudoTcp* tcp, const char* buffer, size_t len) {  ASSERT(cs_.CurrentThreadIsOwner());  ASSERT(tcp == tcp_);  ASSERT(NULL != channel_);  int sent = channel_->SendPacket(buffer, len);  if (sent > 0) {    //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent";    return IPseudoTcpNotify::WR_SUCCESS;  } else if (IsBlockingError(channel_->GetError())) {    LOG_F(LS_VERBOSE) << "Blocking";    return IPseudoTcpNotify::WR_SUCCESS;  } else if (channel_->GetError() == EMSGSIZE) {    LOG_F(LS_ERROR) << "EMSGSIZE";    return IPseudoTcpNotify::WR_TOO_LARGE;  } else {    PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket";    ASSERT(false);    return IPseudoTcpNotify::WR_FAIL;  }}void PseudoTcpChannel::AdjustClock(bool clear) {  ASSERT(cs_.CurrentThreadIsOwner());  ASSERT(NULL != tcp_);  long timeout = 0;  if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) {    ASSERT(NULL != channel_);    // Reset the next clock, by clearing the old and setting a new one.    if (clear)      worker_thread_->Clear(this, MSG_WK_CLOCK);    worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK);    return;  }  delete tcp_;  tcp_ = NULL;  ready_to_connect_ = false;  if (channel_) {    // If TCP has failed, no need for channel_ anymore    signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL);  }}void PseudoTcpChannel::CheckDestroy() {  ASSERT(cs_.CurrentThreadIsOwner());  if ((worker_thread_ != NULL) || (stream_ != NULL))    return;  signal_thread_->Post(this, MSG_SI_DESTROY);}///////////////////////////////////////////////////////////////////////////////// PseudoTcpChannel::InternalStream///////////////////////////////////////////////////////////////////////////////PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent)  : parent_(parent) {}PseudoTcpChannel::InternalStream::~InternalStream() {  Close();}StreamState PseudoTcpChannel::InternalStream::GetState() const {  if (!parent_)    return SS_CLOSED;  return parent_->GetState();}StreamResult PseudoTcpChannel::InternalStream::Read(    void* buffer, size_t buffer_len, size_t* read, int* error) {  if (!parent_) {    if (error)      *error = ENOTCONN;    return SR_ERROR;  }  return parent_->Read(buffer, buffer_len, read, error);}StreamResult PseudoTcpChannel::InternalStream::Write(    const void* data, size_t data_len,  size_t* written, int* error) {  if (!parent_) {    if (error)      *error = ENOTCONN;    return SR_ERROR;  }  return parent_->Write(data, data_len, written, error);}void PseudoTcpChannel::InternalStream::Close() {  if (!parent_)    return;  parent_->Close();   parent_ = NULL;}///////////////////////////////////////////////////////////////////////////////} // namespace cricket

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -