⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_channel.cpp

📁 ACE自适配通信环境(ADAPTIVE Communication Environment)是可以自由使用、开放源码的面向对象(OO)框架(Framework)
💻 CPP
📖 第 1 页 / 共 2 页
字号:
                  "(%t) scheduling reinitiation of Connection_Handler %d\n",                  connection_handler->connection_id ()));      // Reschedule ourselves to try and connect again.      ACE_Time_Value const timeout (connection_handler->timeout ());      if (ACE_Reactor::instance ()->schedule_timer          (connection_handler,           0,           timeout) == -1)        ACE_ERROR_RETURN ((LM_ERROR,                           "(%t) %p\n",                           "schedule_timer"),                          -1);    }  return 0;}// It is useful to provide a separate method to cancel the// asynchronous connecting.intEvent_Channel::cancel_connection_connection (Connection_Handler *connection_handler){  // Skip over proxies with deactivated handles.  if (connection_handler->get_handle () != ACE_INVALID_HANDLE)    {      // Make sure to close down peer to reclaim descriptor.      connection_handler->peer ().close ();      // Cancel asynchronous connecting before re-initializing.      return this->connector_.cancel(connection_handler);    }  return 0;}// Initiate active connections with the Consumer and Supplier Peers.voidEvent_Channel::initiate_connector (void){  if (Options::instance ()->enabled      (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR))    {      CONNECTION_MAP_ITERATOR cmi (this->connection_map_);      // Iterate through the Consumer Map connecting all the      // Connection_Handlers.      for (CONNECTION_MAP_ENTRY *me = 0;           cmi.next (me) != 0;           cmi.advance ())        {          Connection_Handler *connection_handler = me->int_id_;          if (this->initiate_connection_connection (connection_handler) == -1)            continue; // Failures are handled elsewhere...        }    }}// Initiate passive acceptor to wait for Consumer and Supplier Peers// to accept.intEvent_Channel::initiate_acceptors (void){  if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR))    {      ACE_INET_Addr        consumer_addr (Options::instance ()->consumer_acceptor_port ());      if (this->consumer_acceptor_.open          (consumer_addr,           ACE_Reactor::instance (),           Options::instance ()->blocking_semantics ()) == -1)        ACE_ERROR_RETURN ((LM_ERROR,                           "%p\n",                           "cannot register acceptor"),                          -1);      else        ACE_DEBUG ((LM_DEBUG,                    "accepting Consumers at %d\n",                    Options::instance ()->consumer_acceptor_port ()));    }  if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR))    {      ACE_INET_Addr        supplier_addr (Options::instance ()->supplier_acceptor_port ());      if (this->supplier_acceptor_.open          (supplier_addr,           ACE_Reactor::instance (),           Options::instance ()->blocking_semantics ()) == -1)        ACE_ERROR_RETURN ((LM_ERROR,                           "%p\n",                           "cannot register acceptor"),                          -1);      else        ACE_DEBUG ((LM_DEBUG,                    "accepting Suppliers at %d\n",                    Options::instance ()->supplier_acceptor_port ()));    }  return 0;}// This method gracefully shuts down all the Handlers in the// Connection_Handler Connection Map.intEvent_Channel::close (u_long){  if (Options::instance ()->threading_strategy () != Options::REACTIVE)    {      if (ACE_Thread_Manager::instance ()->suspend_all () == -1)        ACE_ERROR_RETURN ((LM_ERROR,                           "(%t) %p\n",                           "suspend_all"),                          -1);      ACE_DEBUG ((LM_DEBUG,                  "(%t) suspending all threads\n"));    }  // First tell everyone that the spaceship is here...  {    CONNECTION_MAP_ITERATOR cmi (this->connection_map_);    // Iterate over all the handlers and shut them down.    for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0.         cmi.next (me) != 0;         cmi.advance ())      {        Connection_Handler *connection_handler = me->int_id_;        ACE_DEBUG ((LM_DEBUG,                    "(%t) closing down connection %d\n",                    connection_handler->connection_id ()));        // If have no this statement, the gatewayd will abort when exiting        // with some Consumer/Supplier not connected.        if (connection_handler->state()==Connection_Handler::CONNECTING)          this->cancel_connection_connection(connection_handler);        // Mark Connection_Handler as DISCONNECTING so we don't try to        // reconnect...        connection_handler->state (Connection_Handler::DISCONNECTING);      }  }  // Close down the connector  this->connector_.close ();  // Close down the supplier acceptor.  this->supplier_acceptor_.close ();  // Close down the consumer acceptor.  this->consumer_acceptor_.close ();  // Now tell everyone that it is now time to commit suicide.  {    CONNECTION_MAP_ITERATOR cmi (this->connection_map_);    for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0.         cmi.next (me) != 0;         cmi.advance ())      {        Connection_Handler *connection_handler = me->int_id_;        // Deallocate Connection_Handler resources.        connection_handler->destroy (); // Will trigger a delete.      }  }  return 0;}intEvent_Channel::find_proxy (ACE_INT32 connection_id,                               Connection_Handler *&connection_handler){  return this->connection_map_.find (connection_id,                                     connection_handler);}intEvent_Channel::bind_proxy (Connection_Handler *connection_handler){  int result = this->connection_map_.bind (connection_handler->connection_id (),                                           connection_handler);  switch (result)    {    case -1:      ACE_ERROR_RETURN ((LM_ERROR,                         "(%t) bind failed for connection %d\n",                         connection_handler->connection_id ()),                        -1);      /* NOTREACHED */    case 1: // Oops, found a duplicate!      ACE_ERROR_RETURN ((LM_ERROR,                         "(%t) duplicate connection %d, already bound\n",                         connection_handler->connection_id ()),                        -1);      /* NOTREACHED */    case 0:      // Success.      return 0;      /* NOTREACHED */    default:      ACE_ERROR_RETURN ((LM_DEBUG,                         "(%t) invalid result %d\n",                         result),                        -1);      /* NOTREACHED */    }  ACE_NOTREACHED (return 0);}intEvent_Channel::subscribe (const Event_Key &event_addr,                          Consumer_Dispatch_Set *cds){  int result = this->efd_.bind (event_addr, cds);  // Bind with consumer map, keyed by peer address.  switch (result)    {    case -1:      ACE_ERROR_RETURN ((LM_ERROR,                         "(%t) bind failed for connection %d\n",                         event_addr.connection_id_),                        -1);      /* NOTREACHED */    case 1: // Oops, found a duplicate!      ACE_ERROR_RETURN ((LM_DEBUG,                         "(%t) duplicate consumer map entry %d, "                         "already bound\n",                         event_addr.connection_id_),                        -1);      /* NOTREACHED */    case 0:      // Success.      return 0;    default:      ACE_ERROR_RETURN ((LM_DEBUG,                         "(%t) invalid result %d\n",                         result),                        -1);      /* NOTREACHED */    }  ACE_NOTREACHED (return 0);}intEvent_Channel::open (void *){  // Ignore <SIGPIPE> so each <Consumer_Handler> can handle it.  ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE);  ACE_UNUSED_ARG (sig);  // Actively initiate Peer connections.  this->initiate_connector ();  // Passively initiate Peer acceptor.  if (this->initiate_acceptors () == -1)    return -1;  // If we're not running reactively, then we need to make sure that  // <ACE_Message_Block> reference counting operations are  // thread-safe.  Therefore, we create an <ACE_Lock_Adapter> that is  // parameterized by <ACE_SYNCH_MUTEX> to prevent race conditions.  if (Options::instance ()->threading_strategy ()      != Options::REACTIVE)    {      ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *la;      ACE_NEW_RETURN (la,                      ACE_Lock_Adapter<ACE_SYNCH_MUTEX>,                      -1);      Options::instance ()->locking_strategy (la);    }  return 0;}#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>;template class ACE_Map_Entry<ACE_INT32, Connection_Handler *>;template class ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>;template class ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>;template class ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>;template class ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>;template class ACE_Unbounded_Set_Iterator<Connection_Handler *>;#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX>#pragma instantiate ACE_Map_Entry<ACE_INT32, Connection_Handler *>#pragma instantiate ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>#pragma instantiate ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>#pragma instantiate ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>#pragma instantiate ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>#pragma instantiate ACE_Unbounded_Set_Iterator<Connection_Handler *>#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -