📄 physicalsocketserver.cxx
字号:
return 0; } virtual void OnPreEvent(uint32 ff) { WSAResetEvent(hev_); } virtual void OnEvent(uint32 ff, int err) { } virtual WSAEVENT GetWSAEvent() { return hev_; } virtual SOCKET GetSocket() { return INVALID_SOCKET; } virtual bool CheckSignalClose() { return false; }private: PhysicalSocketServer* ss_; WSAEVENT hev_;};class SocketDispatcher : public Dispatcher, public PhysicalSocket {public: static int next_id_; int id_; bool signal_close_; int signal_err_; SocketDispatcher(PhysicalSocketServer* ss) : PhysicalSocket(ss), id_(0), signal_close_(false) { } SocketDispatcher(SOCKET s, PhysicalSocketServer* ss) : PhysicalSocket(ss, s), id_(0), signal_close_(false) { } virtual ~SocketDispatcher() { Close(); } bool Initialize() { assert(s_ != INVALID_SOCKET); // Must be a non-blocking u_long argp = 1; ioctlsocket(s_, FIONBIO, &argp); ss_->Add(this); return true; } virtual bool Create(int type) { // Create socket if (!PhysicalSocket::Create(type)) return false; if (!Initialize()) return false; do { id_ = ++next_id_; } while (id_ == 0); return true; } virtual int Close() { if (s_ == INVALID_SOCKET) return 0; id_ = 0; signal_close_ = false; ss_->Remove(this); return PhysicalSocket::Close(); } virtual uint32 GetRequestedEvents() { return enabled_events_; } virtual void OnPreEvent(uint32 ff) { if ((ff & kfConnect) != 0) state_ = CS_CONNECTED; } virtual void OnEvent(uint32 ff, int err) { int cache_id = id_; if ((ff & kfRead) != 0) { enabled_events_ &= ~kfRead; SignalReadEvent(this); } if (((ff & kfWrite) != 0) && (id_ == cache_id)) { enabled_events_ &= ~kfWrite; SignalWriteEvent(this); } if (((ff & kfConnect) != 0) && (id_ == cache_id)) { if (ff != kfConnect) enabled_events_ &= ~kfConnect; SignalConnectEvent(this); } if (((ff & kfClose) != 0) && (id_ == cache_id)) { signal_close_ = true; signal_err_ = err; } } virtual WSAEVENT GetWSAEvent() { return WSA_INVALID_EVENT; } virtual SOCKET GetSocket() { return s_; } virtual bool CheckSignalClose() { if (!signal_close_) return false; char ch; if (recv(s_, &ch, 1, MSG_PEEK) > 0) return false; signal_close_ = false; SignalCloseEvent(this, signal_err_); return true; }};int SocketDispatcher::next_id_ = 0;// Sets the value of a boolean value to false when signaled.class Signaler : public EventDispatcher {public: Signaler(PhysicalSocketServer* ss, bool* pf) : EventDispatcher(ss), pf_(pf) { } virtual ~Signaler() { } void OnEvent(uint32 ff, int err) { if (pf_) *pf_ = false; }private: bool *pf_;};PhysicalSocketServer::PhysicalSocketServer() : fWait_(false), last_tick_tracked_(0), last_tick_dispatch_count_(0) { signal_wakeup_ = new Signaler(this, &fWait_);}PhysicalSocketServer::~PhysicalSocketServer() { delete signal_wakeup_;}void PhysicalSocketServer::WakeUp() { signal_wakeup_->Signal();}Socket* PhysicalSocketServer::CreateSocket(int type) { PhysicalSocket* socket = new PhysicalSocket(this); if (socket->Create(type)) { return socket; } else { delete socket; return 0; }}AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) { SocketDispatcher* dispatcher = new SocketDispatcher(this); if (dispatcher->Create(type)) { return dispatcher; } else { delete dispatcher; return 0; }}AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { SocketDispatcher* dispatcher = new SocketDispatcher(s, this); if (dispatcher->Initialize()) { return dispatcher; } else { delete dispatcher; return 0; }}void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { CritScope cs(&crit_); dispatchers_.push_back(pdispatcher);}void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { CritScope cs(&crit_); dispatchers_.erase(std::remove(dispatchers_.begin(), dispatchers_.end(), pdispatcher), dispatchers_.end());}bool PhysicalSocketServer::Wait(int cmsWait, bool process_io){ int cmsTotal = cmsWait; int cmsElapsed = 0; uint32 msStart = GetMillisecondCount();#if LOGGING if (last_tick_dispatch_count_ == 0) { last_tick_tracked_ = msStart; }#endif WSAEVENT socket_ev = WSACreateEvent(); fWait_ = true; while (fWait_) { std::vector<WSAEVENT> events; std::vector<Dispatcher *> event_owners; events.push_back(socket_ev); { CritScope cr(&crit_); for (size_t i = 0; i < dispatchers_.size(); ++i) { Dispatcher * disp = dispatchers_[i]; if (!process_io && (disp != signal_wakeup_)) continue; SOCKET s = disp->GetSocket(); if (disp->CheckSignalClose()) { // We just signalled close, don't poll this socket } else if (s != INVALID_SOCKET) { WSAEventSelect(s, events[0], FlagsToEvents(disp->GetRequestedEvents())); } else { events.push_back(disp->GetWSAEvent()); event_owners.push_back(disp); } } } // Which is shorter, the delay wait or the asked wait? int cmsNext; if (cmsWait == -1) { cmsNext = cmsWait; } else { cmsNext = cmsTotal - cmsElapsed; if (cmsNext < 0) cmsNext = 0; } // Wait for one of the events to signal DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), &events[0], false, cmsNext, false);#if 0 // LOGGING // we track this information purely for logging purposes. last_tick_dispatch_count_++; if (last_tick_dispatch_count_ >= 1000) { uint32 now = GetMillisecondCount(); LOG(INFO) << "PhysicalSocketServer took " << TimeDiff(now, last_tick_tracked_) << "ms for 1000 events"; // If we get more than 1000 events in a second, we are spinning badly // (normally it should take about 8-20 seconds). assert(TimeDiff(now, last_tick_tracked_) > 1000); last_tick_tracked_ = now; last_tick_dispatch_count_ = 0; }#endif // Failed? // todo: need a better strategy than this! if (dw == WSA_WAIT_FAILED) { int error = WSAGetLastError(); assert(false); WSACloseEvent(socket_ev); return false; } // Timeout? if (dw == WSA_WAIT_TIMEOUT) { WSACloseEvent(socket_ev); return true; } // Figure out which one it is and call it { CritScope cr(&crit_); int index = dw - WSA_WAIT_EVENT_0; if (index > 0) { --index; // The first event is the socket event event_owners[index]->OnPreEvent(0); event_owners[index]->OnEvent(0, 0); } else if (process_io) { for (size_t i = 0; i < dispatchers_.size(); ++i) { Dispatcher * disp = dispatchers_[i]; SOCKET s = disp->GetSocket(); if (s == INVALID_SOCKET) continue; WSANETWORKEVENTS wsaEvents; int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents); if (err == 0) { /*#if LOGGING { if ((wsaEvents.lNetworkEvents & FD_READ) && wsaEvents.iErrorCode[FD_READ_BIT] != 0) { LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error " << wsaEvents.iErrorCode[FD_READ_BIT]; } if ((wsaEvents.lNetworkEvents & FD_WRITE) && wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) { LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error " << wsaEvents.iErrorCode[FD_WRITE_BIT]; } if ((wsaEvents.lNetworkEvents & FD_CONNECT) && wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) { LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error " << wsaEvents.iErrorCode[FD_CONNECT_BIT]; } if ((wsaEvents.lNetworkEvents & FD_ACCEPT) && wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) { LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error " << wsaEvents.iErrorCode[FD_ACCEPT_BIT]; } if ((wsaEvents.lNetworkEvents & FD_CLOSE) && wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) { LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error " << wsaEvents.iErrorCode[FD_CLOSE_BIT]; } }#endif
*/ uint32 ff = 0; int errcode = 0; if (wsaEvents.lNetworkEvents & FD_READ) ff |= kfRead; if (wsaEvents.lNetworkEvents & FD_WRITE) ff |= kfWrite; if (wsaEvents.lNetworkEvents & FD_CONNECT) { if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) { ff |= kfConnect; } else { // TODO: Decide whether we want to signal connect, but with an error code ff |= kfClose; errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT]; } } if (wsaEvents.lNetworkEvents & FD_ACCEPT) ff |= kfRead; if (wsaEvents.lNetworkEvents & FD_CLOSE) { ff |= kfClose; errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT]; } if (ff != 0) { disp->OnPreEvent(ff); disp->OnEvent(ff, errcode); } } } } // Reset the network event until new activity occurs WSAResetEvent(socket_ev); } // Break? if (!fWait_) break; cmsElapsed = GetMillisecondCount() - msStart; if (cmsWait != -1) { if (cmsElapsed >= cmsWait) break; } } // Done WSACloseEvent(socket_ev); return true;}} // namespace cricket
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -