📄 win_iocp_socket_service.hpp
字号:
asio::error_code ec(last_error, asio::error::get_system_category()); iocp_service_.post(bind_handler(handler, ec, bytes_transferred)); } else { ptr.release(); } } // Receive a datagram with the endpoint of the sender. Returns the number of // bytes received. template <typename MutableBufferSequence> size_t receive_from(implementation_type& impl, const MutableBufferSequence& buffers, endpoint_type& sender_endpoint, socket_base::message_flags flags, asio::error_code& ec) { if (!is_open(impl)) { ec = asio::error::bad_descriptor; return 0; } // Copy buffers into WSABUF array. ::WSABUF bufs[max_buffers]; typename MutableBufferSequence::const_iterator iter = buffers.begin(); typename MutableBufferSequence::const_iterator end = buffers.end(); DWORD i = 0; for (; iter != end && i < max_buffers; ++iter, ++i) { asio::mutable_buffer buffer(*iter); bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer)); bufs[i].buf = asio::buffer_cast<char*>(buffer); } // Receive some data. DWORD bytes_transferred = 0; DWORD recv_flags = flags; int endpoint_size = static_cast<int>(sender_endpoint.capacity()); int result = ::WSARecvFrom(impl.socket_, bufs, i, &bytes_transferred, &recv_flags, sender_endpoint.data(), &endpoint_size, 0, 0); if (result != 0) { DWORD last_error = ::WSAGetLastError(); if (last_error == ERROR_PORT_UNREACHABLE) last_error = WSAECONNREFUSED; ec = asio::error_code(last_error, asio::error::get_system_category()); return 0; } if (bytes_transferred == 0) { ec = asio::error::eof; return 0; } sender_endpoint.resize(static_cast<std::size_t>(endpoint_size)); ec = asio::error_code(); return bytes_transferred; } template <typename MutableBufferSequence, typename Handler> class receive_from_operation : public operation { public: receive_from_operation(asio::io_service& io_service, endpoint_type& endpoint, const MutableBufferSequence& buffers, Handler handler) : operation( &receive_from_operation< MutableBufferSequence, Handler>::do_completion_impl, &receive_from_operation< MutableBufferSequence, Handler>::destroy_impl), endpoint_(endpoint), endpoint_size_(static_cast<int>(endpoint.capacity())), work_(io_service), buffers_(buffers), handler_(handler) { } int& endpoint_size() { return endpoint_size_; } private: static void do_completion_impl(operation* op, DWORD last_error, size_t bytes_transferred) { // Take ownership of the operation object. typedef receive_from_operation<MutableBufferSequence, Handler> op_type; op_type* handler_op(static_cast<op_type*>(op)); typedef handler_alloc_traits<Handler, op_type> alloc_traits; handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);#if defined(ASIO_ENABLE_BUFFER_DEBUGGING) // Check whether buffers are still valid. typename MutableBufferSequence::const_iterator iter = handler_op->buffers_.begin(); typename MutableBufferSequence::const_iterator end = handler_op->buffers_.end(); while (iter != end) { asio::mutable_buffer buffer(*iter); asio::buffer_cast<char*>(buffer); ++iter; }#endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING) // Map non-portable errors to their portable counterparts. asio::error_code ec(last_error, asio::error::get_system_category()); if (ec.value() == ERROR_PORT_UNREACHABLE) { ec = asio::error::connection_refused; } // Check for connection closed. if (!ec && bytes_transferred == 0) { ec = asio::error::eof; } // Record the size of the endpoint returned by the operation. handler_op->endpoint_.resize(handler_op->endpoint_size_); // Make a copy of the handler so that the memory can be deallocated before // the upcall is made. Handler handler(handler_op->handler_); // Free the memory associated with the handler. ptr.reset(); // Call the handler. asio_handler_invoke_helpers::invoke( detail::bind_handler(handler, ec, bytes_transferred), &handler); } static void destroy_impl(operation* op) { // Take ownership of the operation object. typedef receive_from_operation<MutableBufferSequence, Handler> op_type; op_type* handler_op(static_cast<op_type*>(op)); typedef handler_alloc_traits<Handler, op_type> alloc_traits; handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); } endpoint_type& endpoint_; int endpoint_size_; asio::io_service::work work_; MutableBufferSequence buffers_; Handler handler_; }; // Start an asynchronous receive. The buffer for the data being received and // the sender_endpoint object must both be valid for the lifetime of the // asynchronous operation. template <typename MutableBufferSequence, typename Handler> void async_receive_from(implementation_type& impl, const MutableBufferSequence& buffers, endpoint_type& sender_endp, socket_base::message_flags flags, Handler handler) { if (!is_open(impl)) { this->get_io_service().post(bind_handler(handler, asio::error::bad_descriptor, 0)); return; } // Update the ID of the thread from which cancellation is safe. if (impl.safe_cancellation_thread_id_ == 0) impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId(); else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId()) impl.safe_cancellation_thread_id_ = ~DWORD(0); // Allocate and construct an operation to wrap the handler. typedef receive_from_operation<MutableBufferSequence, Handler> value_type; typedef handler_alloc_traits<Handler, value_type> alloc_traits; raw_handler_ptr<alloc_traits> raw_ptr(handler); handler_ptr<alloc_traits> ptr(raw_ptr, this->get_io_service(), sender_endp, buffers, handler); // Copy buffers into WSABUF array. ::WSABUF bufs[max_buffers]; typename MutableBufferSequence::const_iterator iter = buffers.begin(); typename MutableBufferSequence::const_iterator end = buffers.end(); DWORD i = 0; for (; iter != end && i < max_buffers; ++iter, ++i) { asio::mutable_buffer buffer(*iter); bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer)); bufs[i].buf = asio::buffer_cast<char*>(buffer); } // Receive some data. DWORD bytes_transferred = 0; DWORD recv_flags = flags; int result = ::WSARecvFrom(impl.socket_, bufs, i, &bytes_transferred, &recv_flags, sender_endp.data(), &ptr.get()->endpoint_size(), ptr.get(), 0); DWORD last_error = ::WSAGetLastError(); if (result != 0 && last_error != WSA_IO_PENDING) { asio::io_service::work work(this->get_io_service()); ptr.reset(); asio::error_code ec(last_error, asio::error::get_system_category()); iocp_service_.post(bind_handler(handler, ec, bytes_transferred)); } else { ptr.release(); } } // Accept a new connection. template <typename Socket> asio::error_code accept(implementation_type& impl, Socket& peer, endpoint_type* peer_endpoint, asio::error_code& ec) { if (!is_open(impl)) { ec = asio::error::bad_descriptor; return ec; } // We cannot accept a socket that is already open. if (peer.is_open()) { ec = asio::error::already_open; return ec; } for (;;) { asio::error_code ec; socket_holder new_socket; std::size_t addr_len = 0; if (peer_endpoint) { addr_len = peer_endpoint->capacity(); new_socket.reset(socket_ops::accept(impl.socket_, peer_endpoint->data(), &addr_len, ec)); } else { new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec)); } if (ec) { if (ec == asio::error::connection_aborted && !(impl.flags_ & implementation_type::enable_connection_aborted)) { // Retry accept operation. continue; } else { return ec; } } if (peer_endpoint) peer_endpoint->resize(addr_len); peer.assign(impl.protocol_, new_socket.get(), ec); if (!ec) new_socket.release(); return ec; } } template <typename Socket, typename Handler> class accept_operation : public operation { public: accept_operation(win_iocp_io_service& io_service, socket_type socket, socket_type new_socket, Socket& peer, const protocol_type& protocol, endpoint_type* peer_endpoint, bool enable_connection_aborted, Handler handler) : operation( &accept_operation<Socket, Handler>::do_completion_impl, &accept_operation<Socket, Handler>::destroy_impl), io_service_(io_service), socket_(socket), new_socket_(new_socket), peer_(peer), protocol_(protocol), peer_endpoint_(peer_endpoint), work_(io_service.get_io_service()), enable_connection_aborted_(enable_connection_aborted), handler_(handler) { } socket_type new_socket() { return new_socket_.get(); } void* output_buffer() { return output_buffer_; } DWORD address_length() { return sizeof(sockaddr_storage_type) + 16; } private: static void do_completion_impl(operation* op, DWORD last_error, size_t bytes_transferred) { // Take ownership of the operation object. typedef accept_operation<Socket, Handler> op_type; op_type* handler_op(static_cast<op_type*>(op)); typedef handler_alloc_traits<Handler, op_type> alloc_traits; handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); // Map Windows error ERROR_NETNAME_DELETED to connection_aborted. if (last_error == ERROR_NETNAME_DELETED) { last_error = WSAECONNABORTED; } // Restart the accept operation if we got the connection_aborted error // and the enable_connection_aborted socket option is not set. if (last_error == WSAECONNABORTED && !ptr.get()->enable_connection_aborted_) { // Reset OVERLAPPED structure. ptr.get()->Internal = 0; ptr.get()->InternalHigh = 0; ptr.get()->Offset = 0; ptr.get()->OffsetHigh = 0; ptr.get()->hEvent = 0; // Create a new socket for the next connection, since the AcceptEx call // fails with WSAEINVAL if we try to reuse the same socket. asio::error_code ec; ptr.get()->new_socket_.reset(); ptr.get()->new_socket_.reset(socket_ops::socket( ptr.get()->protocol_.family(), ptr.get()->protocol_.type(), ptr.get()->protocol_.protocol(), ec)); if (ptr.get()->new_socket() != invalid_socket) { // Accept a connection. DWORD bytes_read = 0; BOOL result = ::AcceptEx(ptr.get()->socket_, ptr.get()->new_socket(), ptr.get()->output_buffer(), 0, ptr.get()->address_length(), ptr.get()->address_length(), &bytes_read, ptr.get()); last_error = ::WSAGetLastError(); // Check if the operation completed immediately. if (!result && last_error != WSA_IO_PENDING) { if (last_error == ERROR_NETNAME_DELETED || last_error == WSAECONNABORTED) { // Post this handler so that operation will be restarted again. ptr.get()->io_service_.post_completion(ptr.get(), last_error, 0); ptr.release(); return; } else { // Operation already complete. Continue with rest of this handler. } } else { // Asynchronous operation has been successfully restarted. ptr.release(); return; } } } // Get the address of the peer. endpoint_type peer_endpoint; if (last_error == 0) { LPSOCKADDR local_addr = 0; int local_addr_length = 0; LPSOCKADDR remote_addr = 0; int remote_addr_length = 0; GetAcceptExSockaddrs(handler_op->output_buffer(), 0, handler_op->address_length(), handler_op->address_length(), &local_addr, &local_addr_length, &remote_addr, &remote_addr_length); if (static_cast<std::size_t>(remote_addr_length) > peer_endpoint.capacity()) { last_error = WSAEINVAL; } else { using namespace std; // For memcpy. memcpy(peer_endpoint.data(), remote_addr, remote_addr_length); peer_endpoint.resize(static_cast<std::size_t>(remote_addr_length)); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -