⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sockagg.cxx

📁 sloedgy open sip stack source code
💻 CXX
📖 第 1 页 / 共 2 页
字号:
    // if the worker thread has enough handles to keep running, trigger it to update
    if (worker->handleList.size() > 0) {
      PTRACE(4, "SockAgg\tRemoved handle " << (void *)handle << " from aggregator - " << worker->handleList.size() << " handles remaining");
      worker->listChanged = TRUE;
      worker->Trigger();
      worker->workerMutex.Signal();

      listMutex.Signal();

      return TRUE;
    }

    PTRACE(4, "SockAgg\tworker thread empty - closing down");

    // remove the worker thread from the list of workers
    workers.erase(r);

    // shutdown the thread
    worker->shutdown = TRUE;
    worker->Trigger();
    worker->workerMutex.Signal();

    // unlock the list
    listMutex.Signal();

    // the worker is now finished
    if (!worker->WaitForTermination(10000)) {
      PTRACE(4, "SockAgg\tWorker did not terminate promptly");
    }

    delete worker;

    return TRUE;
  }

  listMutex.Signal();

  PAssertAlways("Cannot find aggregator handle");

  return FALSE;
}

////////////////////////////////////////////////////////////////

typedef std::vector<PAggregatorFD::FD> fdList_t;

#ifdef _WIN32
#define	FDLIST_SIZE	WSA_MAXIMUM_WAIT_EVENTS
#else
#define	FDLIST_SIZE	64
#endif

void PHandleAggregator::WorkerThreadBase::Main()
{
  PTRACE(4, "SockAgg\taggregator started");

  fdList_t                  fdList;
  PAggregatorFDList_t       aggregatorFdList;

  typedef std::map<PAggregatorFD::FD, PAggregatedHandle *> PAggregatorFdToHandleMap_t;
  PAggregatorFdToHandleMap_t aggregatorFdToHandleMap;

  for (;;) {

    // create the list of fds to wait on and find minimum timeout
    PTimeInterval timeout(PMaxTimeInterval);
    PAggregatedHandle * timeoutHandle = NULL;

#ifndef _WIN32
    fd_set rfds;
    FD_ZERO(&rfds);
    int maxFd = 0;
#endif

    {
      PWaitAndSignal m(workerMutex);

      // check for shutdown
      if (shutdown)
        break;

      // if the list of handles has changed, clear the list of handles
      if (listChanged) {
        aggregatorFdList.erase       (aggregatorFdList.begin(),      aggregatorFdList.end());
        aggregatorFdList.reserve     (FDLIST_SIZE);
        fdList.erase                 (fdList.begin(),                fdList.end());
        fdList.reserve               (FDLIST_SIZE);
        aggregatorFdToHandleMap.erase(aggregatorFdToHandleMap.begin(),         aggregatorFdToHandleMap.end());
      }

      PAggregatedHandleList_t::iterator r;
      for (r = handleList.begin(); r != handleList.end(); ++r) {
        PAggregatedHandle * handle = *r;

        if (handle->closed)
          continue;

        if (listChanged) {
          PAggregatorFDList_t fds = handle->GetFDs();
          PAggregatorFDList_t::iterator s;
          for (s = fds.begin(); s != fds.end(); ++s) {
            fdList.push_back          ((*s)->fd);
            aggregatorFdList.push_back((*s));
            aggregatorFdToHandleMap.insert(PAggregatorFdToHandleMap_t::value_type((*s)->fd, handle));
          }
        }

        if (!handle->IsPreReadDone()) {
          handle->PreRead();
          handle->SetPreReadDone();
        }

        PTimeInterval t = handle->GetTimeout();
        if (t < timeout) {
          timeout = t;
          timeoutHandle = handle;
        }
      }

      // add in the event fd
      if (listChanged) {
        fdList.push_back(event.GetHandle());
        listChanged = FALSE;
      }

#ifndef _WIN32
      // create the list of FDs
      fdList_t::iterator s;
      for (s = fdList.begin(); s != fdList.end(); ++s) {
        FD_SET(*s, &rfds);
        maxFd = PMAX(maxFd, *s);
      }
#endif
    } // workerMutex goes out of scope

#ifdef _WIN32
    DWORD nCount = fdList.size();
    DWORD ret = WSAWaitForMultipleEvents(nCount, 
                                         &fdList[0], 
                                         false, 
                                         (timeout == PMaxTimeInterval) ? WSA_INFINITE : (DWORD)timeout.GetMilliSeconds(), 
                                         FALSE);

    if (ret == WAIT_FAILED) {
      DWORD err = WSAGetLastError();
      PTRACE(1, "SockAgg\tWSAWaitForMultipleEvents error " << err);
    }

    {
      PWaitAndSignal m(workerMutex);

      // check for shutdown
      if (shutdown)
        break;

      if (ret == WAIT_TIMEOUT) {
        // make sure the handle did not disappear while we were waiting
        PAggregatedHandleList_t::iterator s = find(handleList.begin(), handleList.end(), timeoutHandle);
        if (s == handleList.end()) {
          PTRACE(4, "SockAgg\tHandle was removed while waiting");
        } 
        else {
          PTime start;

          timeoutHandle->beingProcessed = TRUE;
          timeoutHandle->closed = !timeoutHandle->OnRead();
          timeoutHandle->beingProcessed = FALSE;
  
          unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
          if (duration > 50) {
            PTRACE(4, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration << " msecs");
          }
          if (!timeoutHandle->closed)
            timeoutHandle->SetPreReadDone(FALSE);
        }
      }

      else if (WAIT_OBJECT_0 <= ret && ret <= (WAIT_OBJECT_0 + nCount - 1)) {
        DWORD index = ret - WAIT_OBJECT_0;

        // if the event was triggered, redo the select
        if (index == nCount-1) {
          event.Reset();
          continue;
        }

        PAggregatorFD * fd = aggregatorFdList[index];
        PAssert(fdList[index] == fd->fd, "Mismatch in fd lists");

        PAggregatorFdToHandleMap_t::iterator r = aggregatorFdToHandleMap.find(fd->fd);
        if (r != aggregatorFdToHandleMap.end()) {
          PAggregatedHandle * handle = r->second;
          PAggregatedHandleList_t::iterator s = find(handleList.begin(), handleList.end(), handle);
          if (s == handleList.end()) {
            PTRACE(4, "SockAgg\tHandle was removed while waiting");
          }
          else {
            WSANETWORKEVENTS events;
            WSAEnumNetworkEvents(fd->socket, fd->fd, &events);
            if (events.lNetworkEvents != 0) {

              // check for read events first so we process any data that arrives before closing
              if ((events.lNetworkEvents & FD_READ) != 0) {

                PTime start;

                handle->beingProcessed = TRUE;
                handle->closed = !handle->OnRead();
                handle->beingProcessed = FALSE;
    
                unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
                PTRACE_IF(4, duration > 5, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration << " msecs");
              }

              // check for socket close
              if ((events.lNetworkEvents & FD_CLOSE) != 0)
                handle->closed = TRUE;

              if (!handle->closed) {
                // prepare for next read
                handle->SetPreReadDone(FALSE);
              } else {
                handle->beingProcessed = TRUE;
                handle->OnClose();
                handle->beingProcessed = FALSE;

                // make sure the list is refreshed without the closed socket
                listChanged = TRUE;
              }
            }
          }
        }
      }
    } // workerMutex goes out of scope

#else

#error "aggregation not yet implemented on Unix"

#if 0

    P_timeval pv = timeout;
    int ret = ::select(maxFd+1, &rfds, NULL, NULL, pv);

    if (ret < 0) {
      PTRACE(1, "SockAgg\tSelect failed with error " << errno);
    }

    // loop again if nothing was ready
    if (ret <= 0)
      continue;

    {
      PWaitAndSignal m(workerMutex);

      // check for shutdown
      if (shutdown)
        break;

      if (ret == 0) {
        PTime start;
        BOOL closed = !timeoutHandle->OnRead();
        unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
        if (duration > 50) {
          PTRACE(4, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration << " msecs");
        }
        if (!closed)
          timeoutHandle->SetPreReadDone(FALSE);
      }

      // check the event first
      else if (FD_ISSET(event.GetHandle(), &rfds)) {
        event.Reset();
        continue;
      }

      else {
        PAggregatorFD * fd = aggregatorFdList[ret];
        PAssert(fdList[ret] == fd->fd, "Mismatch in fd lists");
        PAggregatorFdToHandleMap_t::iterator r = aggregatorFdToHandleMap.find(fd->fd);
        if (r != aggregatorFdToHandleMap.end()) {
          PAggregatedHandle * handle = r->second;
          PTime start;
          BOOL closed = !handle->OnRead();
          unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
          if (duration > 50) {
            PTRACE(4, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration << " msecs");
          }
          if (!closed)
            handle->SetPreReadDone(FALSE);
        }
      }
    } // workerMutex goes out of scope
#endif // #if 0
#endif
  }

  PTRACE(4, "SockAgg\taggregator finished");
}


⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -