⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_channel.cpp

📁 一个开源的网络开发库ACE
💻 CPP
📖 第 1 页 / 共 2 页
字号:
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) scheduling reinitiation of Connection_Handler %d\n",
                  connection_handler->connection_id ()));

      // Reschedule ourselves to try and connect again.
      if (ACE_Reactor::instance ()->schedule_timer
          (connection_handler,
           0,
           connection_handler->timeout ()) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "(%t) %p\n",
                           "schedule_timer"),
                          -1);
    }
  return 0;
}

// It is useful to provide a separate method to cancel the
// asynchronous connecting.

int
Event_Channel::cancel_connection_connection (Connection_Handler *connection_handler)
{
  // Skip over proxies with deactivated handles.
  if (connection_handler->get_handle () != ACE_INVALID_HANDLE)
    {
      // Make sure to close down peer to reclaim descriptor.
      connection_handler->peer ().close ();
      // Cancel asynchronous connecting before re-initializing.
      return this->connector_.cancel(connection_handler);
    }
  return 0;
}

// Initiate active connections with the Consumer and Supplier Peers.

void
Event_Channel::initiate_connector (void)
{
  if (Options::instance ()->enabled
      (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR))
    {
      CONNECTION_MAP_ITERATOR cmi (this->connection_map_);

      // Iterate through the Consumer Map connecting all the
      // Connection_Handlers.

      for (CONNECTION_MAP_ENTRY *me = 0;
           cmi.next (me) != 0;
           cmi.advance ())
        {
          Connection_Handler *connection_handler = me->int_id_;

          if (this->initiate_connection_connection (connection_handler) == -1)
            continue; // Failures are handled elsewhere...
        }
    }
}

// Initiate passive acceptor to wait for Consumer and Supplier Peers
// to accept.

int
Event_Channel::initiate_acceptors (void)
{
  if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR))
    {
    
      if (this->consumer_acceptor_.open
          (Options::instance ()->consumer_acceptor_port (),
           ACE_Reactor::instance (),
           Options::instance ()->blocking_semantics ()) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "%p\n",
                           "cannot register acceptor"),
                          -1);
      else
        ACE_DEBUG ((LM_DEBUG,
                    "accepting Consumers at %d\n",
                    Options::instance ()->consumer_acceptor_port ()));
    }
  if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR))
    {
    if(this->supplier_acceptor_.open
          (Options::instance ()->supplier_acceptor_port (),
           ACE_Reactor::instance (),
           Options::instance ()->blocking_semantics ()) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "%p\n",
                           "cannot register acceptor"),
                          -1);
      else
        ACE_DEBUG ((LM_DEBUG,
                    "accepting Suppliers at %d\n",
                    Options::instance ()->supplier_acceptor_port ()));
    }

  return 0;
}

// This method gracefully shuts down all the Handlers in the
// Connection_Handler Connection Map.

int
Event_Channel::close (u_long)
{
  if (Options::instance ()->threading_strategy () != Options::REACTIVE)
    {
      if (ACE_Thread_Manager::instance ()->suspend_all () == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "(%t) %p\n",
                           "suspend_all"),
                          -1);
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) suspending all threads\n"));
    }

  // First tell everyone that the spaceship is here...
  {
    CONNECTION_MAP_ITERATOR cmi (this->connection_map_);

    // Iterate over all the handlers and shut them down.

    for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0.
         cmi.next (me) != 0;
         cmi.advance ())
      {
        Connection_Handler *connection_handler = me->int_id_;

        ACE_DEBUG ((LM_DEBUG,
                    "(%t) closing down connection %d\n",
                    connection_handler->connection_id ()));

        // If have no this statement, the gatewayd will abort when exiting
        // with some Consumer/Supplier not connected.
        if (connection_handler->state()==Connection_Handler::CONNECTING)
          this->cancel_connection_connection(connection_handler);
        // Mark Connection_Handler as DISCONNECTING so we don't try to
        // reconnect...
        connection_handler->state (Connection_Handler::DISCONNECTING);
      }
  }

  // Close down the connector
  this->connector_.close ();

  // Close down the supplier acceptor.
  this->supplier_acceptor_.close ();

  // Close down the consumer acceptor.
  this->consumer_acceptor_.close ();

  // Now tell everyone that it is now time to commit suicide.
  {
    CONNECTION_MAP_ITERATOR cmi (this->connection_map_);

    for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0.
         cmi.next (me) != 0;
         cmi.advance ())
      {
        Connection_Handler *connection_handler = me->int_id_;

        // Deallocate Connection_Handler resources.
        connection_handler->destroy (); // Will trigger a delete.
      }
  }

  return 0;
}

int
Event_Channel::find_proxy (ACE_INT32 connection_id,
                               Connection_Handler *&connection_handler)
{
  return this->connection_map_.find (connection_id,
                                     connection_handler);
}

int
Event_Channel::bind_proxy (Connection_Handler *connection_handler)
{
  int result = this->connection_map_.bind (connection_handler->connection_id (),
                                           connection_handler);

  switch (result)
    {
    case -1:
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) bind failed for connection %d\n",
                         connection_handler->connection_id ()),
                        -1);
      /* NOTREACHED */
    case 1: // Oops, found a duplicate!
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) duplicate connection %d, already bound\n",
                         connection_handler->connection_id ()),
                        -1);
      /* NOTREACHED */
    case 0:
      // Success.
      return 0;
      /* NOTREACHED */
    default:
      ACE_ERROR_RETURN ((LM_DEBUG,
                         "(%t) invalid result %d\n",
                         result),
                        -1);
      /* NOTREACHED */
    }

  ACE_NOTREACHED (return 0);
}

int
Event_Channel::subscribe (const Event_Key &event_addr,
                          Consumer_Dispatch_Set *cds)
{
  int result = this->efd_.bind (event_addr, cds);

  // Bind with consumer map, keyed by peer address.
  switch (result)
    {
    case -1:
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) bind failed for connection %d\n",
                         event_addr.connection_id_),
                        -1);
      /* NOTREACHED */
    case 1: // Oops, found a duplicate!
      ACE_ERROR_RETURN ((LM_DEBUG,
                         "(%t) duplicate consumer map entry %d, "
                         "already bound\n",
                         event_addr.connection_id_),
                        -1);
      /* NOTREACHED */
    case 0:
      // Success.
      return 0;
    default:
      ACE_ERROR_RETURN ((LM_DEBUG,
                         "(%t) invalid result %d\n",
                         result),
                        -1);
      /* NOTREACHED */
    }

  ACE_NOTREACHED (return 0);
}

int
Event_Channel::open (void *)
{
  // Ignore <SIGPIPE> so each <Consumer_Handler> can handle it.
  ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE);
  ACE_UNUSED_ARG (sig);

  // Actively initiate Peer connections.
  this->initiate_connector ();

  // Passively initiate Peer acceptor.
  if (this->initiate_acceptors () == -1)
    return -1;

  // If we're not running reactively, then we need to make sure that
  // <ACE_Message_Block> reference counting operations are
  // thread-safe.  Therefore, we create an <ACE_Lock_Adapter> that is
  // parameterized by <ACE_SYNCH_MUTEX> to prevent race conditions.
  if (Options::instance ()->threading_strategy ()
      != Options::REACTIVE)
    {
      ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *la;

      ACE_NEW_RETURN (la,
                      ACE_Lock_Adapter<ACE_SYNCH_MUTEX>,
                      -1);

      Options::instance ()->locking_strategy (la);
    }

  return 0;
}

#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>;
template class ACE_Map_Entry<ACE_INT32, Connection_Handler *>;
template class ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
template class ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
template class ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
template class ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
template class ACE_Unbounded_Set_Iterator<Connection_Handler *>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX>
#pragma instantiate ACE_Map_Entry<ACE_INT32, Connection_Handler *>
#pragma instantiate ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>
#pragma instantiate ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>
#pragma instantiate ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>
#pragma instantiate ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>
#pragma instantiate ACE_Unbounded_Set_Iterator<Connection_Handler *>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -