📄 event_channel.cpp
字号:
"(%t) scheduling reinitiation of Connection_Handler %d\n", connection_handler->connection_id ())); // Reschedule ourselves to try and connect again. ACE_Time_Value const timeout (connection_handler->timeout ()); if (ACE_Reactor::instance ()->schedule_timer (connection_handler, 0, timeout) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_timer"), -1); } return 0;}// It is useful to provide a separate method to cancel the// asynchronous connecting.intEvent_Channel::cancel_connection_connection (Connection_Handler *connection_handler){ // Skip over proxies with deactivated handles. if (connection_handler->get_handle () != ACE_INVALID_HANDLE) { // Make sure to close down peer to reclaim descriptor. connection_handler->peer ().close (); // Cancel asynchronous connecting before re-initializing. return this->connector_.cancel(connection_handler); } return 0;}// Initiate active connections with the Consumer and Supplier Peers.voidEvent_Channel::initiate_connector (void){ if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR)) { CONNECTION_MAP_ITERATOR cmi (this->connection_map_); // Iterate through the Consumer Map connecting all the // Connection_Handlers. for (CONNECTION_MAP_ENTRY *me = 0; cmi.next (me) != 0; cmi.advance ()) { Connection_Handler *connection_handler = me->int_id_; if (this->initiate_connection_connection (connection_handler) == -1) continue; // Failures are handled elsewhere... } }}// Initiate passive acceptor to wait for Consumer and Supplier Peers// to accept.intEvent_Channel::initiate_acceptors (void){ if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR)) { ACE_INET_Addr consumer_addr (Options::instance ()->consumer_acceptor_port ()); if (this->consumer_acceptor_.open (consumer_addr, ACE_Reactor::instance (), Options::instance ()->blocking_semantics ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "cannot register acceptor"), -1); else ACE_DEBUG ((LM_DEBUG, "accepting Consumers at %d\n", Options::instance ()->consumer_acceptor_port ())); } if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR)) { ACE_INET_Addr supplier_addr (Options::instance ()->supplier_acceptor_port ()); if (this->supplier_acceptor_.open (supplier_addr, ACE_Reactor::instance (), Options::instance ()->blocking_semantics ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "cannot register acceptor"), -1); else ACE_DEBUG ((LM_DEBUG, "accepting Suppliers at %d\n", Options::instance ()->supplier_acceptor_port ())); } return 0;}// This method gracefully shuts down all the Handlers in the// Connection_Handler Connection Map.intEvent_Channel::close (u_long){ if (Options::instance ()->threading_strategy () != Options::REACTIVE) { if (ACE_Thread_Manager::instance ()->suspend_all () == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n")); } // First tell everyone that the spaceship is here... { CONNECTION_MAP_ITERATOR cmi (this->connection_map_); // Iterate over all the handlers and shut them down. for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0. cmi.next (me) != 0; cmi.advance ()) { Connection_Handler *connection_handler = me->int_id_; ACE_DEBUG ((LM_DEBUG, "(%t) closing down connection %d\n", connection_handler->connection_id ())); // If have no this statement, the gatewayd will abort when exiting // with some Consumer/Supplier not connected. if (connection_handler->state()==Connection_Handler::CONNECTING) this->cancel_connection_connection(connection_handler); // Mark Connection_Handler as DISCONNECTING so we don't try to // reconnect... connection_handler->state (Connection_Handler::DISCONNECTING); } } // Close down the connector this->connector_.close (); // Close down the supplier acceptor. this->supplier_acceptor_.close (); // Close down the consumer acceptor. this->consumer_acceptor_.close (); // Now tell everyone that it is now time to commit suicide. { CONNECTION_MAP_ITERATOR cmi (this->connection_map_); for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0. cmi.next (me) != 0; cmi.advance ()) { Connection_Handler *connection_handler = me->int_id_; // Deallocate Connection_Handler resources. connection_handler->destroy (); // Will trigger a delete. } } return 0;}intEvent_Channel::find_proxy (ACE_INT32 connection_id, Connection_Handler *&connection_handler){ return this->connection_map_.find (connection_id, connection_handler);}intEvent_Channel::bind_proxy (Connection_Handler *connection_handler){ int result = this->connection_map_.bind (connection_handler->connection_id (), connection_handler); switch (result) { case -1: ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", connection_handler->connection_id ()), -1); /* NOTREACHED */ case 1: // Oops, found a duplicate! ACE_ERROR_RETURN ((LM_ERROR, "(%t) duplicate connection %d, already bound\n", connection_handler->connection_id ()), -1); /* NOTREACHED */ case 0: // Success. return 0; /* NOTREACHED */ default: ACE_ERROR_RETURN ((LM_DEBUG, "(%t) invalid result %d\n", result), -1); /* NOTREACHED */ } ACE_NOTREACHED (return 0);}intEvent_Channel::subscribe (const Event_Key &event_addr, Consumer_Dispatch_Set *cds){ int result = this->efd_.bind (event_addr, cds); // Bind with consumer map, keyed by peer address. switch (result) { case -1: ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", event_addr.connection_id_), -1); /* NOTREACHED */ case 1: // Oops, found a duplicate! ACE_ERROR_RETURN ((LM_DEBUG, "(%t) duplicate consumer map entry %d, " "already bound\n", event_addr.connection_id_), -1); /* NOTREACHED */ case 0: // Success. return 0; default: ACE_ERROR_RETURN ((LM_DEBUG, "(%t) invalid result %d\n", result), -1); /* NOTREACHED */ } ACE_NOTREACHED (return 0);}intEvent_Channel::open (void *){ // Ignore <SIGPIPE> so each <Consumer_Handler> can handle it. ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE); ACE_UNUSED_ARG (sig); // Actively initiate Peer connections. this->initiate_connector (); // Passively initiate Peer acceptor. if (this->initiate_acceptors () == -1) return -1; // If we're not running reactively, then we need to make sure that // <ACE_Message_Block> reference counting operations are // thread-safe. Therefore, we create an <ACE_Lock_Adapter> that is // parameterized by <ACE_SYNCH_MUTEX> to prevent race conditions. if (Options::instance ()->threading_strategy () != Options::REACTIVE) { ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *la; ACE_NEW_RETURN (la, ACE_Lock_Adapter<ACE_SYNCH_MUTEX>, -1); Options::instance ()->locking_strategy (la); } return 0;}#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>;template class ACE_Map_Entry<ACE_INT32, Connection_Handler *>;template class ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>;template class ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>;template class ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>;template class ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>;template class ACE_Unbounded_Set_Iterator<Connection_Handler *>;#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX>#pragma instantiate ACE_Map_Entry<ACE_INT32, Connection_Handler *>#pragma instantiate ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>#pragma instantiate ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>#pragma instantiate ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>#pragma instantiate ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>#pragma instantiate ACE_Unbounded_Set_Iterator<Connection_Handler *>#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -