📄 event_channel.cpp
字号:
ACE_DEBUG ((LM_DEBUG,
"(%t) scheduling reinitiation of Connection_Handler %d\n",
connection_handler->connection_id ()));
// Reschedule ourselves to try and connect again.
if (ACE_Reactor::instance ()->schedule_timer
(connection_handler,
0,
connection_handler->timeout ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) %p\n",
"schedule_timer"),
-1);
}
return 0;
}
// Cancel the asynchronous connect that is pending for
// <connection_handler>.  This lives in its own method so that the
// cancellation can be requested separately.  Returns 0 when the
// handler's handle is already invalid; otherwise returns the result
// of <connector_.cancel>.
int
Event_Channel::cancel_connection_connection (Connection_Handler *connection_handler)
{
  // Proxies with deactivated handles have nothing to cancel.
  if (connection_handler->get_handle () == ACE_INVALID_HANDLE)
    return 0;

  // Close down the peer first so that its descriptor is reclaimed.
  connection_handler->peer ().close ();

  // Then cancel the asynchronous connect before any re-initialization.
  const int cancel_status = this->connector_.cancel (connection_handler);
  return cancel_status;
}
// Initiate active connections with the Consumer and Supplier Peers.
void
Event_Channel::initiate_connector (void)
{
if (Options::instance ()->enabled
(Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR))
{
CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
// Iterate through the Consumer Map connecting all the
// Connection_Handlers.
for (CONNECTION_MAP_ENTRY *me = 0;
cmi.next (me) != 0;
cmi.advance ())
{
Connection_Handler *connection_handler = me->int_id_;
if (this->initiate_connection_connection (connection_handler) == -1)
continue; // Failures are handled elsewhere...
}
}
}
// Initiate passive acceptor to wait for Consumer and Supplier Peers
// to accept.
int
Event_Channel::initiate_acceptors (void)
{
if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR))
{
if (this->consumer_acceptor_.open
(Options::instance ()->consumer_acceptor_port (),
ACE_Reactor::instance (),
Options::instance ()->blocking_semantics ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"cannot register acceptor"),
-1);
else
ACE_DEBUG ((LM_DEBUG,
"accepting Consumers at %d\n",
Options::instance ()->consumer_acceptor_port ()));
}
if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR))
{
if(this->supplier_acceptor_.open
(Options::instance ()->supplier_acceptor_port (),
ACE_Reactor::instance (),
Options::instance ()->blocking_semantics ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"cannot register acceptor"),
-1);
else
ACE_DEBUG ((LM_DEBUG,
"accepting Suppliers at %d\n",
Options::instance ()->supplier_acceptor_port ()));
}
return 0;
}
// Gracefully shut down all the handlers in the Connection_Handler
// connection map, together with the connector and both acceptors.
// The shutdown runs in two passes over the map: the first marks every
// handler as DISCONNECTING, the second destroys them.  Returns -1 if
// the threads cannot be suspended, 0 otherwise.
int
Event_Channel::close (u_long)
{
  if (Options::instance ()->threading_strategy () != Options::REACTIVE)
    {
      const int suspend_status =
        ACE_Thread_Manager::instance ()->suspend_all ();
      if (suspend_status == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "(%t) %p\n",
                             "suspend_all"),
                            -1);
        }

      ACE_DEBUG ((LM_DEBUG,
                  "(%t) suspending all threads\n"));
    }

  // Pass one: announce the shutdown to every handler.
  {
    CONNECTION_MAP_ITERATOR announce_iter (this->connection_map_);
    CONNECTION_MAP_ENTRY *entry = 0;

    while (announce_iter.next (entry) != 0)
      {
        Connection_Handler *const handler = entry->int_id_;

        ACE_DEBUG ((LM_DEBUG,
                    "(%t) closing down connection %d\n",
                    handler->connection_id ()));

        // Without this cancellation the gatewayd aborts on exit when
        // some Consumer/Supplier has not connected yet.
        if (handler->state () == Connection_Handler::CONNECTING)
          this->cancel_connection_connection (handler);

        // Flag the handler as DISCONNECTING so that no reconnect is
        // attempted for it.
        handler->state (Connection_Handler::DISCONNECTING);

        announce_iter.advance ();
      }
  }

  // Shut down the connector, then the supplier acceptor, then the
  // consumer acceptor.
  this->connector_.close ();
  this->supplier_acceptor_.close ();
  this->consumer_acceptor_.close ();

  // Pass two: release the resources held by every handler.
  {
    CONNECTION_MAP_ITERATOR destroy_iter (this->connection_map_);
    CONNECTION_MAP_ENTRY *entry = 0;

    while (destroy_iter.next (entry) != 0)
      {
        // Deallocates the Connection_Handler; this triggers a delete.
        entry->int_id_->destroy ();
        destroy_iter.advance ();
      }
  }

  return 0;
}
// Look up <connection_id> in the connection map.  The map's <find>
// receives <connection_handler> as its second argument, and its
// result is returned unchanged.
int
Event_Channel::find_proxy (ACE_INT32 connection_id,
                           Connection_Handler *&connection_handler)
{
  const int find_status =
    this->connection_map_.find (connection_id, connection_handler);
  return find_status;
}
// Register <connection_handler> in the connection map, keyed by its
// connection id.  Returns 0 on success and -1 when the bind fails,
// when the id is already bound, or when the map reports a result
// that is none of -1, 0 or 1.  Every failure is logged at LM_ERROR so
// that the diagnostic is not lost when debug output is disabled.
int
Event_Channel::bind_proxy (Connection_Handler *connection_handler)
{
  int result = this->connection_map_.bind (connection_handler->connection_id (),
                                           connection_handler);

  switch (result)
    {
    case -1:
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) bind failed for connection %d\n",
                         connection_handler->connection_id ()),
                        -1);
      /* NOTREACHED */
    case 1: // Oops, found a duplicate!
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) duplicate connection %d, already bound\n",
                         connection_handler->connection_id ()),
                        -1);
      /* NOTREACHED */
    case 0:
      // Success.
      return 0;
      /* NOTREACHED */
    default:
      // An unexpected result is a failure just like the cases above,
      // so it is reported at error priority rather than debug.
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) invalid result %d\n",
                         result),
                        -1);
      /* NOTREACHED */
    }

  ACE_NOTREACHED (return 0);
}
// Associate the Consumer_Dispatch_Set <cds> with <event_addr>.
// Returns 0 on success and -1 when the bind fails, when an entry for
// <event_addr> already exists, or when the bind reports a result that
// is none of -1, 0 or 1.  Every failure is logged at LM_ERROR, the
// same priority <bind_proxy> uses for the equivalent failure.
int
Event_Channel::subscribe (const Event_Key &event_addr,
                          Consumer_Dispatch_Set *cds)
{
  // Bind with the consumer map, keyed by <event_addr>.
  int result = this->efd_.bind (event_addr, cds);

  switch (result)
    {
    case -1:
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) bind failed for connection %d\n",
                         event_addr.connection_id_),
                        -1);
      /* NOTREACHED */
    case 1: // Oops, found a duplicate!
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) duplicate consumer map entry %d, "
                         "already bound\n",
                         event_addr.connection_id_),
                        -1);
      /* NOTREACHED */
    case 0:
      // Success.
      return 0;
      /* NOTREACHED */
    default:
      // An unexpected result is still a failure, so it is reported at
      // error priority rather than debug.
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) invalid result %d\n",
                         result),
                        -1);
      /* NOTREACHED */
    }

  ACE_NOTREACHED (return 0);
}
int
Event_Channel::open (void *)
{
// Ignore <SIGPIPE> so each <Consumer_Handler> can handle it.
ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE);
ACE_UNUSED_ARG (sig);
// Actively initiate Peer connections.
this->initiate_connector ();
// Passively initiate Peer acceptor.
if (this->initiate_acceptors () == -1)
return -1;
// If we're not running reactively, then we need to make sure that
// <ACE_Message_Block> reference counting operations are
// thread-safe. Therefore, we create an <ACE_Lock_Adapter> that is
// parameterized by <ACE_SYNCH_MUTEX> to prevent race conditions.
if (Options::instance ()->threading_strategy ()
!= Options::REACTIVE)
{
ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *la;
ACE_NEW_RETURN (la,
ACE_Lock_Adapter<ACE_SYNCH_MUTEX>,
-1);
Options::instance ()->locking_strategy (la);
}
return 0;
}
// Template instantiations for the lock adapter, map and set iterator
// types listed below.  They are emitted only when the build defines
// ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION (explicit "template class"
// form) or ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA ("#pragma
// instantiate" form); otherwise this section compiles to nothing.
// Both lists name the same set of types.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>;
template class ACE_Map_Entry<ACE_INT32, Connection_Handler *>;
template class ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
template class ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
template class ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
template class ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
template class ACE_Unbounded_Set_Iterator<Connection_Handler *>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX>
#pragma instantiate ACE_Map_Entry<ACE_INT32, Connection_Handler *>
#pragma instantiate ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>
#pragma instantiate ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>
#pragma instantiate ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>
#pragma instantiate ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>
#pragma instantiate ACE_Unbounded_Set_Iterator<Connection_Handler *>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -