⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 poolexecutor.cxx.svn-base

📁 絲路server源碼 Silk Road server source
💻 SVN-BASE
📖 第 1 页 / 共 2 页
字号:
     * - 'group' allows tasks to be grouped together so that lists of waiting
     *   threads can be managed.
     *
     * - 'generation' allows tasks to be interrupted  
     */
    class GroupedRunnable : public Runnable {

      Task _task;
      WaiterQueue& _queue;

      size_t _group;
      size_t _generation;

    public:

      GroupedRunnable(const Task& task, WaiterQueue& queue)
        : _task(task), _queue(queue) { 
        
        std::pair<size_t, size_t> pr( _queue.increment() );
    
        _group      = pr.first;
        _generation = pr.second;

      }

      size_t group() const {
        return _group;
      }

      size_t generation() const {
        return _generation;
      }

      void run() {

        try {

          _task->run();

        } catch(...) {

        }

        _queue.decrement( group() );

      }

    };

    typedef CountedPtr<GroupedRunnable, size_t> ExecutorTask;

    /**
     *
     */
    class ExecutorImpl {
      
      typedef MonitoredQueue<ExecutorTask, FastMutex> TaskQueue;
      typedef std::deque<ThreadImpl*> ThreadList;
      
      TaskQueue   _taskQueue;
      WaiterQueue _waitingQueue;

      ThreadList      _threads;
      volatile size_t _size;


    public:
      
      ExecutorImpl() : _size(0) {}


      void registerThread() {
        
        Guard<TaskQueue> g(_taskQueue);

        ThreadImpl* impl = ThreadImpl::current();
        _threads.push_back(impl);

        // current cancel if too many threads are being created
        if(_threads.size() > _size) 
          impl->cancel();

      }

      void unregisterThread() {

        Guard<TaskQueue> g(_taskQueue);
        std::remove(_threads.begin(), _threads.end(), ThreadImpl::current());

      }

      void execute(const Task& task) {

        // Wrap the task with a grouped task
        GroupedRunnable* runnable = new GroupedRunnable(task, _waitingQueue);
 
        try {
          
          _taskQueue.add( ExecutorTask(runnable) );

        } catch(...) {

          // Incase the queue is canceled between the time the WaiterQueue is 
          // updated and the task is added to the TaskQueue
          _waitingQueue.decrement( runnable->group() );
          throw;

        }

      }

      void interrupt() {

        // Bump the generation number
        _waitingQueue.generation(true);

        Guard<TaskQueue> g(_taskQueue);
        
        // Interrupt all threads currently running, thier tasks would be
        // from an older generation
        for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i)
          (*i)->interrupt();
       
      }

      //! Adjust the number of desired workers and return the number of Threads needed
      size_t workers(size_t n) {
        
        Guard<TaskQueue> g(_taskQueue);

        size_t m = (_size < n) ? (n - _size) : 0;
        _size = n;
        
        return m;

      }
      
      size_t workers() {
        
        Guard<TaskQueue> g(_taskQueue);
        return _size;
        
      }
      
      ExecutorTask next() {
        
        ExecutorTask task;
        
        // Draw the task from the queue
        for(;;) {

          try { 

            task = _taskQueue.next();
            break;

          } catch(Interrupted_Exception&) {

            // Ignore interruption here, it can only come from
            // another thread interrupt()ing the executor. The
            // thread was interrupted in the hopes it was busy 
            // with a task

          }

        }
        
        // Interrupt the thread running the tasks when the generation
        // does not match the current generation
        if( task->generation() != _waitingQueue.generation() )
          ThreadImpl::current()->interrupt();

        // Otherwise, clear the interrupted status for the thread and
        // give it a clean slate to start with
        else
          ThreadImpl::current()->isInterrupted();

        return task;

      }

      bool isCanceled() {
        return _taskQueue.isCanceled();
      }

      void cancel() {
        _taskQueue.cancel();
      }

      bool wait(unsigned long timeout) {
        return _waitingQueue.wait(timeout);
      }

    };

    //! Executor job
    class Worker : public Runnable {

      CountedPtr< ExecutorImpl > _impl;
      
    public:
      
      //! Create a Worker that draws upon the given Queue
      Worker(const CountedPtr< ExecutorImpl >& impl) 
        : _impl(impl) { }
   
      //! Run until Thread or Queue are canceled
      void run() { 
        
        _impl->registerThread();
        
        // Run until the Queue is canceled
        while(!Thread::canceled()) {
          
          // Draw tasks from the queue
          ExecutorTask task( _impl->next() );
          task->run();
                    
        } 
        
        _impl->unregisterThread();
   
      }
      
    }; /* Worker */


    //! Helper
    class Shutdown : public Runnable {

      CountedPtr< ExecutorImpl > _impl;

    public:

      Shutdown(const CountedPtr< ExecutorImpl >& impl) 
        : _impl(impl) { }
      
      void run() {        
        _impl->cancel();
      }

    }; /* Shutdown */

  }

  PoolExecutor::PoolExecutor(size_t n)
    : _impl( new ExecutorImpl() ), _shutdown( new Shutdown(_impl) ) {
   
    size(n);
    
    // Request cancelation when main() exits
    ThreadQueue::instance()->insertShutdownTask(_shutdown);

  }

  PoolExecutor::~PoolExecutor() { 

    try {
      
      /**
       * If the shutdown task for this executor has not already been
       * selected to run, then run it locally
       */
      if(ThreadQueue::instance()->removeShutdownTask(_shutdown)) 
        _shutdown->run();
        
    } catch(...) { }

  } 

  void PoolExecutor::interrupt() {
    _impl->interrupt();
  }

  void PoolExecutor::size(size_t n) {
    
    if(n < 1)
      throw InvalidOp_Exception();

    for(size_t m = _impl->workers(n); m > 0; --m)
      Thread t(new Worker(_impl));

  }

  size_t PoolExecutor::size() {
    return _impl->workers();
  }


  void PoolExecutor::execute(const Task& task) {

    // Enqueue the task, the Queue will reject it with a 
    // Cancelation_Exception if the Executor has been canceled
    _impl->execute(task); 

  }

  void PoolExecutor::cancel() {
    _impl->cancel(); 
  }

  bool PoolExecutor::isCanceled() {
    return _impl->isCanceled(); 
  }
 
  void PoolExecutor::wait() {    
    _impl->wait(0);
  }

  bool PoolExecutor::wait(unsigned long timeout) {
    return _impl->wait(timeout == 0 ? 1 : timeout); 
  }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -