📄 poolexecutor.cxx.svn-base
字号:
* - 'group' allows tasks to be grouped together so that lists of waiting
* threads can be managed.
*
* - 'generation' allows tasks to be interrupted
*/
class GroupedRunnable : public Runnable {

  //! The user task being wrapped
  Task _task;

  //! Queue that tracks how many tasks of each group are still outstanding
  WaiterQueue& _queue;

  //! Group assigned to this task by the WaiterQueue at construction
  size_t _group;

  //! Generation assigned to this task by the WaiterQueue at construction
  size_t _generation;

public:

  //! Wrap a task and register it with the queue, which hands back the
  //! (group, generation) pair this task belongs to.
  GroupedRunnable(const Task& task, WaiterQueue& queue)
    : _task(task), _queue(queue) {

    const std::pair<size_t, size_t> ticket( _queue.increment() );

    _group      = ticket.first;
    _generation = ticket.second;

  }

  //! Group this task was assigned to
  size_t group() const { return _group; }

  //! Generation this task was assigned to
  size_t generation() const { return _generation; }

  //! Run the wrapped task, then report completion to the queue.
  void run() {

    try {
      _task->run();
    } catch(...) {
      // Anything thrown by the task is discarded so that the
      // decrement below is always reached
    }

    _queue.decrement( _group );

  }

};
//! Handle type under which wrapped tasks (GroupedRunnable) are stored in,
//! and drawn from, the executor's task queue.
typedef CountedPtr<GroupedRunnable, size_t> ExecutorTask;
/**
*
*/
class ExecutorImpl {
typedef MonitoredQueue<ExecutorTask, FastMutex> TaskQueue;
typedef std::deque<ThreadImpl*> ThreadList;
TaskQueue _taskQueue;
WaiterQueue _waitingQueue;
ThreadList _threads;
volatile size_t _size;
public:
ExecutorImpl() : _size(0) {}
void registerThread() {
Guard<TaskQueue> g(_taskQueue);
ThreadImpl* impl = ThreadImpl::current();
_threads.push_back(impl);
// current cancel if too many threads are being created
if(_threads.size() > _size)
impl->cancel();
}
void unregisterThread() {
Guard<TaskQueue> g(_taskQueue);
std::remove(_threads.begin(), _threads.end(), ThreadImpl::current());
}
void execute(const Task& task) {
// Wrap the task with a grouped task
GroupedRunnable* runnable = new GroupedRunnable(task, _waitingQueue);
try {
_taskQueue.add( ExecutorTask(runnable) );
} catch(...) {
// Incase the queue is canceled between the time the WaiterQueue is
// updated and the task is added to the TaskQueue
_waitingQueue.decrement( runnable->group() );
throw;
}
}
void interrupt() {
// Bump the generation number
_waitingQueue.generation(true);
Guard<TaskQueue> g(_taskQueue);
// Interrupt all threads currently running, thier tasks would be
// from an older generation
for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i)
(*i)->interrupt();
}
//! Adjust the number of desired workers and return the number of Threads needed
size_t workers(size_t n) {
Guard<TaskQueue> g(_taskQueue);
size_t m = (_size < n) ? (n - _size) : 0;
_size = n;
return m;
}
size_t workers() {
Guard<TaskQueue> g(_taskQueue);
return _size;
}
ExecutorTask next() {
ExecutorTask task;
// Draw the task from the queue
for(;;) {
try {
task = _taskQueue.next();
break;
} catch(Interrupted_Exception&) {
// Ignore interruption here, it can only come from
// another thread interrupt()ing the executor. The
// thread was interrupted in the hopes it was busy
// with a task
}
}
// Interrupt the thread running the tasks when the generation
// does not match the current generation
if( task->generation() != _waitingQueue.generation() )
ThreadImpl::current()->interrupt();
// Otherwise, clear the interrupted status for the thread and
// give it a clean slate to start with
else
ThreadImpl::current()->isInterrupted();
return task;
}
bool isCanceled() {
return _taskQueue.isCanceled();
}
void cancel() {
_taskQueue.cancel();
}
bool wait(unsigned long timeout) {
return _waitingQueue.wait(timeout);
}
};
//! Executor job: the body of each pool thread. Registers the thread with
//! the shared ExecutorImpl, runs tasks drawn from it until the thread is
//! canceled, and unregisters the thread again on the way out.
class Worker : public Runnable {

  CountedPtr< ExecutorImpl > _impl;

public:

  //! Create a Worker that draws upon the given Queue
  Worker(const CountedPtr< ExecutorImpl >& impl)
    : _impl(impl) { }

  //! Run until Thread or Queue are canceled
  void run() {

    _impl->registerThread();

    try {

      // Run until the Queue is canceled
      while(!Thread::canceled()) {

        // Draw tasks from the queue
        ExecutorTask task( _impl->next() );
        task->run();

      }

    } catch(...) {

      // Leaving the loop by exception must not skip unregistration,
      // otherwise this thread stays in the executor's thread list after
      // it has exited. The exception itself is passed on unchanged.
      _impl->unregisterThread();
      throw;

    }

    _impl->unregisterThread();

  }

}; /* Worker */
//! Helper
class Shutdown : public Runnable {
CountedPtr< ExecutorImpl > _impl;
public:
Shutdown(const CountedPtr< ExecutorImpl >& impl)
: _impl(impl) { }
void run() {
_impl->cancel();
}
}; /* Shutdown */
}
/**
 * Create an executor with n worker threads.
 *
 * Builds the shared ExecutorImpl and a Shutdown task bound to it, starts
 * the workers through size(n) (which throws InvalidOp_Exception for
 * n == 0), and finally hands the shutdown task to the ThreadQueue.
 */
PoolExecutor::PoolExecutor(size_t n)
  : _impl( new ExecutorImpl() ),
    _shutdown( new Shutdown(_impl) )
{

  this->size(n);

  // Request cancelation when main() exits
  ThreadQueue::instance()->insertShutdownTask(_shutdown);

}
/**
 * Withdraw the shutdown task from the ThreadQueue and, when it was still
 * registered there, run it from here instead. Nothing is allowed to
 * escape the destructor.
 */
PoolExecutor::~PoolExecutor()
{

  try {

    // Removal fails when the shutdown task for this executor has already
    // been selected to run; in that case there is nothing left to do
    if(!ThreadQueue::instance()->removeShutdownTask(_shutdown))
      return;

    // Otherwise run it locally
    _shutdown->run();

  } catch(...) { }

}
//! Forward the interrupt request to the shared ExecutorImpl.
void PoolExecutor::interrupt()
{
  _impl->interrupt();
}
/**
 * Set the desired number of workers, starting a new Worker thread for each
 * one ExecutorImpl::workers(n) reports as needed.
 *
 * Throws InvalidOp_Exception when n is 0.
 */
void PoolExecutor::size(size_t n)
{

  if(n == 0)
    throw InvalidOp_Exception();

  size_t pending = _impl->workers(n);

  while(pending > 0) {

    // Constructing the Thread starts a Worker; the handle itself is not kept
    Thread t(new Worker(_impl));
    --pending;

  }

}
//! Desired number of workers, as recorded by the shared ExecutorImpl.
size_t PoolExecutor::size()
{
  const size_t desired = _impl->workers();
  return desired;
}
/**
 * Enqueue the task. The Queue will reject it with a Cancelation_Exception
 * if the Executor has been canceled.
 */
void PoolExecutor::execute(const Task& task)
{
  _impl->execute(task);
}
//! Forward the cancel request to the shared ExecutorImpl.
void PoolExecutor::cancel()
{
  _impl->cancel();
}
//! Whether the shared ExecutorImpl reports itself as canceled.
bool PoolExecutor::isCanceled()
{
  const bool canceled = _impl->isCanceled();
  return canceled;
}
//! Wait on the shared ExecutorImpl with a timeout argument of 0; the
//! result of the underlying wait is discarded.
void PoolExecutor::wait()
{
  static_cast<void>( _impl->wait(0) );
}
/**
 * Wait on the shared ExecutorImpl for at most the given timeout and return
 * its result.
 *
 * A timeout of 0 is passed on as 1; 0 is the value the untimed wait()
 * overload passes to the implementation.
 */
bool PoolExecutor::wait(unsigned long timeout)
{

  if(timeout == 0)
    timeout = 1;

  return _impl->wait(timeout);

}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -