📄 sockagg.cxx
字号:
/*
* sockagg.cxx
*
* Generalised Socket Aggregation functions
*
* Portable Windows Library
*
* Copyright (C) 2005 Post Increment
*
* The contents of this file are subject to the Mozilla Public License
* Version 1.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* The Original Code is Portable Windows Library.
*
* The Initial Developer of the Original Code is Post Increment
*
* Portions of this code were written with the financial assistance of
* Metreos Corporation (http://www.metros.com).
*
* Contributor(s): ______________________________________.
*
* $Log: sockagg.cxx,v $
* Revision 1.1 2006/06/29 04:18:04 joegenbaclor
* *** empty log message ***
*
* Revision 1.17 2006/03/13 23:34:21 csoutheren
* Added log message when handle creates aggregator
*
* Revision 1.16 2006/03/09 05:32:59 csoutheren
* Reverted to conservative locking strategy, with OnClose
*
* Revision 1.15 2006/03/07 11:04:56 csoutheren
* Ensure socket aggregation not used on Linux
*
* Revision 1.14 2006/03/07 07:38:02 csoutheren
* Refine timing windows on socket handling and cleanup unused code
*
* Revision 1.13 2006/03/06 02:37:26 csoutheren
* Change handle locking to help prevent aggregation threads from hogging list
* access
*
* Revision 1.12 2006/03/02 07:50:38 csoutheren
* Cleanup unused code
* Add OnClose function
*
* Revision 1.11 2006/02/28 02:26:00 csoutheren
* Renamed variable to be not same as member
*
* Revision 1.10 2006/02/28 02:08:02 csoutheren
* Modified aggregation to load balance better
*
* Revision 1.9 2006/02/08 04:02:25 csoutheren
* Added ability to enable and disable socket aggregation
*
* Revision 1.8 2006/01/23 05:57:39 csoutheren
* More aggregator implementation
*
* Revision 1.7 2006/01/18 07:16:56 csoutheren
* Latest version of socket aggregation code
*
* Revision 1.6 2006/01/05 11:39:32 rjongbloed
* Fixed DevStudio warning
*
* Revision 1.5 2006/01/03 04:23:32 csoutheren
* Fixed Unix implementation
*
* Revision 1.4 2005/12/23 07:49:27 csoutheren
* Fixed Unix implementation
*
* Revision 1.3 2005/12/23 06:44:31 csoutheren
* Working implementation
*
* Revision 1.2 2005/12/22 07:27:36 csoutheren
* More implementation
*
* Revision 1.1 2005/12/22 03:55:52 csoutheren
* Added initial version of socket aggregation classes
*
*/
#ifdef __GNUC__
#pragma implementation "sockagg.h"
#endif
#include <ptlib.h>
#include <ptclib/sockagg.h>
#include <fcntl.h>
////////////////////////////////////////////////////////////////
#if _WIN32
/**
 * Win32 implementation of the aggregator wake-up event.
 *
 * Wraps an unnamed Win32 event object created with manual reset enabled and
 * an initial non-signalled state (see the CreateEvent arguments below).
 */
class LocalEvent : public PHandleAggregator::EventBase
{
  public:
    /** Create the underlying event object; asserts if creation fails. */
    LocalEvent()
    {
      // arguments: security attributes, manual reset, initial state, name
      event = CreateEvent(NULL, TRUE, FALSE, NULL);
      PAssert(event != NULL, "CreateEvent failed");
    }

    /** Release the event object, if one was actually created. */
    virtual ~LocalEvent()
    {
      // CreateEvent may have failed (the assertion above can be continued
      // past), so never hand a NULL handle to CloseHandle
      if (event != NULL)
        CloseHandle(event);
    }

    /** Return the event handle so that it can be waited upon. */
    PAggregatorFD::FD GetHandle()
    { return event; }

    /** Put the event into the signalled state. */
    void Set()
    { SetEvent(event); }

    /** Put the event back into the non-signalled state. */
    void Reset()
    { ResetEvent(event); }

  protected:
    HANDLE event;   // event object handle, NULL if creation failed
};
/**
 * Win32 constructor: create a Winsock event object and associate it with
 * the socket so that the event is signalled on FD_READ and FD_CLOSE.
 *
 * @param v  socket to be monitored
 */
PAggregatorFD::PAggregatorFD(SOCKET v)
  : socket(v)
{
  fd = WSACreateEvent();
  PAssert(fd != WSA_INVALID_EVENT, "WSACreateEvent failed");

  // perform the registration outside of the assertion macro so that the
  // call never depends on how the macro treats its argument
  const int result = WSAEventSelect(socket, fd, FD_READ | FD_CLOSE);
  PAssert(result == 0, "WSAEventSelect failed");
}
/**
 * Win32 destructor: closes the Winsock event object held in fd.
 * The socket itself is not touched here.
 */
PAggregatorFD::~PAggregatorFD()
{ WSACloseEvent(fd); }
/**
 * Win32: report whether this object refers to a usable socket.
 *
 * @return true unless the stored socket equals INVALID_SOCKET
 */
bool PAggregatorFD::IsValid()
{
  const bool invalid = (socket == INVALID_SOCKET);
  return !invalid;
}
#else // #if _WIN32
class LocalEvent : public PHandleAggregator::EventBase
{
public:
LocalEvent()
{ ::pipe(fds); }
virtual ~LocalEvent()
{
close(fds[0]);
close(fds[1]);
}
PAggregatorFD::FD GetHandle()
{ return fds[0]; }
void Set()
{ char ch; write(fds[1], &ch, 1); }
void Reset()
{ char ch; read(fds[0], &ch, 1); }
protected:
int fds[2];
};
/**
 * Unix constructor: simply records the descriptor that was passed in.
 *
 * @param v  file descriptor to be monitored
 */
PAggregatorFD::PAggregatorFD(int v)
  : fd(v)
{ }
/**
 * Unix destructor: performs no work; in particular the stored descriptor
 * is not closed here.
 */
PAggregatorFD::~PAggregatorFD()
{ }
/**
 * Unix: report whether this object refers to a usable descriptor.
 *
 * @return true when the stored descriptor is not negative
 */
bool PAggregatorFD::IsValid()
{
  const bool negative = (fd < 0);
  return !negative;
}
#endif // #if _WIN32
////////////////////////////////////////////////////////////////
/**
 * Construct the common part of an aggregator worker thread.
 *
 * Only a reference to the event is stored. The handle list is flagged as
 * changed from the outset and the shutdown flag starts out cleared.
 *
 * @param _event  wake-up event used by this worker
 */
PHandleAggregator::WorkerThreadBase::WorkerThreadBase(EventBase & _event)
  : PThread(100, NoAutoDeleteThread, NormalPriority, "Aggregator:%0x")
  , event(_event)
  , listChanged(TRUE)
  , shutdown(FALSE)
{ }
class WorkerThread : public PHandleAggregator::WorkerThreadBase
{
public:
WorkerThread()
: WorkerThreadBase(localEvent)
{ }
~WorkerThread()
{
}
void Trigger()
{ localEvent.Set(); }
LocalEvent localEvent;
};
////////////////////////////////////////////////////////////////
/**
 * Construct an aggregator.
 *
 * @param _max  stored as maxWorkerSize; AddHandle compares the number of
 *              existing workers against it before creating another one
 */
PHandleAggregator::PHandleAggregator(unsigned _max)
  : maxWorkerSize(_max)
{ }
/**
 * Add a handle to the aggregator.
 *
 * The handle's Init() function is called first. While fewer than
 * maxWorkerSize workers exist, a new worker thread is created for the
 * handle. Otherwise the handle is given to the worker that is not shutting
 * down and has the fewest handles. If no such worker exists, a new worker
 * is created rather than dereferencing an invalid iterator.
 *
 * @param handle  handle to add
 * @return FALSE if handle->Init() fails, otherwise TRUE
 */
BOOL PHandleAggregator::AddHandle(PAggregatedHandle * handle)
{
  // perform the handle init function
  if (!handle->Init())
    return FALSE;

  PWaitAndSignal m(listMutex);

  // if the maximum number of worker threads has been reached, then
  // use the worker thread with the minimum number of handles
  if (workers.size() >= maxWorkerSize) {
    WorkerList_t::iterator minWorker = workers.end();

    // start from the largest possible size so that any worker that is not
    // shutting down qualifies, however many handles it already has
    size_t minSizeFound = ~(size_t)0;

    WorkerList_t::iterator r;
    for (r = workers.begin(); r != workers.end(); ++r) {
      WorkerThreadBase & candidate = **r;
      PWaitAndSignal m2(candidate.workerMutex);
      if (!candidate.shutdown && (candidate.handleList.size() <= minSizeFound)) {
        minSizeFound = candidate.handleList.size();
        minWorker = r;
      }
    }

    // add the handle to the selected worker
    if (minWorker != workers.end()) {
      WorkerThreadBase & worker = **minWorker;
      PWaitAndSignal m2(worker.workerMutex);
      worker.handleList.push_back(handle);
      PTRACE(4, "SockAgg\tAdding handle " << (void *)handle << " to aggregator - " << worker.handleList.size() << " handles");
      worker.listChanged = TRUE;
      worker.Trigger();
      return TRUE;
    }

    // every existing worker is shutting down (or there are none at all), so
    // fall through and start a fresh worker instead of dereferencing end()
    PTRACE(2, "SockAgg\tNo usable aggregator found for " << (void *)handle);
  }

  PTRACE(4, "SockAgg\tCreating new aggregator for " << (void *)handle);

  // no worker threads usable, create a new one
  WorkerThread * worker = new WorkerThread;
  worker->handleList.push_back(handle);
  worker->Resume();
  workers.push_back(worker);

  PTRACE(4, "SockAgg\tAdding handle " << (void *)handle << " to new aggregator");

  return TRUE;
}
BOOL PHandleAggregator::RemoveHandle(PAggregatedHandle * handle)
{
listMutex.Wait();
// look for the thread containing the handle we need to delete
WorkerList_t::iterator r;
for (r = workers.begin(); r != workers.end(); ++r) {
WorkerThreadBase * worker = *r;
// lock the worker
worker->workerMutex.Wait();
PAggregatedHandleList_t & hList = worker->handleList;
// if handle is not in this thread, then continue searching
PAggregatedHandleList_t::iterator s = find(hList.begin(), hList.end(), handle);
if (s == hList.end()) {
worker->workerMutex.Signal();
continue;
}
PAssert(*s == handle, "Found handle is not correct!");
PAssert(!handle->beingProcessed, "trying to close handle that is in use");
// remove the handle from the worker's list of handles
worker->handleList.erase(s);
// do the de-init action
handle->DeInit();
// delete the handle if autodelete enabled
if (handle->autoDelete)
delete handle;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -