📄 reactive_socket_service.hpp
字号:
this->get_io_service().post(bind_handler(handler, asio::error::bad_descriptor, 0)); } else { // Make socket non-blocking. if (!(impl.flags_ & implementation_type::internal_non_blocking)) { ioctl_arg_type non_blocking = 1; asio::error_code ec; if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) { this->get_io_service().post(bind_handler(handler, ec, 0)); return; } impl.flags_ |= implementation_type::internal_non_blocking; } reactor_.start_read_op(impl.socket_, receive_from_handler<MutableBufferSequence, Handler>( impl.socket_, this->get_io_service(), buffers, sender_endpoint, flags, handler)); } } // Accept a new connection. template <typename Socket> asio::error_code accept(implementation_type& impl, Socket& peer, endpoint_type* peer_endpoint, asio::error_code& ec) { if (!is_open(impl)) { ec = asio::error::bad_descriptor; return ec; } // We cannot accept a socket that is already open. if (peer.is_open()) { ec = asio::error::already_open; return ec; } // Make socket non-blocking if user wants non-blocking. if (impl.flags_ & implementation_type::user_set_non_blocking) { if (!(impl.flags_ & implementation_type::internal_non_blocking)) { ioctl_arg_type non_blocking = 1; if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) return ec; impl.flags_ |= implementation_type::internal_non_blocking; } } // Accept a socket. for (;;) { // Try to complete the operation without blocking. asio::error_code ec; socket_holder new_socket; std::size_t addr_len = 0; if (peer_endpoint) { addr_len = peer_endpoint->capacity(); new_socket.reset(socket_ops::accept(impl.socket_, peer_endpoint->data(), &addr_len, ec)); } else { new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec)); } // Check if operation succeeded. if (new_socket.get() >= 0) { if (peer_endpoint) peer_endpoint->resize(addr_len); peer.assign(impl.protocol_, new_socket.get(), ec); if (!ec) new_socket.release(); return ec; } // Operation failed. if (ec == asio::error::would_block || ec == asio::error::try_again) { if (impl.flags_ & implementation_type::user_set_non_blocking) return ec; // Fall through to retry operation. } else if (ec == asio::error::connection_aborted) { if (impl.flags_ & implementation_type::enable_connection_aborted) return ec; // Fall through to retry operation. }#if defined(EPROTO) else if (ec.value() == EPROTO) { if (impl.flags_ & implementation_type::enable_connection_aborted) return ec; // Fall through to retry operation. }#endif // defined(EPROTO) else return ec; // Wait for socket to become ready. if (socket_ops::poll_read(impl.socket_, ec) < 0) return ec; } } template <typename Socket, typename Handler> class accept_handler { public: accept_handler(socket_type socket, asio::io_service& io_service, Socket& peer, const protocol_type& protocol, endpoint_type* peer_endpoint, bool enable_connection_aborted, Handler handler) : socket_(socket), io_service_(io_service), work_(io_service), peer_(peer), protocol_(protocol), peer_endpoint_(peer_endpoint), enable_connection_aborted_(enable_connection_aborted), handler_(handler) { } bool operator()(const asio::error_code& result) { // Check whether the operation was successful. if (result) { io_service_.post(bind_handler(handler_, result)); return true; } // Accept the waiting connection. asio::error_code ec; socket_holder new_socket; std::size_t addr_len = 0; if (peer_endpoint_) { addr_len = peer_endpoint_->capacity(); new_socket.reset(socket_ops::accept(socket_, peer_endpoint_->data(), &addr_len, ec)); } else { new_socket.reset(socket_ops::accept(socket_, 0, 0, ec)); } // Check if we need to run the operation again. if (ec == asio::error::would_block || ec == asio::error::try_again) return false; if (ec == asio::error::connection_aborted && !enable_connection_aborted_) return false;#if defined(EPROTO) if (ec.value() == EPROTO && !enable_connection_aborted_) return false;#endif // defined(EPROTO) // Transfer ownership of the new socket to the peer object. if (!ec) { if (peer_endpoint_) peer_endpoint_->resize(addr_len); peer_.assign(protocol_, new_socket.get(), ec); if (!ec) new_socket.release(); } io_service_.post(bind_handler(handler_, ec)); return true; } private: socket_type socket_; asio::io_service& io_service_; asio::io_service::work work_; Socket& peer_; protocol_type protocol_; endpoint_type* peer_endpoint_; bool enable_connection_aborted_; Handler handler_; }; // Start an asynchronous accept. The peer and peer_endpoint objects // must be valid until the accept's handler is invoked. template <typename Socket, typename Handler> void async_accept(implementation_type& impl, Socket& peer, endpoint_type* peer_endpoint, Handler handler) { if (!is_open(impl)) { this->get_io_service().post(bind_handler(handler, asio::error::bad_descriptor)); } else if (peer.is_open()) { this->get_io_service().post(bind_handler(handler, asio::error::already_open)); } else { // Make socket non-blocking. if (!(impl.flags_ & implementation_type::internal_non_blocking)) { ioctl_arg_type non_blocking = 1; asio::error_code ec; if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) { this->get_io_service().post(bind_handler(handler, ec)); return; } impl.flags_ |= implementation_type::internal_non_blocking; } reactor_.start_read_op(impl.socket_, accept_handler<Socket, Handler>( impl.socket_, this->get_io_service(), peer, impl.protocol_, peer_endpoint, (impl.flags_ & implementation_type::enable_connection_aborted) != 0, handler)); } } // Connect the socket to the specified endpoint. asio::error_code connect(implementation_type& impl, const endpoint_type& peer_endpoint, asio::error_code& ec) { if (!is_open(impl)) { ec = asio::error::bad_descriptor; return ec; } if (impl.flags_ & implementation_type::internal_non_blocking) { // Mark the socket as blocking while we perform the connect. ioctl_arg_type non_blocking = 0; if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) return ec; impl.flags_ &= ~implementation_type::internal_non_blocking; } // Perform the connect operation. socket_ops::connect(impl.socket_, peer_endpoint.data(), peer_endpoint.size(), ec); return ec; } template <typename Handler> class connect_handler { public: connect_handler(socket_type socket, boost::shared_ptr<bool> completed, asio::io_service& io_service, Reactor& reactor, Handler handler) : socket_(socket), completed_(completed), io_service_(io_service), work_(io_service), reactor_(reactor), handler_(handler) { } bool operator()(const asio::error_code& result) { // Check whether a handler has already been called for the connection. // If it has, then we don't want to do anything in this handler. if (*completed_) return true; // Cancel the other reactor operation for the connection. *completed_ = true; reactor_.enqueue_cancel_ops_unlocked(socket_); // Check whether the operation was successful. if (result) { io_service_.post(bind_handler(handler_, result)); return true; } // Get the error code from the connect operation. int connect_error = 0; size_t connect_error_len = sizeof(connect_error); asio::error_code ec; if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &connect_error, &connect_error_len, ec) == socket_error_retval) { io_service_.post(bind_handler(handler_, ec)); return true; } // If connection failed then post the handler with the error code. if (connect_error) { ec = asio::error_code(connect_error, asio::error::get_system_category()); io_service_.post(bind_handler(handler_, ec)); return true; } // Post the result of the successful connection operation. io_service_.post(bind_handler(handler_, ec)); return true; } private: socket_type socket_; boost::shared_ptr<bool> completed_; asio::io_service& io_service_; asio::io_service::work work_; Reactor& reactor_; Handler handler_; }; // Start an asynchronous connect. template <typename Handler> void async_connect(implementation_type& impl, const endpoint_type& peer_endpoint, Handler handler) { if (!is_open(impl)) { this->get_io_service().post(bind_handler(handler, asio::error::bad_descriptor)); return; } // Make socket non-blocking. if (!(impl.flags_ & implementation_type::internal_non_blocking)) { ioctl_arg_type non_blocking = 1; asio::error_code ec; if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) { this->get_io_service().post(bind_handler(handler, ec)); return; } impl.flags_ |= implementation_type::internal_non_blocking; } // Start the connect operation. The socket is already marked as non-blocking // so the connection will take place asynchronously. asio::error_code ec; if (socket_ops::connect(impl.socket_, peer_endpoint.data(), peer_endpoint.size(), ec) == 0) { // The connect operation has finished successfully so we need to post the // handler immediately. this->get_io_service().post(bind_handler(handler, asio::error_code())); } else if (ec == asio::error::in_progress || ec == asio::error::would_block) { // The connection is happening in the background, and we need to wait // until the socket becomes writeable. boost::shared_ptr<bool> completed(new bool(false)); reactor_.start_write_and_except_ops(impl.socket_, connect_handler<Handler>(impl.socket_, completed, this->get_io_service(), reactor_, handler)); } else { // The connect operation has failed, so post the handler immediately. this->get_io_service().post(bind_handler(handler, ec)); } }private: // The selector that performs event demultiplexing for the service. Reactor& reactor_;};} // namespace detail} // namespace asio#include "asio/detail/pop_options.hpp"#endif // ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -