⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 win_iocp_socket_service.hpp

📁 这是国外的resip协议栈
💻 HPP
📖 第 1 页 / 共 5 页
字号:
      // Need to set the SO_UPDATE_ACCEPT_CONTEXT option so that getsockname
      // and getpeername will work on the accepted socket.
      if (last_error == 0)
      {
        // The option value is the listening socket recorded in the operation.
        SOCKET update_ctx_param = handler_op->socket_;
        asio::error_code ec;
        if (socket_ops::setsockopt(handler_op->new_socket_.get(),
              SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
              &update_ctx_param, sizeof(SOCKET), ec) != 0)
        {
          last_error = ec.value();
        }
      }

      // If the socket was successfully accepted, transfer ownership of the
      // socket to the peer object.
      if (last_error == 0)
      {
        asio::error_code ec;
        handler_op->peer_.assign(handler_op->protocol_,
            native_type(handler_op->new_socket_.get(), peer_endpoint), ec);
        if (ec)
          last_error = ec.value();
        else
          // Only give up the holder's socket once the peer has accepted it.
          handler_op->new_socket_.release();
      }

      // Pass endpoint back to caller.
      if (handler_op->peer_endpoint_)
        *handler_op->peer_endpoint_ = peer_endpoint;

      // Make a copy of the handler so that the memory can be deallocated before
      // the upcall is made.
      Handler handler(handler_op->handler_);

      // Free the memory associated with the handler.
      ptr.reset();

      // Call the handler.
      asio::error_code ec(last_error,
          asio::error::get_system_category());
      asio_handler_invoke_helpers::invoke(
          detail::bind_handler(handler, ec), &handler);
    }

    // Destroys an accept operation without invoking its handler.
    // NOTE(review): the function only constructs a handler_ptr over the
    // operation; presumably the handler_ptr destructor performs the actual
    // destruction and deallocation -- confirm against handler_ptr.
    static void destroy_impl(operation* op)
    {
      // Take ownership of the operation object.
      typedef accept_operation<Socket, Handler> op_type;
      op_type* handler_op(static_cast<op_type*>(op));
      typedef handler_alloc_traits<Handler, op_type> alloc_traits;
      handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
    }

    // State carried by a pending accept operation.
    win_iocp_io_service& io_service_;
    // The listening socket (used above as the SO_UPDATE_ACCEPT_CONTEXT value).
    socket_type socket_;
    // The socket created for the incoming connection.
    socket_holder new_socket_;
    // The caller's socket object that receives the accepted connection.
    Socket& peer_;
    protocol_type protocol_;
    // Optional out-parameter for the remote endpoint; may be null.
    endpoint_type* peer_endpoint_;
    asio::io_service::work work_;
    // Room for two address slots of sizeof(sockaddr_storage_type) + 16 bytes.
    // NOTE(review): presumably the local and remote address areas written by
    // AcceptEx -- confirm against output_buffer() / address_length().
    unsigned char output_buffer_[(sizeof(sockaddr_storage_type) + 16) * 2];
    bool enable_connection_aborted_;
    Handler handler_;
  };

  // Start an asynchronous accept. The peer and peer_endpoint objects
  // must be valid until the accept's handler is invoked.
  //
  // Errors detected before the accept is started (acceptor not open, peer
  // already open, failure to create the new socket) are delivered by posting
  // the handler to the io_service with the corresponding error code.
  template <typename Socket, typename Handler>
  void async_accept(implementation_type& impl, Socket& peer,
      endpoint_type* peer_endpoint, Handler handler)
  {
    // Check whether acceptor has been initialised.
    if (!is_open(impl))
    {
      this->get_io_service().post(bind_handler(handler,
            asio::error::bad_descriptor));
      return;
    }

    // Check that peer socket has not already been opened.
    if (peer.is_open())
    {
      this->get_io_service().post(bind_handler(handler,
            asio::error::already_open));
      return;
    }

    // Update the ID of the thread from which cancellation is safe.
    // The first thread to start an operation is recorded; once a different
    // thread starts one, the ID is replaced by ~DWORD(0).
    // NOTE(review): ~DWORD(0) presumably means "no single safe thread" --
    // confirm against the cancel implementation.
    if (impl.safe_cancellation_thread_id_ == 0)
      impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
    else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
      impl.safe_cancellation_thread_id_ = ~DWORD(0);

    // Create a new socket for the connection.
    asio::error_code ec;
    socket_holder sock(socket_ops::socket(impl.protocol_.family(),
          impl.protocol_.type(), impl.protocol_.protocol(), ec));
    if (sock.get() == invalid_socket)
    {
      this->get_io_service().post(bind_handler(handler, ec));
      return;
    }

    // Allocate and construct an operation to wrap the handler.
    typedef accept_operation<Socket, Handler> value_type;
    typedef handler_alloc_traits<Handler, value_type> alloc_traits;
    raw_handler_ptr<alloc_traits> raw_ptr(handler);
    socket_type new_socket = sock.get();
    bool enable_connection_aborted =
      (impl.flags_ & implementation_type::enable_connection_aborted);
    handler_ptr<alloc_traits> ptr(raw_ptr,
        iocp_service_, impl.socket_, new_socket, peer, impl.protocol_,
        peer_endpoint, enable_connection_aborted, handler);
    // The operation now carries the new socket, so the local holder lets go
    // of it only after the operation has been constructed.
    sock.release();

    // Accept a connection.
    // The receive data length argument is 0, so (per the AcceptEx
    // documentation) the accept completes without waiting for any data.
    DWORD bytes_read = 0;
    BOOL result = ::AcceptEx(impl.socket_, ptr.get()->new_socket(),
        ptr.get()->output_buffer(), 0, ptr.get()->address_length(),
        ptr.get()->address_length(), &bytes_read, ptr.get());
    // Capture the error immediately, before any other call can change it.
    DWORD last_error = ::WSAGetLastError();

    // Check if the operation completed immediately.
    if (!result && last_error != WSA_IO_PENDING)
    {
      if (!enable_connection_aborted
          && (last_error == ERROR_NETNAME_DELETED
            || last_error == WSAECONNABORTED))
      {
        // Post handler so that operation will be restarted again. We do not
        // perform the AcceptEx again here to avoid the possibility of starving
        // other handlers.
        iocp_service_.post_completion(ptr.get(), last_error, 0);
        ptr.release();
      }
      else
      {
        // NOTE(review): this local work object presumably keeps the
        // io_service from running out of work between destroying the
        // operation (which holds its own work_ member) and posting the
        // handler -- confirm.
        asio::io_service::work work(this->get_io_service());
        ptr.reset();
        asio::error_code ec(last_error,
            asio::error::get_system_category());
        iocp_service_.post(bind_handler(handler, ec));
      }
    }
    else
    {
      // The accept either succeeded or is pending; the operation object must
      // outlive this function, so ptr gives it up without destroying it.
      ptr.release();
    }
  }

  // Connect the socket to the specified endpoint.
  // Sets ec to bad_descriptor if the socket is not open; otherwise ec is
  // whatever socket_ops::connect leaves in it. The same code is returned.
  asio::error_code connect(implementation_type& impl,
      const endpoint_type& peer_endpoint, asio::error_code& ec)
  {
    if (!is_open(impl))
    {
      ec = asio::error::bad_descriptor;
      return ec;
    }

    // Perform the connect operation.
    socket_ops::connect(impl.socket_,
        peer_endpoint.data(), peer_endpoint.size(), ec);
    return ec;
  }

  // Function object handed to the reactor by async_connect. It is started for
  // both write and except operations, and the shared completed flag makes
  // sure that only the first invocation posts the user's handler.
  template <typename Handler>
  class connect_handler
  {
  public:
    connect_handler(socket_type socket, bool user_set_non_blocking,
        boost::shared_ptr<bool> completed,
        asio::io_service& io_service,
        reactor_type& reactor, Handler handler)
      : socket_(socket),
        user_set_non_blocking_(user_set_non_blocking),
        completed_(completed),
        io_service_(io_service),
        reactor_(reactor),
        work_(io_service),
        handler_(handler)
    {
    }

    // Called with the result of the reactor operation. Every path returns
    // true and, unless a previous invocation already completed, posts the
    // user's handler exactly once with the outcome of the connect.
    // NOTE(review): the meaning of the return value to the reactor is not
    // visible here; presumably true marks the operation as finished.
    bool operator()(const asio::error_code& result)
    {
      // Check whether a handler has already been called for the connection.
      // If it has, then we don't want to do anything in this handler.
      if (*completed_)
        return true;

      // Cancel the other reactor operation for the connection.
      *completed_ = true;
      reactor_.enqueue_cancel_ops_unlocked(socket_);

      // Check whether the operation was successful.
      if (result)
      {
        io_service_.post(bind_handler(handler_, result));
        return true;
      }

      // Get the error code from the connect operation.
      int connect_error = 0;
      size_t connect_error_len = sizeof(connect_error);
      asio::error_code ec;
      if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR,
            &connect_error, &connect_error_len, ec) == socket_error_retval)
      {
        io_service_.post(bind_handler(handler_, ec));
        return true;
      }

      // If connection failed then post the handler with the error code.
      if (connect_error)
      {
        ec = asio::error_code(connect_error,
            asio::error::get_system_category());
        io_service_.post(bind_handler(handler_, ec));
        return true;
      }

      // Revert socket to blocking mode unless the user requested otherwise.
      if (!user_set_non_blocking_)
      {
        ioctl_arg_type non_blocking = 0;
        if (socket_ops::ioctl(socket_, FIONBIO, &non_blocking, ec))
        {
          // The connect itself succeeded, but the failure to restore
          // blocking mode is what gets reported to the handler.
          io_service_.post(bind_handler(handler_, ec));
          return true;
        }
      }

      // Post the result of the successful connection operation.
      ec = asio::error_code();
      io_service_.post(bind_handler(handler_, ec));
      return true;
    }

  private:
    socket_type socket_;
    bool user_set_non_blocking_;
    // Flag shared between the copies of this handler held by the reactor.
    boost::shared_ptr<bool> completed_;
    asio::io_service& io_service_;
    reactor_type& reactor_;
    asio::io_service::work work_;
    Handler handler_;
  };

  // Start an asynchronous connect.
  // The socket is switched to non-blocking mode and the connect is started.
  // An immediate success or failure posts the handler straight away; an
  // in-progress connect is handed to the reactor via a connect_handler.
  template <typename Handler>
  void async_connect(implementation_type& impl,
      const endpoint_type& peer_endpoint, Handler handler)
  {
    if (!is_open(impl))
    {
      this->get_io_service().post(bind_handler(handler,
            asio::error::bad_descriptor));
      return;
    }

    // Update the ID of the thread from which cancellation is safe.
    if (impl.safe_cancellation_thread_id_ == 0)
      impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
    else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
      impl.safe_cancellation_thread_id_ = ~DWORD(0);

    // Check if the reactor was already obtained from the io_service.
    // The compare-exchange of 0 with 0 never changes reactor_; it is used
    // only to read the pointer through an interlocked operation.
    reactor_type* reactor = static_cast<reactor_type*>(
          interlocked_compare_exchange_pointer(
            reinterpret_cast<void**>(&reactor_), 0, 0));
    if (!reactor)
    {
      reactor = &(asio::use_service<reactor_type>(
            this->get_io_service()));
      interlocked_exchange_pointer(
          reinterpret_cast<void**>(&reactor_), reactor);
    }

    // Mark the socket as non-blocking so that the connection will take place
    // asynchronously.
    ioctl_arg_type non_blocking = 1;
    asio::error_code ec;
    if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
    {
      this->get_io_service().post(bind_handler(handler, ec));
      return;
    }

    // Start the connect operation.
    if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
          peer_endpoint.size(), ec) == 0)
    {
      // Revert socket to blocking mode unless the user requested otherwise.
      if (!(impl.flags_ & implementation_type::user_set_non_blocking))
      {
        non_blocking = 0;
        // ec is reused here, so the handler below receives whatever this
        // ioctl call leaves in it.
        socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec);
      }

      // The connect operation has finished successfully so we need to post the
      // handler immediately.
      this->get_io_service().post(bind_handler(handler, ec));
    }
    else if (ec == asio::error::in_progress
        || ec == asio::error::would_block)
    {
      // The connection is happening in the background, and we need to wait
      // until the socket becomes writeable.
      boost::shared_ptr<bool> completed(new bool(false));
      reactor->start_write_and_except_ops(impl.socket_,
          connect_handler<Handler>(
            impl.socket_,
            (impl.flags_ & implementation_type::user_set_non_blocking) != 0,
            completed, this->get_io_service(), *reactor, handler));
    }
    else
    {
      // Revert socket to blocking mode unless the user requested otherwise.
      if (!(impl.flags_ & implementation_type::user_set_non_blocking))
      {
        non_blocking = 0;
        // A separate error code is used so that the connect error in ec is
        // preserved for the handler.
        asio::error_code ignored_ec;
        socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
      }

      // The connect operation has failed, so post the handler immediately.
      this->get_io_service().post(bind_handler(handler, ec));
    }
  }

private:
  // Helper function to close a socket when the associated object is being
  // destroyed.
void close_for_destruction(implementation_type& impl)  {    if (is_open(impl))    {      // Check if the reactor was created, in which case we need to close the      // socket on the reactor as well to cancel any operations that might be      // running there.      reactor_type* reactor = static_cast<reactor_type*>(            interlocked_compare_exchange_pointer(              reinterpret_cast<void**>(&reactor_), 0, 0));      if (reactor)        reactor->close_descriptor(impl.socket_);      // The socket destructor must not block. If the user has changed the      // linger option to block in the foreground, we will change it back to the      // default so that the closure is performed in the background.      if (impl.flags_ & implementation_type::close_might_block)      {        ::linger opt;        opt.l_onoff = 0;        opt.l_linger = 0;        asio::error_code ignored_ec;        socket_ops::setsockopt(impl.socket_,            SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec);      }      asio::error_code ignored_ec;      socket_ops::close(impl.socket_, ignored_ec);      impl.socket_ = invalid_socket;      impl.flags_ = 0;      impl.cancel_token_.reset();      impl.safe_cancellation_thread_id_ = 0;    }  }  // Helper function to emulate InterlockedCompareExchangePointer functionality  // for:  // - very old Platform SDKs; and  // - platform SDKs where MSVC's /Wp64 option causes spurious warnings.  void* interlocked_compare_exchange_pointer(void** dest, void* exch, void* cmp)  {#if defined(_M_IX86)    return reinterpret_cast<void*>(InterlockedCompareExchange(          reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(exch),          reinterpret_cast<LONG>(cmp)));#else    return InterlockedCompareExchangePointer(dest, exch, cmp);#endif  }  // Helper function to emulate InterlockedExchangePointer functionality for:  // - very old Platform SDKs; and  // - platform SDKs where MSVC's /Wp64 option causes spurious warnings.  
void* interlocked_exchange_pointer(void** dest, void* val)  {#if defined(_M_IX86)    return reinterpret_cast<void*>(InterlockedExchange(          reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(val)));#else    return InterlockedExchangePointer(dest, val);#endif  }  // The IOCP service used for running asynchronous operations and dispatching  // handlers.  win_iocp_io_service& iocp_service_;  // The reactor used for performing connect operations. This object is created  // only if needed.  reactor_type* rea

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -