📄 pseudotcpchannel.cc
字号:
void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) { LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; ASSERT(worker_thread_->IsCurrent()); CritScope lock(&cs_); if (!channel_) { LOG_F(LS_WARNING) << "NULL channel"; return; } ASSERT(channel == channel_); if (!tcp_) { LOG_F(LS_WARNING) << "NULL tcp"; return; } if (!ready_to_connect_ || !channel->writable()) return; ready_to_connect_ = false; tcp_->Connect(); AdjustClock();}void PseudoTcpChannel::OnChannelRead(TransportChannel* channel, const char* data, size_t size) { //LOG_F(LS_VERBOSE) << "(" << size << ")"; ASSERT(worker_thread_->IsCurrent()); CritScope lock(&cs_); if (!channel_) { LOG_F(LS_WARNING) << "NULL channel"; return; } ASSERT(channel == channel_); if (!tcp_) { LOG_F(LS_WARNING) << "NULL tcp"; return; } tcp_->NotifyPacket(data, size); AdjustClock();}void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel, const SocketAddress& addr) { LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; ASSERT(worker_thread_->IsCurrent()); CritScope lock(&cs_); if (!channel_) { LOG_F(LS_WARNING) << "NULL channel"; return; } ASSERT(channel == channel_); if (!tcp_) { LOG_F(LS_WARNING) << "NULL tcp"; return; } scoped_ptr<Socket> mtu_socket( worker_thread_->socketserver() ->CreateSocket(SOCK_DGRAM)); uint16 mtu = 65535; if (mtu_socket->Connect(addr) < 0) { LOG_F(LS_ERROR) << "Socket::Connect: " << mtu_socket->GetError(); } else if (mtu_socket->EstimateMTU(&mtu) < 0) { LOG_F(LS_ERROR) << "Socket::EstimateMTU: " << mtu_socket->GetError(); } else { tcp_->NotifyMTU(mtu); AdjustClock(); }}void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) { LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(worker_thread_->IsCurrent()); ASSERT(tcp == tcp_); if (stream_) { stream_readable_ = true; pending_read_event_ = true; stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_OPEN | SE_READ | SE_WRITE)); }}void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) { //LOG_F(LS_VERBOSE); ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(worker_thread_->IsCurrent()); ASSERT(tcp == tcp_); if (stream_) { stream_readable_ = true; if (!pending_read_event_) { pending_read_event_ = true; stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ)); } }}void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) { //LOG_F(LS_VERBOSE); ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(worker_thread_->IsCurrent()); ASSERT(tcp == tcp_); if (stream_) stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE));}void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) { LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(worker_thread_->IsCurrent()); ASSERT(tcp == tcp_); if (stream_) stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError));}//// Multi-thread methods//void PseudoTcpChannel::OnMessage(Message* pmsg) { if (pmsg->message_id == MSG_WK_CLOCK) { ASSERT(worker_thread_->IsCurrent()); //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)"; CritScope lock(&cs_); if (tcp_) { tcp_->NotifyClock(PseudoTcp::Now()); AdjustClock(false); } } else if (pmsg->message_id == MSG_WK_PURGE) { ASSERT(worker_thread_->IsCurrent()); LOG_F(LS_INFO) << "(MSG_WK_PURGE)"; // At this point, we know there are no additional worker thread messages. CritScope lock(&cs_); ASSERT(NULL == session_); ASSERT(NULL == channel_); worker_thread_ = NULL; CheckDestroy(); } else if (pmsg->message_id == MSG_ST_EVENT) { ASSERT(stream_thread_->IsCurrent()); //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, " // << data->event << ", " << data->error << ")"; ASSERT(stream_ != NULL); EventData* data = static_cast<EventData*>(pmsg->pdata); if (data->event & SE_READ) { CritScope lock(&cs_); pending_read_event_ = false; } stream_->SignalEvent(stream_, data->event, data->error); delete data; } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) { ASSERT(signal_thread_->IsCurrent()); LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)"; ASSERT(session_ != NULL); ASSERT(channel_ != NULL); session_->DestroyChannel(channel_); } else if (pmsg->message_id == MSG_SI_DESTROY) { ASSERT(signal_thread_->IsCurrent()); LOG_F(LS_INFO) << "(MSG_SI_DESTROY)"; // The message queue is empty, so it is safe to destroy ourselves. delete this; } else { ASSERT(false); }}IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket( PseudoTcp* tcp, const char* buffer, size_t len) { ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(tcp == tcp_); ASSERT(NULL != channel_); int sent = channel_->SendPacket(buffer, len); if (sent > 0) { //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent"; return IPseudoTcpNotify::WR_SUCCESS; } else if (IsBlockingError(channel_->GetError())) { LOG_F(LS_VERBOSE) << "Blocking"; return IPseudoTcpNotify::WR_SUCCESS; } else if (channel_->GetError() == EMSGSIZE) { LOG_F(LS_ERROR) << "EMSGSIZE"; return IPseudoTcpNotify::WR_TOO_LARGE; } else { PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket"; ASSERT(false); return IPseudoTcpNotify::WR_FAIL; }}void PseudoTcpChannel::AdjustClock(bool clear) { ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(NULL != tcp_); long timeout = 0; if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) { ASSERT(NULL != channel_); // Reset the next clock, by clearing the old and setting a new one. if (clear) worker_thread_->Clear(this, MSG_WK_CLOCK); worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK); return; } delete tcp_; tcp_ = NULL; ready_to_connect_ = false; if (channel_) { // If TCP has failed, no need for channel_ anymore signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL); }}void PseudoTcpChannel::CheckDestroy() { ASSERT(cs_.CurrentThreadIsOwner()); if ((worker_thread_ != NULL) || (stream_ != NULL)) return; signal_thread_->Post(this, MSG_SI_DESTROY);}///////////////////////////////////////////////////////////////////////////////// PseudoTcpChannel::InternalStream///////////////////////////////////////////////////////////////////////////////PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent) : parent_(parent) {}PseudoTcpChannel::InternalStream::~InternalStream() { Close();}StreamState PseudoTcpChannel::InternalStream::GetState() const { if (!parent_) return SS_CLOSED; return parent_->GetState();}StreamResult PseudoTcpChannel::InternalStream::Read( void* buffer, size_t buffer_len, size_t* read, int* error) { if (!parent_) { if (error) *error = ENOTCONN; return SR_ERROR; } return parent_->Read(buffer, buffer_len, read, error);}StreamResult PseudoTcpChannel::InternalStream::Write( const void* data, size_t data_len, size_t* written, int* error) { if (!parent_) { if (error) *error = ENOTCONN; return SR_ERROR; } return parent_->Write(data, data_len, written, error);}void PseudoTcpChannel::InternalStream::Close() { if (!parent_) return; parent_->Close(); parent_ = NULL;}///////////////////////////////////////////////////////////////////////////////} // namespace cricket
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -