📄 win_iocp_socket_service.hpp
字号:
// Need to set the SO_UPDATE_ACCEPT_CONTEXT option so that getsockname // and getpeername will work on the accepted socket. if (last_error == 0) { SOCKET update_ctx_param = handler_op->socket_; asio::error_code ec; if (socket_ops::setsockopt(handler_op->new_socket_.get(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, &update_ctx_param, sizeof(SOCKET), ec) != 0) { last_error = ec.value(); } } // If the socket was successfully accepted, transfer ownership of the // socket to the peer object. if (last_error == 0) { asio::error_code ec; handler_op->peer_.assign(handler_op->protocol_, native_type(handler_op->new_socket_.get(), peer_endpoint), ec); if (ec) last_error = ec.value(); else handler_op->new_socket_.release(); } // Pass endpoint back to caller. if (handler_op->peer_endpoint_) *handler_op->peer_endpoint_ = peer_endpoint; // Make a copy of the handler so that the memory can be deallocated before // the upcall is made. Handler handler(handler_op->handler_); // Free the memory associated with the handler. ptr.reset(); // Call the handler. asio::error_code ec(last_error, asio::error::get_system_category()); asio_handler_invoke_helpers::invoke( detail::bind_handler(handler, ec), &handler); } static void destroy_impl(operation* op) { // Take ownership of the operation object. typedef accept_operation<Socket, Handler> op_type; op_type* handler_op(static_cast<op_type*>(op)); typedef handler_alloc_traits<Handler, op_type> alloc_traits; handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); } win_iocp_io_service& io_service_; socket_type socket_; socket_holder new_socket_; Socket& peer_; protocol_type protocol_; endpoint_type* peer_endpoint_; asio::io_service::work work_; unsigned char output_buffer_[(sizeof(sockaddr_storage_type) + 16) * 2]; bool enable_connection_aborted_; Handler handler_; }; // Start an asynchronous accept. The peer and peer_endpoint objects // must be valid until the accept's handler is invoked. template <typename Socket, typename Handler> void async_accept(implementation_type& impl, Socket& peer, endpoint_type* peer_endpoint, Handler handler) { // Check whether acceptor has been initialised. if (!is_open(impl)) { this->get_io_service().post(bind_handler(handler, asio::error::bad_descriptor)); return; } // Check that peer socket has not already been opened. if (peer.is_open()) { this->get_io_service().post(bind_handler(handler, asio::error::already_open)); return; } // Update the ID of the thread from which cancellation is safe. if (impl.safe_cancellation_thread_id_ == 0) impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId(); else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId()) impl.safe_cancellation_thread_id_ = ~DWORD(0); // Create a new socket for the connection. asio::error_code ec; socket_holder sock(socket_ops::socket(impl.protocol_.family(), impl.protocol_.type(), impl.protocol_.protocol(), ec)); if (sock.get() == invalid_socket) { this->get_io_service().post(bind_handler(handler, ec)); return; } // Allocate and construct an operation to wrap the handler. typedef accept_operation<Socket, Handler> value_type; typedef handler_alloc_traits<Handler, value_type> alloc_traits; raw_handler_ptr<alloc_traits> raw_ptr(handler); socket_type new_socket = sock.get(); bool enable_connection_aborted = (impl.flags_ & implementation_type::enable_connection_aborted); handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, impl.socket_, new_socket, peer, impl.protocol_, peer_endpoint, enable_connection_aborted, handler); sock.release(); // Accept a connection. DWORD bytes_read = 0; BOOL result = ::AcceptEx(impl.socket_, ptr.get()->new_socket(), ptr.get()->output_buffer(), 0, ptr.get()->address_length(), ptr.get()->address_length(), &bytes_read, ptr.get()); DWORD last_error = ::WSAGetLastError(); // Check if the operation completed immediately. if (!result && last_error != WSA_IO_PENDING) { if (!enable_connection_aborted && (last_error == ERROR_NETNAME_DELETED || last_error == WSAECONNABORTED)) { // Post handler so that operation will be restarted again. We do not // perform the AcceptEx again here to avoid the possibility of starving // other handlers. iocp_service_.post_completion(ptr.get(), last_error, 0); ptr.release(); } else { asio::io_service::work work(this->get_io_service()); ptr.reset(); asio::error_code ec(last_error, asio::error::get_system_category()); iocp_service_.post(bind_handler(handler, ec)); } } else { ptr.release(); } } // Connect the socket to the specified endpoint. asio::error_code connect(implementation_type& impl, const endpoint_type& peer_endpoint, asio::error_code& ec) { if (!is_open(impl)) { ec = asio::error::bad_descriptor; return ec; } // Perform the connect operation. socket_ops::connect(impl.socket_, peer_endpoint.data(), peer_endpoint.size(), ec); return ec; } template <typename Handler> class connect_handler { public: connect_handler(socket_type socket, bool user_set_non_blocking, boost::shared_ptr<bool> completed, asio::io_service& io_service, reactor_type& reactor, Handler handler) : socket_(socket), user_set_non_blocking_(user_set_non_blocking), completed_(completed), io_service_(io_service), reactor_(reactor), work_(io_service), handler_(handler) { } bool operator()(const asio::error_code& result) { // Check whether a handler has already been called for the connection. // If it has, then we don't want to do anything in this handler. if (*completed_) return true; // Cancel the other reactor operation for the connection. *completed_ = true; reactor_.enqueue_cancel_ops_unlocked(socket_); // Check whether the operation was successful. if (result) { io_service_.post(bind_handler(handler_, result)); return true; } // Get the error code from the connect operation. int connect_error = 0; size_t connect_error_len = sizeof(connect_error); asio::error_code ec; if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &connect_error, &connect_error_len, ec) == socket_error_retval) { io_service_.post(bind_handler(handler_, ec)); return true; } // If connection failed then post the handler with the error code. if (connect_error) { ec = asio::error_code(connect_error, asio::error::get_system_category()); io_service_.post(bind_handler(handler_, ec)); return true; } // Revert socket to blocking mode unless the user requested otherwise. if (!user_set_non_blocking_) { ioctl_arg_type non_blocking = 0; if (socket_ops::ioctl(socket_, FIONBIO, &non_blocking, ec)) { io_service_.post(bind_handler(handler_, ec)); return true; } } // Post the result of the successful connection operation. ec = asio::error_code(); io_service_.post(bind_handler(handler_, ec)); return true; } private: socket_type socket_; bool user_set_non_blocking_; boost::shared_ptr<bool> completed_; asio::io_service& io_service_; reactor_type& reactor_; asio::io_service::work work_; Handler handler_; }; // Start an asynchronous connect. template <typename Handler> void async_connect(implementation_type& impl, const endpoint_type& peer_endpoint, Handler handler) { if (!is_open(impl)) { this->get_io_service().post(bind_handler(handler, asio::error::bad_descriptor)); return; } // Update the ID of the thread from which cancellation is safe. if (impl.safe_cancellation_thread_id_ == 0) impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId(); else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId()) impl.safe_cancellation_thread_id_ = ~DWORD(0); // Check if the reactor was already obtained from the io_service. reactor_type* reactor = static_cast<reactor_type*>( interlocked_compare_exchange_pointer( reinterpret_cast<void**>(&reactor_), 0, 0)); if (!reactor) { reactor = &(asio::use_service<reactor_type>( this->get_io_service())); interlocked_exchange_pointer( reinterpret_cast<void**>(&reactor_), reactor); } // Mark the socket as non-blocking so that the connection will take place // asynchronously. ioctl_arg_type non_blocking = 1; asio::error_code ec; if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) { this->get_io_service().post(bind_handler(handler, ec)); return; } // Start the connect operation. if (socket_ops::connect(impl.socket_, peer_endpoint.data(), peer_endpoint.size(), ec) == 0) { // Revert socket to blocking mode unless the user requested otherwise. if (!(impl.flags_ & implementation_type::user_set_non_blocking)) { non_blocking = 0; socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec); } // The connect operation has finished successfully so we need to post the // handler immediately. this->get_io_service().post(bind_handler(handler, ec)); } else if (ec == asio::error::in_progress || ec == asio::error::would_block) { // The connection is happening in the background, and we need to wait // until the socket becomes writeable. boost::shared_ptr<bool> completed(new bool(false)); reactor->start_write_and_except_ops(impl.socket_, connect_handler<Handler>( impl.socket_, (impl.flags_ & implementation_type::user_set_non_blocking) != 0, completed, this->get_io_service(), *reactor, handler)); } else { // Revert socket to blocking mode unless the user requested otherwise. if (!(impl.flags_ & implementation_type::user_set_non_blocking)) { non_blocking = 0; asio::error_code ignored_ec; socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec); } // The connect operation has failed, so post the handler immediately. this->get_io_service().post(bind_handler(handler, ec)); } }private: // Helper function to close a socket when the associated object is being // destroyed. void close_for_destruction(implementation_type& impl) { if (is_open(impl)) { // Check if the reactor was created, in which case we need to close the // socket on the reactor as well to cancel any operations that might be // running there. reactor_type* reactor = static_cast<reactor_type*>( interlocked_compare_exchange_pointer( reinterpret_cast<void**>(&reactor_), 0, 0)); if (reactor) reactor->close_descriptor(impl.socket_); // The socket destructor must not block. If the user has changed the // linger option to block in the foreground, we will change it back to the // default so that the closure is performed in the background. if (impl.flags_ & implementation_type::close_might_block) { ::linger opt; opt.l_onoff = 0; opt.l_linger = 0; asio::error_code ignored_ec; socket_ops::setsockopt(impl.socket_, SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec); } asio::error_code ignored_ec; socket_ops::close(impl.socket_, ignored_ec); impl.socket_ = invalid_socket; impl.flags_ = 0; impl.cancel_token_.reset(); impl.safe_cancellation_thread_id_ = 0; } } // Helper function to emulate InterlockedCompareExchangePointer functionality // for: // - very old Platform SDKs; and // - platform SDKs where MSVC's /Wp64 option causes spurious warnings. void* interlocked_compare_exchange_pointer(void** dest, void* exch, void* cmp) {#if defined(_M_IX86) return reinterpret_cast<void*>(InterlockedCompareExchange( reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(exch), reinterpret_cast<LONG>(cmp)));#else return InterlockedCompareExchangePointer(dest, exch, cmp);#endif } // Helper function to emulate InterlockedExchangePointer functionality for: // - very old Platform SDKs; and // - platform SDKs where MSVC's /Wp64 option causes spurious warnings. void* interlocked_exchange_pointer(void** dest, void* val) {#if defined(_M_IX86) return reinterpret_cast<void*>(InterlockedExchange( reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(val)));#else return InterlockedExchangePointer(dest, val);#endif } // The IOCP service used for running asynchronous operations and dispatching // handlers. win_iocp_io_service& iocp_service_; // The reactor used for performing connect operations. This object is created // only if needed. reactor_type* rea
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -