📄 win_iocp_io_service.hpp
字号:
asio::detail::mutex::scoped_lock lock(timer_mutex_); std::size_t n = timer_queue.cancel_timer(token); if (n > 0 && !timer_interrupt_issued_) { timer_interrupt_issued_ = true; lock.unlock(); ::PostQueuedCompletionStatus(iocp_.handle, 0, steal_timer_dispatching, 0); } return n; }private: // Dequeues at most one operation from the I/O completion port, and then // executes it. Returns the number of operations that were dequeued (i.e. // either 0 or 1). size_t do_one(bool block, asio::error_code& ec) { long this_thread_id = static_cast<long>(::GetCurrentThreadId()); for (;;) { // Try to acquire responsibility for dispatching timers. bool dispatching_timers = (::InterlockedCompareExchange( &timer_thread_, this_thread_id, 0) == 0); // Calculate timeout for GetQueuedCompletionStatus call. DWORD timeout = max_timeout; if (dispatching_timers) { asio::detail::mutex::scoped_lock lock(timer_mutex_); timer_interrupt_issued_ = false; timeout = get_timeout(); } // Get the next operation from the queue. DWORD bytes_transferred = 0;#if (WINVER < 0x0500) DWORD completion_key = 0;#else DWORD_PTR completion_key = 0;#endif LPOVERLAPPED overlapped = 0; ::SetLastError(0); BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, &completion_key, &overlapped, block ? timeout : 0); DWORD last_error = ::GetLastError(); // Dispatch any pending timers. if (dispatching_timers) { asio::detail::mutex::scoped_lock lock(timer_mutex_); timer_queues_copy_ = timer_queues_; for (std::size_t i = 0; i < timer_queues_.size(); ++i) { timer_queues_[i]->dispatch_timers(); timer_queues_[i]->dispatch_cancellations(); timer_queues_[i]->cleanup_timers(); } } if (!ok && overlapped == 0) { if (block && last_error == WAIT_TIMEOUT) { // Relinquish responsibility for dispatching timers. if (dispatching_timers) { ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id); } continue; } // Transfer responsibility for dispatching timers to another thread. if (dispatching_timers && ::InterlockedCompareExchange( &timer_thread_, 0, this_thread_id) == this_thread_id) { ::PostQueuedCompletionStatus(iocp_.handle, 0, transfer_timer_dispatching, 0); } ec = asio::error_code(); return 0; } else if (overlapped) { // We may have been passed a last_error value in the completion_key. if (last_error == 0) { last_error = completion_key; } // Transfer responsibility for dispatching timers to another thread. if (dispatching_timers && ::InterlockedCompareExchange( &timer_thread_, 0, this_thread_id) == this_thread_id) { ::PostQueuedCompletionStatus(iocp_.handle, 0, transfer_timer_dispatching, 0); } // Ensure that the io_service does not exit due to running out of work // while we make the upcall. auto_work work(*this); // Dispatch the operation. operation* op = static_cast<operation*>(overlapped); op->do_completion(last_error, bytes_transferred); ec = asio::error_code(); return 1; } else if (completion_key == transfer_timer_dispatching) { // Woken up to try to acquire responsibility for dispatching timers. ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id); } else if (completion_key == steal_timer_dispatching) { // Woken up to steal responsibility for dispatching timers. ::InterlockedExchange(&timer_thread_, 0); } else { // The stopped_ flag is always checked to ensure that any leftover // interrupts from a previous run invocation are ignored. if (::InterlockedExchangeAdd(&stopped_, 0) != 0) { // Relinquish responsibility for dispatching timers. if (dispatching_timers) { ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id); } // Wake up next thread that is blocked on GetQueuedCompletionStatus. if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0)) { DWORD last_error = ::GetLastError(); ec = asio::error_code(last_error, asio::error::get_system_category()); return 0; } ec = asio::error_code(); return 0; } } } } // Check if all timer queues are empty. bool all_timer_queues_are_empty() const { for (std::size_t i = 0; i < timer_queues_.size(); ++i) if (!timer_queues_[i]->empty()) return false; return true; } // Get the timeout value for the GetQueuedCompletionStatus call. The timeout // value is returned as a number of milliseconds. We will wait no longer than // 1000 milliseconds. DWORD get_timeout() { if (all_timer_queues_are_empty()) return max_timeout; boost::posix_time::time_duration minimum_wait_duration = boost::posix_time::milliseconds(max_timeout); for (std::size_t i = 0; i < timer_queues_.size(); ++i) { boost::posix_time::time_duration wait_duration = timer_queues_[i]->wait_duration(); if (wait_duration < minimum_wait_duration) minimum_wait_duration = wait_duration; } if (minimum_wait_duration > boost::posix_time::time_duration()) { int milliseconds = minimum_wait_duration.total_milliseconds(); return static_cast<DWORD>(milliseconds > 0 ? milliseconds : 1); } else { return 0; } } struct auto_work { auto_work(win_iocp_io_service& io_service) : io_service_(io_service) { io_service_.work_started(); } ~auto_work() { io_service_.work_finished(); } private: win_iocp_io_service& io_service_; }; template <typename Handler> struct handler_operation : public operation { handler_operation(win_iocp_io_service& io_service, Handler handler) : operation(&handler_operation<Handler>::do_completion_impl, &handler_operation<Handler>::destroy_impl), io_service_(io_service), handler_(handler) { io_service_.work_started(); } ~handler_operation() { io_service_.work_finished(); } private: // Prevent copying and assignment. handler_operation(const handler_operation&); void operator=(const handler_operation&); static void do_completion_impl(operation* op, DWORD, size_t) { // Take ownership of the operation object. typedef handler_operation<Handler> op_type; op_type* handler_op(static_cast<op_type*>(op)); typedef handler_alloc_traits<Handler, op_type> alloc_traits; handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); // Make a copy of the handler so that the memory can be deallocated before // the upcall is made. Handler handler(handler_op->handler_); // Free the memory associated with the handler. ptr.reset(); // Make the upcall. asio_handler_invoke_helpers::invoke(handler, &handler); } static void destroy_impl(operation* op) { // Take ownership of the operation object. typedef handler_operation<Handler> op_type; op_type* handler_op(static_cast<op_type*>(op)); typedef handler_alloc_traits<Handler, op_type> alloc_traits; handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); } win_iocp_io_service& io_service_; Handler handler_; }; // The IO completion port used for queueing operations. struct iocp_holder { HANDLE handle; iocp_holder() : handle(0) {} ~iocp_holder() { if (handle) ::CloseHandle(handle); } } iocp_; // The count of unfinished work. long outstanding_work_; // Flag to indicate whether the event loop has been stopped. long stopped_; // Flag to indicate whether the service has been shut down. long shutdown_; enum { // Maximum GetQueuedCompletionStatus timeout, in milliseconds. max_timeout = 1000, // Completion key value to indicate that responsibility for dispatching // timers is being cooperatively transferred from one thread to another. transfer_timer_dispatching = 1, // Completion key value to indicate that responsibility for dispatching // timers should be stolen from another thread. steal_timer_dispatching = 2 }; // The thread that's currently in charge of dispatching timers. long timer_thread_; // Mutex for protecting access to the timer queues. mutex timer_mutex_; // Whether a thread has been interrupted to process a new timeout. bool timer_interrupt_issued_; // The timer queues. std::vector<timer_queue_base*> timer_queues_; // A copy of the timer queues, used when dispatching, cancelling and cleaning // up timers. The copy is stored as a class data member to avoid unnecessary // memory allocation. std::vector<timer_queue_base*> timer_queues_copy_;};} // namespace detail} // namespace asio#endif // defined(ASIO_HAS_IOCP)#include "asio/detail/pop_options.hpp"#endif // ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -