📄 win_iocp_io_service.hpp
字号:
//// win_iocp_io_service.hpp// ~~~~~~~~~~~~~~~~~~~~~~~//// Copyright (c) 2003-2007 Christopher M. Kohlhoff (chris at kohlhoff dot com)//// Distributed under the Boost Software License, Version 1.0. (See accompanying// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)//#ifndef ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP#define ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP#if defined(_MSC_VER) && (_MSC_VER >= 1200)# pragma once#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)#include "asio/detail/push_options.hpp"#include "asio/detail/win_iocp_io_service_fwd.hpp"#if defined(ASIO_HAS_IOCP)#include "asio/detail/push_options.hpp"#include <limits>#include <boost/throw_exception.hpp>#include "asio/detail/pop_options.hpp"#include "asio/io_service.hpp"#include "asio/system_error.hpp"#include "asio/detail/call_stack.hpp"#include "asio/detail/handler_alloc_helpers.hpp"#include "asio/detail/handler_invoke_helpers.hpp"#include "asio/detail/service_base.hpp"#include "asio/detail/socket_types.hpp"#include "asio/detail/timer_queue.hpp"#include "asio/detail/win_iocp_operation.hpp"#include "asio/detail/mutex.hpp"namespace asio {namespace detail {class win_iocp_io_service : public asio::detail::service_base<win_iocp_io_service>{public: // Base class for all operations. typedef win_iocp_operation operation; // Constructor. win_iocp_io_service(asio::io_service& io_service) : asio::detail::service_base<win_iocp_io_service>(io_service), iocp_(), outstanding_work_(0), stopped_(0), shutdown_(0), timer_thread_(0), timer_interrupt_issued_(false) { } void init(size_t concurrency_hint) { iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, static_cast<DWORD>((std::min<size_t>)(concurrency_hint, DWORD(~0)))); if (!iocp_.handle) { DWORD last_error = ::GetLastError(); asio::system_error e( asio::error_code(last_error, asio::error::get_system_category()), "iocp"); boost::throw_exception(e); } } // Destroy all user-defined handler objects owned by the service. void shutdown_service() { ::InterlockedExchange(&shutdown_, 1); for (;;) { DWORD bytes_transferred = 0;#if (WINVER < 0x0500) DWORD completion_key = 0;#else DWORD_PTR completion_key = 0;#endif LPOVERLAPPED overlapped = 0; ::SetLastError(0); BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, &completion_key, &overlapped, 0); DWORD last_error = ::GetLastError(); if (!ok && overlapped == 0 && last_error == WAIT_TIMEOUT) break; if (overlapped) static_cast<operation*>(overlapped)->destroy(); } for (std::size_t i = 0; i < timer_queues_.size(); ++i) timer_queues_[i]->destroy_timers(); timer_queues_.clear(); } // Register a handle with the IO completion port. void register_handle(HANDLE handle) { ::CreateIoCompletionPort(handle, iocp_.handle, 0, 0); } // Run the event loop until stopped or no more work. size_t run(asio::error_code& ec) { if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) { ec = asio::error_code(); return 0; } call_stack<win_iocp_io_service>::context ctx(this); size_t n = 0; while (do_one(true, ec)) if (n != (std::numeric_limits<size_t>::max)()) ++n; return n; } // Run until stopped or one operation is performed. size_t run_one(asio::error_code& ec) { if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) { ec = asio::error_code(); return 0; } call_stack<win_iocp_io_service>::context ctx(this); return do_one(true, ec); } // Poll for operations without blocking. size_t poll(asio::error_code& ec) { if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) { ec = asio::error_code(); return 0; } call_stack<win_iocp_io_service>::context ctx(this); size_t n = 0; while (do_one(false, ec)) if (n != (std::numeric_limits<size_t>::max)()) ++n; return n; } // Poll for one operation without blocking. size_t poll_one(asio::error_code& ec) { if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) { ec = asio::error_code(); return 0; } call_stack<win_iocp_io_service>::context ctx(this); return do_one(false, ec); } // Stop the event processing loop. void stop() { if (::InterlockedExchange(&stopped_, 1) == 0) { if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0)) { DWORD last_error = ::GetLastError(); asio::system_error e( asio::error_code(last_error, asio::error::get_system_category()), "pqcs"); boost::throw_exception(e); } } } // Reset in preparation for a subsequent run invocation. void reset() { ::InterlockedExchange(&stopped_, 0); } // Notify that some work has started. void work_started() { ::InterlockedIncrement(&outstanding_work_); } // Notify that some work has finished. void work_finished() { if (::InterlockedDecrement(&outstanding_work_) == 0) stop(); } // Request invocation of the given handler. template <typename Handler> void dispatch(Handler handler) { if (call_stack<win_iocp_io_service>::contains(this)) asio_handler_invoke_helpers::invoke(handler, &handler); else post(handler); } // Request invocation of the given handler and return immediately. template <typename Handler> void post(Handler handler) { // If the service has been shut down we silently discard the handler. if (::InterlockedExchangeAdd(&shutdown_, 0) != 0) return; // Allocate and construct an operation to wrap the handler. typedef handler_operation<Handler> value_type; typedef handler_alloc_traits<Handler, value_type> alloc_traits; raw_handler_ptr<alloc_traits> raw_ptr(handler); handler_ptr<alloc_traits> ptr(raw_ptr, *this, handler); // Enqueue the operation on the I/O completion port. if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, ptr.get())) { DWORD last_error = ::GetLastError(); asio::system_error e( asio::error_code(last_error, asio::error::get_system_category()), "pqcs"); boost::throw_exception(e); } // Operation has been successfully posted. ptr.release(); } // Request invocation of the given OVERLAPPED-derived operation. void post_completion(win_iocp_operation* op, DWORD op_last_error, DWORD bytes_transferred) { // Enqueue the operation on the I/O completion port. if (!::PostQueuedCompletionStatus(iocp_.handle, bytes_transferred, op_last_error, op)) { DWORD last_error = ::GetLastError(); asio::system_error e( asio::error_code(last_error, asio::error::get_system_category()), "pqcs"); boost::throw_exception(e); } } // Add a new timer queue to the service. template <typename Time_Traits> void add_timer_queue(timer_queue<Time_Traits>& timer_queue) { asio::detail::mutex::scoped_lock lock(timer_mutex_); timer_queues_.push_back(&timer_queue); } // Remove a timer queue from the service. template <typename Time_Traits> void remove_timer_queue(timer_queue<Time_Traits>& timer_queue) { asio::detail::mutex::scoped_lock lock(timer_mutex_); for (std::size_t i = 0; i < timer_queues_.size(); ++i) { if (timer_queues_[i] == &timer_queue) { timer_queues_.erase(timer_queues_.begin() + i); return; } } } // Schedule a timer in the given timer queue to expire at the specified // absolute time. The handler object will be invoked when the timer expires. template <typename Time_Traits, typename Handler> void schedule_timer(timer_queue<Time_Traits>& timer_queue, const typename Time_Traits::time_type& time, Handler handler, void* token) { // If the service has been shut down we silently discard the timer. if (::InterlockedExchangeAdd(&shutdown_, 0) != 0) return; asio::detail::mutex::scoped_lock lock(timer_mutex_); if (timer_queue.enqueue_timer(time, handler, token)) { if (!timer_interrupt_issued_) { timer_interrupt_issued_ = true; lock.unlock(); ::PostQueuedCompletionStatus(iocp_.handle, 0, steal_timer_dispatching, 0); } } } // Cancel the timer associated with the given token. Returns the number of // handlers that have been posted or dispatched. template <typename Time_Traits> std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token) { // If the service has been shut down we silently ignore the cancellation. if (::InterlockedExchangeAdd(&shutdown_, 0) != 0) return 0;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -