📄 poolexecutor.cxx.svn-base
字号:
/*
* Copyright (c) 2005, Eric Crahen
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is furnished
* to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
*/
#include "ThreadImpl.h"
#include "zthread/PoolExecutor.h"
#include "zthread/MonitoredQueue.h"
#include "zthread/FastMutex.h"
#include "ThreadImpl.h"
#include "ThreadQueue.h"
#include <algorithm>
#include <deque>
#include <utility>
using namespace ZThread;
namespace ZThread {
namespace {
/**
*/
class WaiterQueue {
typedef std::deque<ThreadImpl*> ThreadList;
typedef struct group_t {
size_t id;
size_t count;
ThreadList waiters;
group_t(size_t n) : id(n), count(0) {}
} Group;
typedef std::deque<Group> GroupList;
//! Predicate to find a specific group
struct by_id : public std::unary_function<bool, Group> {
size_t id;
by_id(size_t n) : id(n) {}
bool operator()(const Group& grp) {
return grp.id == id;
}
};
//! Functor to count groups
struct counter : public std::unary_function<void, Group> {
size_t count;
counter() : count(0) {}
void operator()(const Group& grp) { count += grp.count; }
operator size_t() { return count; }
};
FastMutex _lock;
GroupList _list;
size_t _id;
size_t _generation;
public:
WaiterQueue() : _id(0), _generation(0) {
// At least one empty-group exists
_list.push_back( Group(_id++) );
}
/**
* Insert the current thread into the current waiter list
*
* @pre At least one empty group exists
* @post At least one empty group exists
*/
bool wait(unsigned long timeout) {
ThreadImpl* current = ThreadImpl::current();
Monitor& m = current->getMonitor();
Monitor::STATE state;
Guard<FastMutex> g1(_lock);
// At least one empty-group exists
assert(!_list.empty());
// Return w/o waiting if there are no executing tasks
if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1)
return true;
// Update the waiter list for the active group
_list.back().waiters.push_back(current);
size_t n = _list.back().id;
m.acquire();
{
Guard<FastMutex, UnlockedScope> g2(g1);
state = timeout == 0 ? m.wait() : m.wait(timeout);
}
m.release();
// If awoke due to a reason other than the last task in the group 'n' completing,
// then then find the group 'current' is waiting in
GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
if(i != _list.end()) {
// Remove 'current' from that list if it is still a member
ThreadList::iterator j = std::find(i->waiters.begin(), i->waiters.end(), current);
if(j != i->waiters.end())
i->waiters.erase(j);
}
// At least one empty-group exists
assert(!_list.empty());
switch(state) {
case Monitor::SIGNALED:
break;
case Monitor::TIMEDOUT:
return false;
case Monitor::INTERRUPTED:
throw Interrupted_Exception();
default:
throw Synchronization_Exception();
}
return true;
}
/**
* Increment the active group count
*
* @pre at least 1 empty group exists
* @post at least 1 non-empty group exists
*/
std::pair<size_t, size_t> increment() {
Guard<FastMutex> g(_lock);
// At least one empty-group exists
assert(!_list.empty());
GroupList::iterator i = --_list.end();
size_t n = i->id;
if(i == _list.end()) {
// A group should never have been removed until
// the final task in that group completed
assert(0);
}
i->count++;
// When the active group is being incremented, insert a new active group
// to replace it if there were waiting threads
if(i == --_list.end() && !i->waiters.empty())
_list.push_back(Group(_id++));
// At least 1 non-empty group exists
assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
return std::make_pair(n, _generation);
}
/**
* Decrease the count for the group with the given id.
*
* @param n group id
*
* @pre At least 1 non-empty group exists
* @post At least 1 empty group exists
*/
void decrement(size_t n) {
Guard<FastMutex> g1(_lock);
// At least 1 non-empty group exists
assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
// Find the requested group
GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
if(i == _list.end()) {
// A group should never have been removed until
// the final task in that group completed
assert(0);
}
// Decrease the count for tasks in this group,
if(--i->count == 0 && i == _list.begin()) {
do {
// When the first group completes, wake all waiters for every
// group, starting from the first until a group that is not
// complete is reached
/*
// Don't remove the empty active group
if(i == --_list.end() && i->waiters.empty())
break;
*/
if( awaken(*i) ) {
// If all waiters were awakened, remove the group
i = _list.erase(i);
} else {
{
// Otherwise, unlock and yield allowing the waiter
// lists to be updated if other threads are busy
Guard<FastLock, UnlockedScope> g2(g1);
ThreadImpl::yield();
}
i = _list.begin();
}
} while(i != _list.end() && i->count == 0);
// Ensure that an active group exists
if(_list.empty())
_list.push_back( Group(++_id) );
}
// At least one group exists
assert(!_list.empty());
}
/**
*/
size_t generation(bool next = false) {
Guard<FastMutex> g(_lock);
return next ? _generation++ : _generation;
}
private:
/**
* Awaken all the waiters remaining in the given group
*
* @return
* - true if all the waiting threads were successfully awakened.
* - false if there were one or more threads that could not be awakened.
*/
bool awaken(Group& grp) {
// Go through the waiter list in the given group;
for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) {
ThreadImpl* impl = *i;
Monitor& m = impl->getMonitor();
// Try the monitor lock, if it cant be locked skip to the next waiter
if(m.tryAcquire()) {
// Notify the monitor & remove from the waiter list so time isn't
// wasted checking it again.
i = grp.waiters.erase(i);
// Try to wake the waiter, it doesn't matter if this is successful
// or not (only fails when the monitor is already going to stop waiting).
m.notify();
m.release();
} else ++i;
}
return grp.waiters.empty();
}
};
/**
* @class GroupedRunnable
*
* Wrap a task with group and generation information.
*
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -