📄 event_channel.cpp
字号:
// Event_Channel.cpp,v 4.34 2001/03/21 11:59:40 schmidt Exp
#define ACE_BUILD_SVC_DLL
#include "Connection_Handler_Connector.h"
#include "Event_Channel.h"
ACE_RCSID(Gateway, Event_Channel, "Event_Channel.cpp,v 4.34 2001/03/21 11:59:40 schmidt Exp")
Event_Channel::~Event_Channel (void)
{
}
Event_Channel::Event_Channel (void)
: supplier_acceptor_ (*this, 'S'),
consumer_acceptor_ (*this, 'C')
{
}
int
Event_Channel::compute_performance_statistics (void)
{
ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n"));
CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
// If we've got a <ACE_Thread_Manager> then use it to suspend all
// the threads. This will enable us to get an accurate count.
if (Options::instance ()->threading_strategy ()
!= Options::REACTIVE)
{
if (ACE_Thread_Manager::instance ()->suspend_all () == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) %p\n",
"suspend_all"),
-1);
ACE_DEBUG ((LM_DEBUG,
"(%t) suspending all threads..."));
}
size_t total_bytes_in = 0;
size_t total_bytes_out = 0;
// Iterate through the connection map summing up the number of bytes
// sent/received.
for (CONNECTION_MAP_ENTRY *me = 0;
cmi.next (me) != 0;
cmi.advance ())
{
Connection_Handler *connection_handler = me->int_id_;
if (connection_handler->connection_role () == 'C')
total_bytes_out += connection_handler->total_bytes ();
else // connection_handler->connection_role () == 'S'
total_bytes_in += connection_handler->total_bytes ();
}
ACE_DEBUG ((LM_DEBUG,
"(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
Options::instance ()->performance_window (),
total_bytes_in,
total_bytes_out));
ACE_DEBUG ((LM_DEBUG,
"(%t) %f Mbits/sec received.\n",
(float) (total_bytes_in * 8 /
(float) (1024 * 1024 * Options::instance ()->performance_window ()))));
ACE_DEBUG ((LM_DEBUG,
"(%t) %f Mbits/sec sent.\n",
(float) (total_bytes_out * 8 /
(float) (1024 * 1024 * Options::instance ()->performance_window ()))));
// Resume all the threads again.
if (Options::instance ()->threading_strategy ()
!= Options::REACTIVE)
{
if (ACE_Thread_Manager::instance ()->resume_all () == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) %p\n",
"resume_all"),
-1);
ACE_DEBUG ((LM_DEBUG,
"(%t) resuming all threads..."));
}
return 0;
}
int
Event_Channel::handle_timeout (const ACE_Time_Value &,
const void *)
{
// This is called periodically to compute performance statistics.
return this->compute_performance_statistics ();
}
int
Event_Channel::put (ACE_Message_Block *event,
ACE_Time_Value *)
{
// We got a valid event, so determine its type, which is stored in
// the first of the two <ACE_Message_Block>s, which are chained
// together by <ACE::recv>.
Event_Key *event_key = (Event_Key *) event->rd_ptr ();
// Skip over the address portion and get the data, which is in the
// second <ACE_Message_Block>.
ACE_Message_Block *data = event->cont ();
switch (event_key->type_)
{
case ROUTING_EVENT:
this->routing_event (event_key,
data);
break;
case SUBSCRIPTION_EVENT:
this->subscription_event (data);
break;
}
// Release the memory in the message block.
event->release ();
return 0;
}
void
Event_Channel::subscription_event (ACE_Message_Block *data)
{
Event *event = (Event *) data->rd_ptr ();
ACE_DEBUG ((LM_DEBUG,
"(%t) received a subscription with %d bytes from connection id %d\n",
event->header_.len_,
event->header_.connection_id_));
Subscription *subscription = (Subscription *) event->data_;
// Convert the subscription into host byte order so that we can
// access it directly without having to repeatedly muck with it...
subscription->decode ();
ACE_DEBUG ((LM_DEBUG,
"(%t) connection_id_ = %d, total_consumers_ = %d\n",
subscription->connection_id_,
subscription->total_consumers_));
for (ACE_INT32 i = 0;
i < subscription->total_consumers_;
i++)
ACE_DEBUG ((LM_DEBUG,
"(%t) consumers_[%d] = %d\n",
i,
subscription->consumers_[i]));
}
void
Event_Channel::routing_event (Event_Key *forwarding_address,
ACE_Message_Block *data)
{
Consumer_Dispatch_Set *dispatch_set = 0;
// Initialize the <dispatch_set> to points to the set of Consumers
// associated with this forwarding address.
if (this->efd_.find (*forwarding_address,
dispatch_set) == -1)
// Failure.
ACE_ERROR ((LM_DEBUG,
"(%t) find failed on connection id = %d, type = %d\n",
forwarding_address->connection_id_,
forwarding_address->type_));
else
{
// Check to see if there are any consumers.
if (dispatch_set->size () == 0)
ACE_DEBUG ((LM_WARNING,
"there are no active consumers for this event currently\n"));
else // There are consumers, so forward the event.
{
// Initialize the interator.
Consumer_Dispatch_Set_Iterator dsi (*dispatch_set);
// At this point, we should assign a thread-safe locking
// strategy to the <ACE_Message_Block> is we're running in a
// multi-threaded configuration.
data->locking_strategy (Options::instance ()->locking_strategy ());
for (Connection_Handler **connection_handler = 0;
dsi.next (connection_handler) != 0;
dsi.advance ())
{
// Only process active connection_handlers.
if ((*connection_handler)->state () == Connection_Handler::ESTABLISHED)
{
// Duplicate the event portion via reference
// counting.
ACE_Message_Block *dup_msg = data->duplicate ();
ACE_DEBUG ((LM_DEBUG,
"(%t) forwarding to Consumer %d\n",
(*connection_handler)->connection_id ()));
if ((*connection_handler)->put (dup_msg) == -1)
{
if (errno == EWOULDBLOCK) // The queue has filled up!
ACE_ERROR ((LM_ERROR,
"(%t) %p\n",
"gateway is flow controlled, so we're dropping events"));
else
ACE_ERROR ((LM_ERROR,
"(%t) %p transmission error to peer %d\n",
"put",
(*connection_handler)->connection_id ()));
// We are responsible for releasing an
// ACE_Message_Block if failures occur.
dup_msg->release ();
}
}
}
}
}
}
int
Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler,
int sync_directly)
{
ACE_Synch_Options synch_options;
if (sync_directly)
// In separated connection handler thread, connection can be
// initiated by block mode (synch mode) directly.
synch_options = ACE_Synch_Options::synch;
else if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK)
synch_options = ACE_Synch_Options::asynch;
else
synch_options = ACE_Synch_Options::synch;
return this->connector_.initiate_connection (connection_handler,
synch_options);
}
int
Event_Channel::complete_connection_connection (Connection_Handler *connection_handler)
{
int option = connection_handler->connection_role () == 'S'
? SO_RCVBUF
: SO_SNDBUF;
int socket_queue_size =
Options::instance ()->socket_queue_size ();
if (socket_queue_size > 0)
if (connection_handler->peer ().set_option (SOL_SOCKET,
option,
&socket_queue_size,
sizeof (int)) == -1)
ACE_ERROR ((LM_ERROR,
"(%t) %p\n",
"set_option"));
connection_handler->thr_mgr (ACE_Thread_Manager::instance ());
// Our state is now "established."
connection_handler->state (Connection_Handler::ESTABLISHED);
// Restart the timeout to 1.
connection_handler->timeout (1);
ACE_INT32 id = htonl (connection_handler->connection_id ());
// Send the connection id to the peerd.
ssize_t n = connection_handler->peer ().send ((const void *) &id,
sizeof id);
if (n != sizeof id)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) %p\n",
n == 0 ? "peer has closed down unexpectedly" : "send"),
-1);
return 0;
}
// Restart connection (blocking_semantics dicates whether we restart
// synchronously or asynchronously).
int
Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler)
{
// Cancel asynchronous connecting before re-initializing. It will
// close the peer and cancel the asynchronous connecting.
this->cancel_connection_connection(connection_handler);
if (connection_handler->state () != Connection_Handler::DISCONNECTING)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -