📄 sockagg.cxx
字号:
// if the worker thread has enough handles to keep running, trigger it to update
if (worker->handleList.size() > 0) {
PTRACE(4, "SockAgg\tRemoved handle " << (void *)handle << " from aggregator - " << worker->handleList.size() << " handles remaining");
worker->listChanged = TRUE;
worker->Trigger();
worker->workerMutex.Signal();
listMutex.Signal();
return TRUE;
}
PTRACE(4, "SockAgg\tworker thread empty - closing down");
// remove the worker thread from the list of workers
workers.erase(r);
// shutdown the thread
worker->shutdown = TRUE;
worker->Trigger();
worker->workerMutex.Signal();
// unlock the list
listMutex.Signal();
// the worker is now finished
if (!worker->WaitForTermination(10000)) {
PTRACE(4, "SockAgg\tWorker did not terminate promptly");
}
delete worker;
return TRUE;
}
listMutex.Signal();
PAssertAlways("Cannot find aggregator handle");
return FALSE;
}
////////////////////////////////////////////////////////////////
typedef std::vector<PAggregatorFD::FD> fdList_t;
#ifdef _WIN32
#define FDLIST_SIZE WSA_MAXIMUM_WAIT_EVENTS
#else
#define FDLIST_SIZE 64
#endif
void PHandleAggregator::WorkerThreadBase::Main()
{
PTRACE(4, "SockAgg\taggregator started");
fdList_t fdList;
PAggregatorFDList_t aggregatorFdList;
typedef std::map<PAggregatorFD::FD, PAggregatedHandle *> PAggregatorFdToHandleMap_t;
PAggregatorFdToHandleMap_t aggregatorFdToHandleMap;
for (;;) {
// create the list of fds to wait on and find minimum timeout
PTimeInterval timeout(PMaxTimeInterval);
PAggregatedHandle * timeoutHandle = NULL;
#ifndef _WIN32
fd_set rfds;
FD_ZERO(&rfds);
int maxFd = 0;
#endif
{
PWaitAndSignal m(workerMutex);
// check for shutdown
if (shutdown)
break;
// if the list of handles has changed, clear the list of handles
if (listChanged) {
aggregatorFdList.erase (aggregatorFdList.begin(), aggregatorFdList.end());
aggregatorFdList.reserve (FDLIST_SIZE);
fdList.erase (fdList.begin(), fdList.end());
fdList.reserve (FDLIST_SIZE);
aggregatorFdToHandleMap.erase(aggregatorFdToHandleMap.begin(), aggregatorFdToHandleMap.end());
}
PAggregatedHandleList_t::iterator r;
for (r = handleList.begin(); r != handleList.end(); ++r) {
PAggregatedHandle * handle = *r;
if (handle->closed)
continue;
if (listChanged) {
PAggregatorFDList_t fds = handle->GetFDs();
PAggregatorFDList_t::iterator s;
for (s = fds.begin(); s != fds.end(); ++s) {
fdList.push_back ((*s)->fd);
aggregatorFdList.push_back((*s));
aggregatorFdToHandleMap.insert(PAggregatorFdToHandleMap_t::value_type((*s)->fd, handle));
}
}
if (!handle->IsPreReadDone()) {
handle->PreRead();
handle->SetPreReadDone();
}
PTimeInterval t = handle->GetTimeout();
if (t < timeout) {
timeout = t;
timeoutHandle = handle;
}
}
// add in the event fd
if (listChanged) {
fdList.push_back(event.GetHandle());
listChanged = FALSE;
}
#ifndef _WIN32
// create the list of FDs
fdList_t::iterator s;
for (s = fdList.begin(); s != fdList.end(); ++s) {
FD_SET(*s, &rfds);
maxFd = PMAX(maxFd, *s);
}
#endif
} // workerMutex goes out of scope
#ifdef _WIN32
DWORD nCount = fdList.size();
DWORD ret = WSAWaitForMultipleEvents(nCount,
&fdList[0],
false,
(timeout == PMaxTimeInterval) ? WSA_INFINITE : (DWORD)timeout.GetMilliSeconds(),
FALSE);
if (ret == WAIT_FAILED) {
DWORD err = WSAGetLastError();
PTRACE(1, "SockAgg\tWSAWaitForMultipleEvents error " << err);
}
{
PWaitAndSignal m(workerMutex);
// check for shutdown
if (shutdown)
break;
if (ret == WAIT_TIMEOUT) {
// make sure the handle did not disappear while we were waiting
PAggregatedHandleList_t::iterator s = find(handleList.begin(), handleList.end(), timeoutHandle);
if (s == handleList.end()) {
PTRACE(4, "SockAgg\tHandle was removed while waiting");
}
else {
PTime start;
timeoutHandle->beingProcessed = TRUE;
timeoutHandle->closed = !timeoutHandle->OnRead();
timeoutHandle->beingProcessed = FALSE;
unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
if (duration > 50) {
PTRACE(4, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration << " msecs");
}
if (!timeoutHandle->closed)
timeoutHandle->SetPreReadDone(FALSE);
}
}
else if (WAIT_OBJECT_0 <= ret && ret <= (WAIT_OBJECT_0 + nCount - 1)) {
DWORD index = ret - WAIT_OBJECT_0;
// if the event was triggered, redo the select
if (index == nCount-1) {
event.Reset();
continue;
}
PAggregatorFD * fd = aggregatorFdList[index];
PAssert(fdList[index] == fd->fd, "Mismatch in fd lists");
PAggregatorFdToHandleMap_t::iterator r = aggregatorFdToHandleMap.find(fd->fd);
if (r != aggregatorFdToHandleMap.end()) {
PAggregatedHandle * handle = r->second;
PAggregatedHandleList_t::iterator s = find(handleList.begin(), handleList.end(), handle);
if (s == handleList.end()) {
PTRACE(4, "SockAgg\tHandle was removed while waiting");
}
else {
WSANETWORKEVENTS events;
WSAEnumNetworkEvents(fd->socket, fd->fd, &events);
if (events.lNetworkEvents != 0) {
// check for read events first so we process any data that arrives before closing
if ((events.lNetworkEvents & FD_READ) != 0) {
PTime start;
handle->beingProcessed = TRUE;
handle->closed = !handle->OnRead();
handle->beingProcessed = FALSE;
unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
PTRACE_IF(4, duration > 5, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration << " msecs");
}
// check for socket close
if ((events.lNetworkEvents & FD_CLOSE) != 0)
handle->closed = TRUE;
if (!handle->closed) {
// prepare for next read
handle->SetPreReadDone(FALSE);
} else {
handle->beingProcessed = TRUE;
handle->OnClose();
handle->beingProcessed = FALSE;
// make sure the list is refreshed without the closed socket
listChanged = TRUE;
}
}
}
}
}
} // workerMutex goes out of scope
#else
#error "aggregation not yet implemented on Unix"
#if 0
P_timeval pv = timeout;
int ret = ::select(maxFd+1, &rfds, NULL, NULL, pv);
if (ret < 0) {
PTRACE(1, "SockAgg\tSelect failed with error " << errno);
}
// loop again if nothing was ready
if (ret <= 0)
continue;
{
PWaitAndSignal m(workerMutex);
// check for shutdown
if (shutdown)
break;
if (ret == 0) {
PTime start;
BOOL closed = !timeoutHandle->OnRead();
unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
if (duration > 50) {
PTRACE(4, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration << " msecs");
}
if (!closed)
timeoutHandle->SetPreReadDone(FALSE);
}
// check the event first
else if (FD_ISSET(event.GetHandle(), &rfds)) {
event.Reset();
continue;
}
else {
PAggregatorFD * fd = aggregatorFdList[ret];
PAssert(fdList[ret] == fd->fd, "Mismatch in fd lists");
PAggregatorFdToHandleMap_t::iterator r = aggregatorFdToHandleMap.find(fd->fd);
if (r != aggregatorFdToHandleMap.end()) {
PAggregatedHandle * handle = r->second;
PTime start;
BOOL closed = !handle->OnRead();
unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
if (duration > 50) {
PTRACE(4, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration << " msecs");
}
if (!closed)
handle->SetPreReadDone(FALSE);
}
}
} // workerMutex goes out of scope
#endif // #if 0
#endif
}
PTRACE(4, "SockAgg\taggregator finished");
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -