⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 physicalsocketserver.cc

📁 本人收集整理的一份c/c++跨平台网络库
💻 CC
📖 第 1 页 / 共 2 页
字号:
  int fd_;  int flags_;};AsyncFile* PhysicalSocketServer::CreateFile(int fd) {  return new FileDispatcher(fd, this);}#endif // POSIX#ifdef WIN32class Dispatcher {public:  virtual uint32 GetRequestedEvents() = 0;  virtual void OnPreEvent(uint32 ff) = 0;    virtual void OnEvent(uint32 ff, int err) = 0;  virtual WSAEVENT GetWSAEvent() = 0;  virtual SOCKET GetSocket() = 0;  virtual bool CheckSignalClose() = 0;};uint32 FlagsToEvents(uint32 events) {  uint32 ffFD = FD_CLOSE | FD_ACCEPT;  if (events & kfRead)    ffFD |= FD_READ;  if (events & kfWrite)    ffFD |= FD_WRITE;  if (events & kfConnect)    ffFD |= FD_CONNECT;  return ffFD;}class EventDispatcher : public Dispatcher {public:  EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) {    if (hev_ = WSACreateEvent()) {      ss_->Add(this);    }  }  ~EventDispatcher() {    if (hev_ != NULL) {      ss_->Remove(this);      WSACloseEvent(hev_);      hev_ = NULL;    }  }    virtual void Signal() {    if (hev_ != NULL)      WSASetEvent(hev_);  }    virtual uint32 GetRequestedEvents() {    return 0;  }  virtual void OnPreEvent(uint32 ff) {    WSAResetEvent(hev_);  }  virtual void OnEvent(uint32 ff, int err) {  }  virtual WSAEVENT GetWSAEvent() {    return hev_;  }  virtual SOCKET GetSocket() {    return INVALID_SOCKET;  }  virtual bool CheckSignalClose() { return false; }private:  PhysicalSocketServer* ss_;  WSAEVENT hev_;};class SocketDispatcher : public Dispatcher, public PhysicalSocket {public:  static int next_id_;  int id_;  bool signal_close_;  int signal_err_;  SocketDispatcher(PhysicalSocketServer* ss) : PhysicalSocket(ss), id_(0), signal_close_(false) {  }  SocketDispatcher(SOCKET s, PhysicalSocketServer* ss) : PhysicalSocket(ss, s), id_(0), signal_close_(false) {  }  virtual ~SocketDispatcher() {    Close();  }  bool Initialize() {    assert(s_ != INVALID_SOCKET);    // Must be a non-blocking    u_long argp = 1;    ioctlsocket(s_, FIONBIO, &argp);    ss_->Add(this);    return true;  }   virtual bool Create(int type) {    // Create socket    if (!PhysicalSocket::Create(type))      return false;    if (!Initialize())      return false;    do { id_ = ++next_id_; } while (id_ == 0);    return true;  }  virtual int Close() {    if (s_ == INVALID_SOCKET)      return 0;    id_ = 0;    signal_close_ = false;    ss_->Remove(this);    return PhysicalSocket::Close();  }  virtual uint32 GetRequestedEvents() {    return enabled_events_;  }  virtual void OnPreEvent(uint32 ff) {    if ((ff & kfConnect) != 0)      state_ = CS_CONNECTED;  }  virtual void OnEvent(uint32 ff, int err) {    int cache_id = id_;    if ((ff & kfRead) != 0) {      enabled_events_ &= ~kfRead;      SignalReadEvent(this);    }    if (((ff & kfWrite) != 0) && (id_ == cache_id)) {      enabled_events_ &= ~kfWrite;      SignalWriteEvent(this);    }    if (((ff & kfConnect) != 0) && (id_ == cache_id)) {      if (ff != kfConnect)        LOG(LS_VERBOSE) << "Signalled with kfConnect: " << ff;      enabled_events_ &= ~kfConnect;      SignalConnectEvent(this);    }    if (((ff & kfClose) != 0) && (id_ == cache_id)) {      //LOG(INFO) << "SOCK[" << static_cast<int>(s_) << "] OnClose() Error: " << err;      signal_close_ = true;      signal_err_ = err;    }  }  virtual WSAEVENT GetWSAEvent() {    return WSA_INVALID_EVENT;  }  virtual SOCKET GetSocket() {    return s_;  }  virtual bool CheckSignalClose() {    if (!signal_close_)      return false;    char ch;    if (recv(s_, &ch, 1, MSG_PEEK) > 0)      return false;    signal_close_ = false;    SignalCloseEvent(this, signal_err_);    return true;  }};int SocketDispatcher::next_id_ = 0;#endif // WIN32// Sets the value of a boolean value to false when signaled.class Signaler : public EventDispatcher {public:  Signaler(PhysicalSocketServer* ss, bool* pf)      : EventDispatcher(ss), pf_(pf) {  }  virtual ~Signaler() { }  void OnEvent(uint32 ff, int err) {    if (pf_)      *pf_ = false;  }private:  bool *pf_;};PhysicalSocketServer::PhysicalSocketServer() : fWait_(false),  last_tick_tracked_(0), last_tick_dispatch_count_(0) {  signal_wakeup_ = new Signaler(this, &fWait_);}PhysicalSocketServer::~PhysicalSocketServer() {  delete signal_wakeup_;  //  ASSERT(dispatchers_.empty());}void PhysicalSocketServer::WakeUp() {  signal_wakeup_->Signal();}Socket* PhysicalSocketServer::CreateSocket(int type) {  PhysicalSocket* socket = new PhysicalSocket(this);  if (socket->Create(type)) {    return socket;  } else {    delete socket;    return 0;  }}AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) {  SocketDispatcher* dispatcher = new SocketDispatcher(this);  if (dispatcher->Create(type)) {    return dispatcher;  } else {    delete dispatcher;    return 0;  }}AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {  SocketDispatcher* dispatcher = new SocketDispatcher(s, this);  if (dispatcher->Initialize()) {    return dispatcher;  } else {    delete dispatcher;    return 0;  }}void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {  CritScope cs(&crit_);  dispatchers_.push_back(pdispatcher);}void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {  CritScope cs(&crit_);  dispatchers_.erase(std::remove(dispatchers_.begin(), dispatchers_.end(), pdispatcher), dispatchers_.end());}#ifdef POSIXbool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {  // Calculate timing information  struct timeval *ptvWait = NULL;  struct timeval tvWait;  struct timeval tvStop;  if (cmsWait != kForever) {    // Calculate wait timeval    tvWait.tv_sec = cmsWait / 1000;    tvWait.tv_usec = (cmsWait % 1000) * 1000;    ptvWait = &tvWait;    // Calculate when to return in a timeval    gettimeofday(&tvStop, NULL);    tvStop.tv_sec += tvWait.tv_sec;    tvStop.tv_usec += tvWait.tv_usec;    if (tvStop.tv_usec >= 1000000) {      tvStop.tv_usec -= 1000000;      tvStop.tv_sec += 1;    }  }  // Zero all fd_sets. Don't need to do this inside the loop since  // select() zeros the descriptors not signaled    fd_set fdsRead;  FD_ZERO(&fdsRead);  fd_set fdsWrite;  FD_ZERO(&fdsWrite);   fWait_ = true;  while (fWait_) {    int fdmax = -1;    {      CritScope cr(&crit_);      for (unsigned i = 0; i < dispatchers_.size(); i++) {        // Query dispatchers for read and write wait state              Dispatcher *pdispatcher = dispatchers_[i];        assert(pdispatcher);        if (!process_io && (pdispatcher != signal_wakeup_))          continue;        int fd = pdispatcher->GetDescriptor();        if (fd > fdmax)          fdmax = fd;        uint32 ff = pdispatcher->GetRequestedEvents();        if (ff & kfRead)          FD_SET(fd, &fdsRead);        if (ff & (kfWrite | kfConnect))          FD_SET(fd, &fdsWrite);      }    }          // Wait then call handlers as appropriate    // < 0 means error    // 0 means timeout    // > 0 means count of descriptors ready    int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait);    // If error, return error    // todo: do something intelligent    if (n < 0)      return false;        // If timeout, return success        if (n == 0)      return true;        // We have signaled descriptors       {      CritScope cr(&crit_);      for (unsigned i = 0; i < dispatchers_.size(); i++) {        Dispatcher *pdispatcher = dispatchers_[i];        int fd = pdispatcher->GetDescriptor();        uint32 ff = 0;        if (FD_ISSET(fd, &fdsRead)) {          FD_CLR(fd, &fdsRead);          ff |= kfRead;        }        if (FD_ISSET(fd, &fdsWrite)) {          FD_CLR(fd, &fdsWrite);          if (pdispatcher->GetRequestedEvents() & kfConnect) {            ff |= kfConnect;          } else {            ff |= kfWrite;          }        }        if (ff != 0) {          pdispatcher->OnPreEvent(ff);          pdispatcher->OnEvent(ff, 0);        }      }    }    // Recalc the time remaining to wait. Doing it here means it doesn't get    // calced twice the first time through the loop    if (cmsWait != kForever) {      ptvWait->tv_sec = 0;      ptvWait->tv_usec = 0;      struct timeval tvT;      gettimeofday(&tvT, NULL);      if (tvStop.tv_sec >= tvT.tv_sec) {        ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;        ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;        if (ptvWait->tv_usec < 0) {          ptvWait->tv_usec += 1000000;          ptvWait->tv_sec -= 1;        }      }    }  }          return true;}#endif // POSIX#ifdef WIN32bool PhysicalSocketServer::Wait(int cmsWait, bool process_io){  int cmsTotal = cmsWait;  int cmsElapsed = 0;  uint32 msStart = GetMillisecondCount();#if LOGGING  if (last_tick_dispatch_count_ == 0) {    last_tick_tracked_ = msStart;  }#endif  WSAEVENT socket_ev = WSACreateEvent();    fWait_ = true;  while (fWait_) {    std::vector<WSAEVENT> events;    std::vector<Dispatcher *> event_owners;    events.push_back(socket_ev);    {      CritScope cr(&crit_);      for (size_t i = 0; i < dispatchers_.size(); ++i) {        Dispatcher * disp = dispatchers_[i];        if (!process_io && (disp != signal_wakeup_))          continue;        SOCKET s = disp->GetSocket();        if (disp->CheckSignalClose()) {          // We just signalled close, don't poll this socket        } else if (s != INVALID_SOCKET) {          WSAEventSelect(s, events[0], FlagsToEvents(disp->GetRequestedEvents()));        } else {          events.push_back(disp->GetWSAEvent());          event_owners.push_back(disp);        }      }    }    // Which is shorter, the delay wait or the asked wait?    int cmsNext;    if (cmsWait == kForever) {      cmsNext = cmsWait;    } else {      cmsNext = cmsTotal - cmsElapsed;      if (cmsNext < 0)        cmsNext = 0;    }    // Wait for one of the events to signal    DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), &events[0], false, cmsNext, false);#if 0  // LOGGING    // we track this information purely for logging purposes.    last_tick_dispatch_count_++;    if (last_tick_dispatch_count_ >= 1000) {      uint32 now = GetMillisecondCount();      LOG(INFO) << "PhysicalSocketServer took " << TimeDiff(now, last_tick_tracked_) << "ms for 1000 events";      // If we get more than 1000 events in a second, we are spinning badly      // (normally it should take about 8-20 seconds).      assert(TimeDiff(now, last_tick_tracked_) > 1000);            last_tick_tracked_ = now;      last_tick_dispatch_count_ = 0;    }#endif    // Failed?    // todo: need a better strategy than this!    if (dw == WSA_WAIT_FAILED) {      int error = WSAGetLastError();      assert(false);      WSACloseEvent(socket_ev);      return false;    }    // Timeout?    if (dw == WSA_WAIT_TIMEOUT) {      WSACloseEvent(socket_ev);      return true;    }    // Figure out which one it is and call it    {      CritScope cr(&crit_);      int index = dw - WSA_WAIT_EVENT_0;      if (index > 0) {        --index; // The first event is the socket event        event_owners[index]->OnPreEvent(0);        event_owners[index]->OnEvent(0, 0);      } else if (process_io) {        for (size_t i = 0; i < dispatchers_.size(); ++i) {          Dispatcher * disp = dispatchers_[i];          SOCKET s = disp->GetSocket();          if (s == INVALID_SOCKET)            continue;          WSANETWORKEVENTS wsaEvents;          int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);          if (err == 0) {            #if LOGGING            {              if ((wsaEvents.lNetworkEvents & FD_READ) && wsaEvents.iErrorCode[FD_READ_BIT] != 0) {                LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error " << wsaEvents.iErrorCode[FD_READ_BIT];              }              if ((wsaEvents.lNetworkEvents & FD_WRITE) && wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {                LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error " << wsaEvents.iErrorCode[FD_WRITE_BIT];              }              if ((wsaEvents.lNetworkEvents & FD_CONNECT) && wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {                LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error " << wsaEvents.iErrorCode[FD_CONNECT_BIT];              }              if ((wsaEvents.lNetworkEvents & FD_ACCEPT) && wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {                LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error " << wsaEvents.iErrorCode[FD_ACCEPT_BIT];              }              if ((wsaEvents.lNetworkEvents & FD_CLOSE) && wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {                LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error " << wsaEvents.iErrorCode[FD_CLOSE_BIT];              }            }#endif            uint32 ff = 0;            int errcode = 0;            if (wsaEvents.lNetworkEvents & FD_READ)              ff |= kfRead;            if (wsaEvents.lNetworkEvents & FD_WRITE)              ff |= kfWrite;            if (wsaEvents.lNetworkEvents & FD_CONNECT) {              if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {                ff |= kfConnect;              } else {                // TODO: Decide whether we want to signal connect, but with an error code                ff |= kfClose;                 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];              }            }            if (wsaEvents.lNetworkEvents & FD_ACCEPT)              ff |= kfRead;            if (wsaEvents.lNetworkEvents & FD_CLOSE) {              ff |= kfClose;              errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];            }            if (ff != 0) {              disp->OnPreEvent(ff);              disp->OnEvent(ff, errcode);            }          }        }      }      // Reset the network event until new activity occurs      WSAResetEvent(socket_ev);    }    // Break?    if (!fWait_)      break;    cmsElapsed = GetMillisecondCount() - msStart;    if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {       break;    }  }    // Done    WSACloseEvent(socket_ev);  return true;}#endif // WIN32} // namespace talk_base

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -