⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 thread_pool.h

📁 一个线程池的源代码
💻 H
字号:
/**
 * \file		thread_pool.h
 * \author		Plugware Solutions.com, Ltd.
 * \date		
 * \brief		[brief here]
 *
 *              Copyright (c) 2003, 2004
 *              Plugware Solutions.com, Ltd.
 *
 *              Permission to use, copy, modify, distribute and sell 
 *              this software and its documentation for any purpose 
 *              is hereby granted without fee, provided that the above 
 *              copyright notice appear in all copies and the both that 
 *              copyright notice and this permission notice appear 
 *              in supporting documentation.  Plugware Solutions.com, Ltd. 
 *              makes no reresentations about the suitability of this 
 *              software for any purpose.  It is provided "as is" 
 *              without express or implied warranty.
 */

#ifndef _plugware_thread_pool_h
#define _plugware_thread_pool_h

#if (_WIN32_WINNT < 0x0500)
#error Requires Windows Server 2003, Windows 2000 Server, or Windows NT Server 3.5 and later.
#endif //_WIN32_WINNT < 0x0500

/*
**  Includes
*/  

#pragma warning( disable: 4702) //unreachable code
#include <list>
#include <vector>
#pragma warning( default: 4702) //unreachable code
#include <crtdbg.h>
#include "utility.h"

/*
**  Declarations
*/

namespace plugware {
namespace core 
{
    /**
    * \class        thread_pool
    * \author       Plugware Solutions.com, Ltd.
    * \date         
    * \brief        Context to execute stateful work in a controllable, concurrent fashion.
    *               
    *               On win32 platforms implemented using iocp.  Work may be queued against the
    *               thread pool and will execute when a thread is free.  The thread pool provides
    *               fined grained concurrency control: number of threads running, total threads and a
    *               global initialization object available per thread pool made available to worker threads
    *               and work units.
    *
    *               Two related classes interact with the thread pool: work units worker threads.
    *               Work units are stateful objects encapsulating work to be done; worker thread 
    *               encapsulate the process by which work units are executed.
    */
    template <class worker>
    class thread_pool : public singleton<thread_pool>
    {
    public:  // interface

        typedef typename worker::request_type request_type;
        typedef std::vector<HANDLE> thread_vector;
        thread_pool();
        ~thread_pool();
    
        enum { _shutdown_flag_ = -1 };    
        void shutdown(const DWORD dw_max_wait = INFINITE);
        void initialize(size_t n_running_threads = 0, size_t n_thread_count = 0, const DWORD dw_stack_size = 0);
        void associate_device(HANDLE h_device);
        void queue_request(request_type* p_request);
        BOOL get_queued_status(LPDWORD pdw_bytes, request_type** pp_request, LPOVERLAPPED* pp_overlapped, DWORD dw_wait);

        // accessors
        HANDLE get_completion_port() { return m_h_iocp; }
        
        // const accessors
        const size_t get_size() const;

    private:    // implementation

        // modifiers
        void create_iocp(const size_t n_running_threads);
        void create_threads(size_t n_thread_count, const size_t dw_stack_size);
        void flag_shutdown();
        void wait_for_threads_to_die(DWORD dw_max_wait);
   
        // accessors
        thread_vector& get_thread_vector() { return m_c_thread_vector; }
                 
        // const accessors
        const bool           shutdown_initiated() const { return m_b_shutdown_flag; }
        const thread_vector& get_thread_vector()  const { return m_c_thread_vector; }
        const bool           is_initialized()     const { return 0     != m_h_iocp &&
                                                                 false == get_thread_vector().empty(); }
        // static helpers
        static const size_t get_cpu_count();
        
    private:    // data
    
        bool          m_b_shutdown_flag;
        thread_vector m_c_thread_vector;
        HANDLE        m_h_iocp;
        
    }; // class thread_pool
    
    /**
    * \class        work_unit
    * \author       Plugware Solutions.com, Ltd.
    * \date         
    * \brief        Interface for stateful work executed by a worker thread within a thread pool.
    *
    *               The work unit encapsulates all state pertinent to a task, as well as the process
    *               that operates on that state.  work_unit is explicitly bound to worker_thread
    *               as request_type; work_unit is specialized for worker_thread.
    */
    struct work_unit : reference_counted
    {
        virtual void process() throw() = 0;    
    };

    /**
    * \class        chain
    * \author       Plugware Solutions.com, Ltd.
    * \date         
    * \brief        [brief here]
    *
    *               [details here]
    */
    struct chain 
    {
        struct data : work_unit, std::list<smart_pointer<work_unit> >
        {
            void process() throw();
        };
        
        chain();
        chain& operator,(work_unit* p_work);
        operator work_unit*() { return m_work.get(); }

        smart_pointer<data> m_work;
    }; // struct chain

    /**
    * \class        worker_thread 
    * \author       Plugware Solutions.com, Ltd.
    * \date         
    * \brief        Encapsulation of thread running in thread pool.
    *
    *               work_unit serves as default request_type.
    */
    struct worker_thread
    {
        typedef work_unit request_type;
        typedef thread_pool<worker_thread> thread_pool;
        void execute(request_type* p_request) const throw();
        static DWORD WINAPI thread_proc(LPVOID pv) throw();
        const DWORD thread_proc() const;
        static void prepare_request(request_type* p_request) { p_request->add_ref(); }
    
    private:    // implementation
        
        worker_thread();
    }; // struct worker_thread

    /*
    **  Implementation
    */

    ////////////////////////////////////////////////////////////////////////////////
    // thread_pool
    ////////////////////////////////////////////////////////////////////////////////
    template <class worker>
    thread_pool<worker>::thread_pool()
    : m_b_shutdown_flag(false), m_h_iocp(0)
    {
    }
    
    template <class worker>
    thread_pool<worker>::~thread_pool()
    {
#ifdef _DEBUG
        ::OutputDebugStringA("~thread_pool\n");
#endif //_DEBUG

        shutdown();
    }
    
    template <class worker>
    void thread_pool<worker>::initialize(size_t n_running_threads, size_t n_thread_count, const DWORD dw_stack_size)
    {
        // validate and assign defaults:
        // n_running_threads: cpu count
        // n_thread_count:    2 * n_running_threads
        if (true == is_initialized()) return;
        if (0 >= n_running_threads) n_running_threads = get_cpu_count();
        if (0 >= n_thread_count) n_thread_count = n_running_threads << 1;
        if (n_running_threads > n_thread_count) n_running_threads = n_thread_count;
        
        // store param, create iocp and threads
        create_iocp(n_running_threads);
        create_threads(n_thread_count, dw_stack_size);
    }
    
    template <class worker>
    void thread_pool<worker>::create_iocp(const size_t n_running_threads)
    {
        // validate, create and validate
        if (true == is_initialized()) return;
        
        //@@todo: migrate to thread traits
        m_h_iocp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, (DWORD)n_running_threads);
        if (0 == get_completion_port()) throw std::exception("CreateIoCompletionPort failed");
    }
    
    template <class worker>
    void thread_pool<worker>::create_threads(size_t n_thread_count, const size_t dw_stack_size)
    {
        // validate, reserve, create and store
        if (true == is_initialized()) return;
        get_thread_vector().reserve(n_thread_count);
        
        while (n_thread_count--)
        {
            //@@todo: migrate to thread traits
            //@@review: security, dw_thread_id, etc
            // create thread, validate and store
            DWORD  dw_thread_id = 0;
            HANDLE h_thread     = ::CreateThread(0, 
                                                 dw_stack_size, 
                                                 worker::thread_proc, 
                                                 0,
                                                 0,
                                                 &dw_thread_id);
                                                 
            if (0 == h_thread) throw std::exception("CreateThread failed");
            get_thread_vector().push_back(h_thread);
        }
    }
    
    template <class worker>
    void thread_pool<worker>::associate_device(HANDLE h_device)
    {
        // validate, associate and validate
        if (false == is_initialized()) throw std::exception("uninitialized");
        if (0 == ::CreateIoCompletionPort(h_device, get_completion_port(), 0, 0))
        {
            throw std::exception("CreateIoCompletionPort failed");
        }   
    }
    
    template <class worker>
    void thread_pool<worker>::queue_request(request_type* p_request)
    {
        // validate, if we are shutting down, bail
        _ASSERT(p_request);
        if (true == shutdown_initiated()) return;        
        worker::prepare_request(p_request);
        
        //@@todo: migrate to thread traits
        if (FALSE == ::PostQueuedCompletionStatus(get_completion_port(), 0, (ULONG_PTR)p_request, 0))
        {
            throw std::exception("PostQueuedCompletionStatus failed");
        }
    }

    template <class worker>
    BOOL thread_pool<worker>::get_queued_status(LPDWORD pdw_bytes, request_type** pp_request, LPOVERLAPPED* pp_overlapped, DWORD dw_wait)
    {
        if (false == is_initialized()) throw std::exception("uninitialized");
        return ::GetQueuedCompletionStatus(get_completion_port(), pdw_bytes, (PULONG_PTR)pp_request, pp_overlapped, dw_wait);
    }

    template <class worker>
    void thread_pool<worker>::shutdown(const DWORD dw_max_wait)
    {
        // validate
        if (false == is_initialized()) return;
        if (true == shutdown_initiated()) return;
        
        // flag, wait
        flag_shutdown();
        wait_for_threads_to_die(dw_max_wait);
        
        // close handles and clear
        for (thread_vector::iterator handle(get_thread_vector().begin());
             handle != get_thread_vector().end();
             *handle && ::CloseHandle(*handle), ++handle) {}
        
        get_thread_vector().clear();
        ::CloseHandle(m_h_iocp);
        m_h_iocp = 0;
    }
    
    template <class worker>
    void thread_pool<worker>::flag_shutdown()
    {
        // flag and queue
        m_b_shutdown_flag = true;
        size_t n_thread_count = get_thread_vector().size();
        
        //@@todo: migrate thread traits
        while (n_thread_count--) ::PostQueuedCompletionStatus(get_completion_port(),
                                                              0,
                                                              0,
                                                              (LPOVERLAPPED)_shutdown_flag_);
    }
    
    template <class worker>
    void thread_pool<worker>::wait_for_threads_to_die(const DWORD dw_max_wait)
    {
        ::WaitForMultipleObjects((DWORD)get_thread_vector().size(), 
                                 &get_thread_vector().at(0), 
                                 TRUE, 
                                 dw_max_wait);
    }
    
    template <class worker>
    const size_t thread_pool<worker>::get_cpu_count()
    {
        SYSTEM_INFO c_si = {0};
        ::GetSystemInfo(&c_si);
        return c_si.dwNumberOfProcessors;
    }
    
    ///////////////////////////////////////////////////////////////////////////////
    //  chain
    ///////////////////////////////////////////////////////////////////////////////
    inline chain::chain() 
    : m_work(new(data)) 
    {
    }
    
    inline chain& chain::operator,(work_unit* p_work)
    {
        _ASSERT(0 != p_work);
        _ASSERT(0 != m_work.get());
        m_work.get()->push_back(p_work);
        return *this;
    }

    ///////////////////////////////////////////////////////////////////////////////
    //  chain::data
    ///////////////////////////////////////////////////////////////////////////////
    inline void chain::data::process()
    {
        _ASSERT(size());
        _ASSERT(front().get());
        
        front().get()->process();
        pop_front();
        
        // if not empty, requeue
        if (true == empty()) return;
        thread_pool<worker_thread>::instance().queue_request(this);
    }
    
    ////////////////////////////////////////////////////////////////////////////////
    // worker_thread
    ////////////////////////////////////////////////////////////////////////////////
    inline worker_thread::worker_thread()
    {
    }
    
    inline DWORD WINAPI worker_thread::thread_proc(LPVOID) throw()
    {
        try
        {
            // return values:
            //  0: success
            // -1: failure
            return worker_thread().thread_proc();
        }
        catch (...) {}            
        return DWORD(-1);
    }
    
    inline const DWORD worker_thread::thread_proc() const
    {
        // declare
        DWORD         dw_bytes_transferred = 0;
        OVERLAPPED*   p_overlapped         = 0;
        request_type* p_request            = 0;
        
        // main loop
        for (;;)
        { 
            // get next packet, on failure assert and and continue
            if (FALSE == 
                thread_pool::instance().get_queued_status(
                    &dw_bytes_transferred,
                    &p_request,
                    &p_overlapped,
                    INFINITE))
            {
                // nothing to see, move along ...
                continue;
            } 

            // shutdown or process
            if (thread_pool::_shutdown_flag_ == (UINT_PTR)p_overlapped) break;

            // no io requests, please
            _ASSERT(0 == p_overlapped);
            execute(p_request);
        }
        
#ifdef _DEBUG    
        ::OutputDebugStringA("worker_thread: terminated\n");
#endif //_DEBUG

        return 0;
    }
    
    inline void worker_thread::execute(request_type* p_request) const throw()
    {
        // validate and process (io or logic)
        if (0 == p_request) return;
        p_request->process();
        p_request->release();
    }
} // namespace core
    
namespace global 
{
    typedef core::thread_pool<core::worker_thread> thread_pool;
} // namespace global
} // namespace plugware
#endif //_plugware_thread_pool_h

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -