⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sockagg.cxx

📁 sloedgy open sip stack source code
💻 CXX
📖 第 1 页 / 共 2 页
字号:
/*
 * sockagg.cxx
 *
 * Generalised Socket Aggregation functions
 *
 * Portable Windows Library
 *
 * Copyright (C) 2005 Post Increment
 *
 * The contents of this file are subject to the Mozilla Public License
 * Version 1.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * The Original Code is Portable Windows Library.
 *
 * The Initial Developer of the Original Code is Post Increment
 *
 * Portions of this code were written with the financial assistance of 
 * Metreos Corporation (http://www.metros.com).
 *
 * Contributor(s): ______________________________________.
 *
 * $Log: sockagg.cxx,v $
 * Revision 1.1  2006/06/29 04:18:04  joegenbaclor
 * *** empty log message ***
 *
 * Revision 1.17  2006/03/13 23:34:21  csoutheren
 * Added log message when handle creates aggregator
 *
 * Revision 1.16  2006/03/09 05:32:59  csoutheren
 * Reverted to conservative locking strategy, with OnClose
 *
 * Revision 1.15  2006/03/07 11:04:56  csoutheren
 * Ensure socket aggregation not used on Linux
 *
 * Revision 1.14  2006/03/07 07:38:02  csoutheren
 * Refine timing windows on socket handling and cleanup unused code
 *
 * Revision 1.13  2006/03/06 02:37:26  csoutheren
 * Change handle locking to help prevent aggregation threads from hogging list
 *  access
 *
 * Revision 1.12  2006/03/02 07:50:38  csoutheren
 * Cleanup unused code
 * Add OnClose function
 *
 * Revision 1.11  2006/02/28 02:26:00  csoutheren
 * Renamed variable to be not same as member
 *
 * Revision 1.10  2006/02/28 02:08:02  csoutheren
 * Modified aggregation to load balance better
 *
 * Revision 1.9  2006/02/08 04:02:25  csoutheren
 * Added ability to enable and disable socket aggregation
 *
 * Revision 1.8  2006/01/23 05:57:39  csoutheren
 * More aggregator implementation
 *
 * Revision 1.7  2006/01/18 07:16:56  csoutheren
 * Latest version of socket aggregation code
 *
 * Revision 1.6  2006/01/05 11:39:32  rjongbloed
 * Fixed DevStudio warning
 *
 * Revision 1.5  2006/01/03 04:23:32  csoutheren
 * Fixed Unix implementation
 *
 * Revision 1.4  2005/12/23 07:49:27  csoutheren
 * Fixed Unix implementation
 *
 * Revision 1.3  2005/12/23 06:44:31  csoutheren
 * Working implementation
 *
 * Revision 1.2  2005/12/22 07:27:36  csoutheren
 * More implementation
 *
 * Revision 1.1  2005/12/22 03:55:52  csoutheren
 * Added initial version of socket aggregation classes
 *
 */

#ifdef __GNUC__
#pragma implementation "sockagg.h"
#endif


#include <ptlib.h>
#include <ptclib/sockagg.h>

#include <fcntl.h>

////////////////////////////////////////////////////////////////

#if _WIN32

// Win32 implementation of the aggregator's local event, backed by a
// Win32 event object. Set() signals it, Reset() clears it, and GetHandle()
// exposes the underlying handle to callers.
class LocalEvent : public PHandleAggregator::EventBase
{
  public:
    // Create an unnamed, manual-reset event that starts non-signalled
    // (CreateEvent arguments: security attributes, manual reset, initial
    // state, name).
    LocalEvent()
    { 
      event = CreateEvent(NULL, TRUE, FALSE, NULL); 
      PAssert(event != NULL, "CreateEvent failed");
    }

    // Declared virtual to match the Unix implementation of this class.
    // The handle is only closed if CreateEvent actually produced one.
    virtual ~LocalEvent()
    {
      if (event != NULL)
        CloseHandle(event);
    }

    // Return the event handle.
    PAggregatorFD::FD GetHandle()
    { return event; }

    // Put the event into the signalled state.
    void Set()
    { SetEvent(event);  }

    // Return the event to the non-signalled state.
    void Reset()
    { ResetEvent(event); }

  protected:
    HANDLE event;   // Win32 event object; NULL if creation failed
};

// Win32: wrap socket v and associate a freshly created WSA event with it,
// so that the event is signalled on FD_READ and FD_CLOSE for that socket.
PAggregatorFD::PAggregatorFD(SOCKET v)
  : socket(v) 
{ 
  fd = WSACreateEvent(); 
  PAssert(fd != WSA_INVALID_EVENT, "WSACreateEvent failed");

  // Perform the registration outside the assertion macro so the call does
  // not depend on how PAssert evaluates its condition.
  const int result = WSAEventSelect(socket, fd, FD_READ | FD_CLOSE);
  PAssert(result == 0, "WSAEventSelect failed"); 
}

// Win32: close the WSA event that the constructor created. The socket
// itself is not touched here.
PAggregatorFD::~PAggregatorFD()
{
  WSACloseEvent(fd);
}

// Win32: report whether the wrapped socket is anything other than
// INVALID_SOCKET.
bool PAggregatorFD::IsValid()
{
  const bool socketIsInvalid = (socket == INVALID_SOCKET);
  return !socketIsInvalid;
}

#else // #if _WIN32

class LocalEvent : public PHandleAggregator::EventBase
{
  public:
    LocalEvent()
    { ::pipe(fds); }

    virtual ~LocalEvent()
    {
      close(fds[0]);
      close(fds[1]);
    }

    PAggregatorFD::FD GetHandle()
    { return fds[0]; }

    void Set()
    { char ch; write(fds[1], &ch, 1); }

    void Reset()
    { char ch; read(fds[0], &ch, 1); }

  protected:
    int fds[2];
};

// Unix: remember descriptor v; no other setup is performed.
PAggregatorFD::PAggregatorFD(int v)
  : fd(v)
{
}

// Unix: nothing to release; the descriptor is not closed here.
PAggregatorFD::~PAggregatorFD()
{
}

// Unix: a descriptor is considered valid when it is not negative.
bool PAggregatorFD::IsValid()
{
  const bool descriptorIsNegative = (fd < 0);
  return !descriptorIsNegative;
}

#endif // #if _WIN32
  

////////////////////////////////////////////////////////////////

// Construct the common part of a worker thread. The thread object is created
// as a non-auto-deleting, normal-priority PThread named "Aggregator:%0x"
// (first PThread argument 100 is presumably the stack size - confirm against
// PThread). The worker keeps a reference to _event, starts with its list
// flagged as changed, and is not shutting down.
PHandleAggregator::WorkerThreadBase::WorkerThreadBase(EventBase & _event)
  : PThread(100, NoAutoDeleteThread, NormalPriority, "Aggregator:%0x"),
    event(_event),
    listChanged(TRUE),
    shutdown(FALSE)
{
}

class WorkerThread : public PHandleAggregator::WorkerThreadBase
{
  public:
    WorkerThread()
      : WorkerThreadBase(localEvent)
    { }

    ~WorkerThread()
    {
    }

    void Trigger()
    { localEvent.Set(); }

    LocalEvent localEvent;
};


////////////////////////////////////////////////////////////////

// Create an aggregator. _max is stored as maxWorkerSize, the worker-count
// threshold that AddHandle compares against before creating a new worker.
PHandleAggregator::PHandleAggregator(unsigned _max)
  : maxWorkerSize(_max)
{
}

// Add a handle to the aggregator.
//
// The handle's Init() is called first; if it fails, nothing else happens and
// FALSE is returned. Otherwise, under listMutex:
//   - if the number of workers has reached maxWorkerSize, the handle is given
//     to the worker that is not shutting down and has the fewest handles, and
//     that worker is triggered so it notices the changed list;
//   - if the limit has not been reached, or no usable worker exists, a new
//     worker thread is created for the handle.
//
// Returns TRUE once the handle has been handed to a worker.
BOOL PHandleAggregator::AddHandle(PAggregatedHandle * handle)
{
  // perform the handle init function
  if (!handle->Init())
    return FALSE;

  PWaitAndSignal m(listMutex);

  // if the maximum number of worker threads has been reached, then
  // use the worker thread with the minimum number of handles
  if (workers.size() >= maxWorkerSize) {
    WorkerThreadBase * minWorker = NULL;
    size_t minSizeFound = ~(size_t)0;   // no real list can be larger than this
    WorkerList_t::iterator r;
    for (r = workers.begin(); r != workers.end(); ++r) {
      WorkerThreadBase & candidate = **r;
      PWaitAndSignal m2(candidate.workerMutex);
      // "<=" means that, among equally loaded workers, the last one wins
      if (!candidate.shutdown && (candidate.handleList.size() <= minSizeFound)) {
        minSizeFound = candidate.handleList.size();
        minWorker    = &candidate;
      }
    }

    // Every worker may be shutting down (or the list may be empty when
    // maxWorkerSize is zero). Report it, then fall through and create a
    // fresh worker rather than dereferencing a non-existent one.
    PAssert(minWorker != NULL, "could not find minimum worker");

    if (minWorker != NULL) {
      // add the handle to the chosen worker
      WorkerThreadBase & worker = *minWorker;
      PWaitAndSignal m2(worker.workerMutex);
      worker.handleList.push_back(handle);
      PTRACE(4, "SockAgg\tAdding handle " << (void *)handle << " to aggregator - " << worker.handleList.size() << " handles");
      worker.listChanged = TRUE;
      worker.Trigger();
      return TRUE;
    }
  }

  PTRACE(4, "SockAgg\tCreating new aggregator for " << (void *)handle);

  // no worker threads usable, create a new one
  WorkerThread * worker = new WorkerThread;
  // the handle is queued before the thread is resumed
  worker->handleList.push_back(handle);
  worker->Resume();
  workers.push_back(worker);

  PTRACE(4, "SockAgg\tAdding handle " << (void *)handle << " to new aggregator");

  return TRUE;
}

BOOL PHandleAggregator::RemoveHandle(PAggregatedHandle * handle)
{
  listMutex.Wait();

  // look for the thread containing the handle we need to delete
  WorkerList_t::iterator r;
  for (r = workers.begin(); r != workers.end(); ++r) {
    WorkerThreadBase * worker = *r;

    // lock the worker
    worker->workerMutex.Wait();

    PAggregatedHandleList_t & hList = worker->handleList;

    // if handle is not in this thread, then continue searching
    PAggregatedHandleList_t::iterator s = find(hList.begin(), hList.end(), handle);
    if (s == hList.end()) {
      worker->workerMutex.Signal();
      continue;
    }

    PAssert(*s == handle, "Found handle is not correct!");

    PAssert(!handle->beingProcessed, "trying to close handle that is in use");

    // remove the handle from the worker's list of handles
    worker->handleList.erase(s);

    // do the de-init action
    handle->DeInit();

    // delete the handle if autodelete enabled
    if (handle->autoDelete)
      delete handle;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -