📄 reactive_socket_service.hpp
字号:
sender_endpoint_.resize(addr_len); bytes_transferred = (bytes < 0 ? 0 : bytes); return true; } void complete(const boost::system::error_code& ec, std::size_t bytes_transferred) { io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); } private: socket_type socket_; int protocol_type_; boost::asio::io_service& io_service_; boost::asio::io_service::work work_; MutableBufferSequence buffers_; endpoint_type& sender_endpoint_; socket_base::message_flags flags_; }; // Start an asynchronous receive. The buffer for the data being received and // the sender_endpoint object must both be valid for the lifetime of the // asynchronous operation. template <typename MutableBufferSequence, typename Handler> void async_receive_from(implementation_type& impl, const MutableBufferSequence& buffers, endpoint_type& sender_endpoint, socket_base::message_flags flags, Handler handler) { if (!is_open(impl)) { this->get_io_service().post(bind_handler(handler, boost::asio::error::bad_descriptor, 0)); } else { // Make socket non-blocking. if (!(impl.flags_ & implementation_type::internal_non_blocking)) { if (!(impl.flags_ & implementation_type::non_blocking)) { ioctl_arg_type non_blocking = 1; boost::system::error_code ec; if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) { this->get_io_service().post(bind_handler(handler, ec, 0)); return; } } impl.flags_ |= implementation_type::internal_non_blocking; } reactor_.start_read_op(impl.socket_, impl.reactor_data_, receive_from_operation<MutableBufferSequence, Handler>( impl.socket_, impl.protocol_.type(), this->get_io_service(), buffers, sender_endpoint, flags, handler)); } } // Wait until data can be received without blocking. template <typename Handler> void async_receive_from(implementation_type& impl, const null_buffers&, endpoint_type& sender_endpoint, socket_base::message_flags flags, Handler handler) { if (!is_open(impl)) { this->get_io_service().post(bind_handler(handler, boost::asio::error::bad_descriptor, 0)); } else { // Reset endpoint since it can be given no sensible value at this time. sender_endpoint = endpoint_type(); if (flags & socket_base::message_out_of_band) { reactor_.start_except_op(impl.socket_, impl.reactor_data_, null_buffers_operation<Handler>(this->get_io_service(), handler)); } else { reactor_.start_read_op(impl.socket_, impl.reactor_data_, null_buffers_operation<Handler>(this->get_io_service(), handler), false); } } } // Accept a new connection. template <typename Socket> boost::system::error_code accept(implementation_type& impl, Socket& peer, endpoint_type* peer_endpoint, boost::system::error_code& ec) { if (!is_open(impl)) { ec = boost::asio::error::bad_descriptor; return ec; } // We cannot accept a socket that is already open. if (peer.is_open()) { ec = boost::asio::error::already_open; return ec; } // Accept a socket. for (;;) { // Try to complete the operation without blocking. boost::system::error_code ec; socket_holder new_socket; std::size_t addr_len = 0; if (peer_endpoint) { addr_len = peer_endpoint->capacity(); new_socket.reset(socket_ops::accept(impl.socket_, peer_endpoint->data(), &addr_len, ec)); } else { new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec)); } // Check if operation succeeded. if (new_socket.get() >= 0) { if (peer_endpoint) peer_endpoint->resize(addr_len); peer.assign(impl.protocol_, new_socket.get(), ec); if (!ec) new_socket.release(); return ec; } // Operation failed. if (ec == boost::asio::error::would_block || ec == boost::asio::error::try_again) { if (impl.flags_ & implementation_type::user_set_non_blocking) return ec; // Fall through to retry operation. } else if (ec == boost::asio::error::connection_aborted) { if (impl.flags_ & implementation_type::enable_connection_aborted) return ec; // Fall through to retry operation. }#if defined(EPROTO) else if (ec.value() == EPROTO) { if (impl.flags_ & implementation_type::enable_connection_aborted) return ec; // Fall through to retry operation. }#endif // defined(EPROTO) else return ec; // Wait for socket to become ready. if (socket_ops::poll_read(impl.socket_, ec) < 0) return ec; } } template <typename Socket, typename Handler> class accept_operation : public handler_base_from_member<Handler> { public: accept_operation(socket_type socket, boost::asio::io_service& io_service, Socket& peer, const protocol_type& protocol, endpoint_type* peer_endpoint, bool enable_connection_aborted, Handler handler) : handler_base_from_member<Handler>(handler), socket_(socket), io_service_(io_service), work_(io_service), peer_(peer), protocol_(protocol), peer_endpoint_(peer_endpoint), enable_connection_aborted_(enable_connection_aborted) { } bool perform(boost::system::error_code& ec, std::size_t&) { // Check whether the operation was successful. if (ec) return true; // Accept the waiting connection. socket_holder new_socket; std::size_t addr_len = 0; if (peer_endpoint_) { addr_len = peer_endpoint_->capacity(); new_socket.reset(socket_ops::accept(socket_, peer_endpoint_->data(), &addr_len, ec)); } else { new_socket.reset(socket_ops::accept(socket_, 0, 0, ec)); } // Check if we need to run the operation again. if (ec == boost::asio::error::would_block || ec == boost::asio::error::try_again) return false; if (ec == boost::asio::error::connection_aborted && !enable_connection_aborted_) return false;#if defined(EPROTO) if (ec.value() == EPROTO && !enable_connection_aborted_) return false;#endif // defined(EPROTO) // Transfer ownership of the new socket to the peer object. if (!ec) { if (peer_endpoint_) peer_endpoint_->resize(addr_len); peer_.assign(protocol_, new_socket.get(), ec); if (!ec) new_socket.release(); } return true; } void complete(const boost::system::error_code& ec, std::size_t) { io_service_.post(bind_handler(this->handler_, ec)); } private: socket_type socket_; boost::asio::io_service& io_service_; boost::asio::io_service::work work_; Socket& peer_; protocol_type protocol_; endpoint_type* peer_endpoint_; bool enable_connection_aborted_; }; // Start an asynchronous accept. The peer and peer_endpoint objects // must be valid until the accept's handler is invoked. template <typename Socket, typename Handler> void async_accept(implementation_type& impl, Socket& peer, endpoint_type* peer_endpoint, Handler handler) { if (!is_open(impl)) { this->get_io_service().post(bind_handler(handler, boost::asio::error::bad_descriptor)); } else if (peer.is_open()) { this->get_io_service().post(bind_handler(handler, boost::asio::error::already_open)); } else { // Make socket non-blocking. if (!(impl.flags_ & implementation_type::internal_non_blocking)) { if (!(impl.flags_ & implementation_type::non_blocking)) { ioctl_arg_type non_blocking = 1; boost::system::error_code ec; if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) { this->get_io_service().post(bind_handler(handler, ec)); return; } } impl.flags_ |= implementation_type::internal_non_blocking; } reactor_.start_read_op(impl.socket_, impl.reactor_data_, accept_operation<Socket, Handler>( impl.socket_, this->get_io_service(), peer, impl.protocol_, peer_endpoint, (impl.flags_ & implementation_type::enable_connection_aborted) != 0, handler)); } } // Connect the socket to the specified endpoint. boost::system::error_code connect(implementation_type& impl, const endpoint_type& peer_endpoint, boost::system::error_code& ec) { if (!is_open(impl)) { ec = boost::asio::error::bad_descriptor; return ec; } // Perform the connect operation. socket_ops::connect(impl.socket_, peer_endpoint.data(), peer_endpoint.size(), ec); if (ec != boost::asio::error::in_progress && ec != boost::asio::error::would_block) { // The connect operation finished immediately. return ec; } // Wait for socket to become ready. if (socket_ops::poll_connect(impl.socket_, ec) < 0) return ec; // Get the error code from the connect operation. int connect_error = 0; size_t connect_error_len = sizeof(connect_error); if (socket_ops::getsockopt(impl.socket_, SOL_SOCKET, SO_ERROR, &connect_error, &connect_error_len, ec) == socket_error_retval) return ec; // Return the result of the connect operation. ec = boost::system::error_code(connect_error, boost::asio::error::get_system_category()); return ec; } template <typename Handler> class connect_operation : public handler_base_from_member<Handler> { public: connect_operation(socket_type socket, boost::asio::io_service& io_service, Handler handler) : handler_base_from_member<Handler>(handler), socket_(socket), io_service_(io_service), work_(io_service) { } bool perform(boost::system::error_code& ec, std::size_t&) { // Check whether the operation was successful. if (ec) return true; // Get the error code from the connect operation. int connect_error = 0; size_t connect_error_len = sizeof(connect_error); if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &connect_error, &connect_error_len, ec) == socket_error_retval) return true; // The connection failed so the handler will be posted with an error code. if (connect_error) { ec = boost::system::error_code(connect_error, boost::asio::error::get_system_category()); return true; } return true; } void complete(const boost::system::error_code& ec, std::size_t) { io_service_.post(bind_handler(this->handler_, ec)); } private: socket_type socket_; boost::asio::io_service& io_service_; boost::asio::io_service::work work_; }; // Start an asynchronous connect. template <typename Handler> void async_connect(implementation_type& impl, const endpoint_type& peer_endpoint, Handler handler) { if (!is_open(impl)) { this->get_io_service().post(bind_handler(handler, boost::asio::error::bad_descriptor)); return; } // Make socket non-blocking. if (!(impl.flags_ & implementation_type::internal_non_blocking)) { if (!(impl.flags_ & implementation_type::non_blocking)) { ioctl_arg_type non_blocking = 1; boost::system::error_code ec; if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) { this->get_io_service().post(bind_handler(handler, ec)); return; } } impl.flags_ |= implementation_type::internal_non_blocking; } // Start the connect operation. The socket is already marked as non-blocking // so the connection will take place asynchronously. boost::system::error_code ec; if (socket_ops::connect(impl.socket_, peer_endpoint.data(), peer_endpoint.size(), ec) == 0) { // The connect operation has finished successfully so we need to post the // handler immediately. this->get_io_service().post(bind_handler(handler, boost::system::error_code())); } else if (ec == boost::asio::error::in_progress || ec == boost::asio::error::would_block) { // The connection is happening in the background, and we need to wait // until the socket becomes writeable. reactor_.start_connect_op(impl.socket_, impl.reactor_data_, connect_operation<Handler>(impl.socket_, this->get_io_service(), handler)); } else { // The connect operation has failed, so post the handler immediately. this->get_io_service().post(bind_handler(handler, ec)); } }private: // The selector that performs event demultiplexing for the service. Reactor& reactor_;};} // namespace detail} // namespace asio} // namespace boost#include <boost/asio/detail/pop_options.hpp>#endif // BOOST_ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -