⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_channel.cpp

📁 ACE自适配通信环境(ADAPTIVE Communication Environment)是可以自由使用、开放源码的面向对象(OO)框架(Framework)
💻 CPP
📖 第 1 页 / 共 2 页
字号:
// Event_Channel.cpp,v 4.39 2006/01/23 18:13:38 shuston Exp#define ACE_BUILD_SVC_DLL#include "Connection_Handler_Connector.h"#include "Event_Channel.h"#include "ace/OS_NS_sys_select.h"#include "ace/Signal.h"ACE_RCSID(Gateway, Event_Channel, "Event_Channel.cpp,v 4.39 2006/01/23 18:13:38 shuston Exp")Event_Channel::~Event_Channel (void){}Event_Channel::Event_Channel (void)  : supplier_acceptor_ (*this, 'S'),    consumer_acceptor_ (*this, 'C'){}intEvent_Channel::compute_performance_statistics (void){  ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n"));  CONNECTION_MAP_ITERATOR cmi (this->connection_map_);  // If we've got a <ACE_Thread_Manager> then use it to suspend all  // the threads.  This will enable us to get an accurate count.  if (Options::instance ()->threading_strategy ()      != Options::REACTIVE)    {      if (ACE_Thread_Manager::instance ()->suspend_all () == -1)        ACE_ERROR_RETURN ((LM_ERROR,                           "(%t) %p\n",                           "suspend_all"),                          -1);      ACE_DEBUG ((LM_DEBUG,                  "(%t) suspending all threads..."));    }  size_t total_bytes_in = 0;  size_t total_bytes_out = 0;  // Iterate through the connection map summing up the number of bytes  // sent/received.  for (CONNECTION_MAP_ENTRY *me = 0;       cmi.next (me) != 0;       cmi.advance ())    {      Connection_Handler *connection_handler = me->int_id_;      if (connection_handler->connection_role () == 'C')        total_bytes_out += connection_handler->total_bytes ();      else // connection_handler->connection_role () == 'S'        total_bytes_in += connection_handler->total_bytes ();    }  ACE_DEBUG ((LM_DEBUG,              "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",              Options::instance ()->performance_window (),              total_bytes_in,              total_bytes_out));  ACE_DEBUG ((LM_DEBUG,              "(%t) %f Mbits/sec received.\n",              (float) (total_bytes_in * 8 /                       (float) (1024 * 1024 * Options::instance ()->performance_window ()))));  ACE_DEBUG ((LM_DEBUG,              "(%t) %f Mbits/sec sent.\n",              (float) (total_bytes_out * 8 /                       (float) (1024 * 1024 * Options::instance ()->performance_window ()))));  // Resume all the threads again.  if (Options::instance ()->threading_strategy ()      != Options::REACTIVE)    {      if (ACE_Thread_Manager::instance ()->resume_all () == -1)        ACE_ERROR_RETURN ((LM_ERROR,                           "(%t) %p\n",                           "resume_all"),                          -1);      ACE_DEBUG ((LM_DEBUG,                  "(%t) resuming all threads..."));    }  return 0;}intEvent_Channel::handle_timeout (const ACE_Time_Value &,                               const void *){  // This is called periodically to compute performance statistics.  return this->compute_performance_statistics ();}intEvent_Channel::put (ACE_Message_Block *event,                    ACE_Time_Value *){  // We got a valid event, so determine its type, which is stored in  // the first of the two <ACE_Message_Block>s, which are chained  // together by <ACE::recv>.  Event_Key *event_key = (Event_Key *) event->rd_ptr ();  // Skip over the address portion and get the data, which is in the  // second <ACE_Message_Block>.  ACE_Message_Block *data = event->cont ();  switch (event_key->type_)    {    case ROUTING_EVENT:      this->routing_event (event_key,                           data);      break;    case SUBSCRIPTION_EVENT:      this->subscription_event (data);      break;    }  // Release the memory in the message block.  event->release ();  return 0;}voidEvent_Channel::subscription_event (ACE_Message_Block *data){  Event *event = (Event *) data->rd_ptr ();  ACE_DEBUG ((LM_DEBUG,              "(%t) received a subscription with %d bytes from connection id %d\n",              event->header_.len_,              event->header_.connection_id_));  Subscription *subscription = (Subscription *) event->data_;  // Convert the subscription into host byte order so that we can  // access it directly without having to repeatedly muck with it...  subscription->decode ();  ACE_DEBUG ((LM_DEBUG,              "(%t) connection_id_ = %d, total_consumers_ = %d\n",              subscription->connection_id_,              subscription->total_consumers_));  for (ACE_INT32 i = 0;       i < subscription->total_consumers_;       i++)    ACE_DEBUG ((LM_DEBUG,                "(%t) consumers_[%d] = %d\n",                i,                subscription->consumers_[i]));}voidEvent_Channel::routing_event (Event_Key *forwarding_address,                              ACE_Message_Block *data){  Consumer_Dispatch_Set *dispatch_set = 0;  // Initialize the <dispatch_set> to points to the set of Consumers  // associated with this forwarding address.  if (this->efd_.find (*forwarding_address,                       dispatch_set) == -1)    // Failure.    ACE_ERROR ((LM_DEBUG,                "(%t) find failed on connection id = %d, type = %d\n",                forwarding_address->connection_id_,                forwarding_address->type_));  else    {      // Check to see if there are any consumers.      if (dispatch_set->size () == 0)        ACE_DEBUG ((LM_WARNING,                    "there are no active consumers for this event currently\n"));      else // There are consumers, so forward the event.        {          // Initialize the interator.          Consumer_Dispatch_Set_Iterator dsi (*dispatch_set);          // At this point, we should assign a thread-safe locking          // strategy to the <ACE_Message_Block> is we're running in a          // multi-threaded configuration.          data->locking_strategy (Options::instance ()->locking_strategy ());          for (Connection_Handler **connection_handler = 0;               dsi.next (connection_handler) != 0;               dsi.advance ())            {              // Only process active connection_handlers.              if ((*connection_handler)->state () == Connection_Handler::ESTABLISHED)                {                  // Duplicate the event portion via reference                  // counting.                  ACE_Message_Block *dup_msg = data->duplicate ();                  ACE_DEBUG ((LM_DEBUG,                              "(%t) forwarding to Consumer %d\n",                              (*connection_handler)->connection_id ()));                  if ((*connection_handler)->put (dup_msg) == -1)                    {                      if (errno == EWOULDBLOCK) // The queue has filled up!                        ACE_ERROR ((LM_ERROR,                                    "(%t) %p\n",                                    "gateway is flow controlled, so we're dropping events"));                      else                        ACE_ERROR ((LM_ERROR,                                    "(%t) %p transmission error to peer %d\n",                                    "put",                                    (*connection_handler)->connection_id ()));                      // We are responsible for releasing an                      // ACE_Message_Block if failures occur.                      dup_msg->release ();                    }                }            }        }    }}intEvent_Channel::initiate_connection_connection (Connection_Handler *connection_handler,                                               int sync_directly){  ACE_Synch_Options synch_options;  if (sync_directly)    // In separated connection handler thread, connection can be    // initiated by block mode (synch mode) directly.    synch_options = ACE_Synch_Options::synch;  else if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK)    synch_options = ACE_Synch_Options::asynch;  else    synch_options = ACE_Synch_Options::synch;  return this->connector_.initiate_connection (connection_handler,                                               synch_options);}intEvent_Channel::complete_connection_connection (Connection_Handler *connection_handler){  int option = connection_handler->connection_role () == 'S'    ? SO_RCVBUF    : SO_SNDBUF;  int socket_queue_size =    Options::instance ()->socket_queue_size ();  if (socket_queue_size > 0)    if (connection_handler->peer ().set_option (SOL_SOCKET,                                                option,                                                &socket_queue_size,                                                sizeof (int)) == -1)      ACE_ERROR ((LM_ERROR,                  "(%t) %p\n",                  "set_option"));  connection_handler->thr_mgr (ACE_Thread_Manager::instance ());  // Our state is now "established."  connection_handler->state (Connection_Handler::ESTABLISHED);  // Restart the timeout to 1.  connection_handler->timeout (1);  ACE_INT32 id = htonl (connection_handler->connection_id ());  // Send the connection id to the peerd.  ssize_t n = connection_handler->peer ().send ((const void *) &id,                                                sizeof id);  if (n != sizeof id)    ACE_ERROR_RETURN ((LM_ERROR,                       "(%t) %p\n",                      n == 0 ? "peer has closed down unexpectedly" : "send"),                      -1);  return 0;}// Restart connection (blocking_semantics dicates whether we restart// synchronously or asynchronously).intEvent_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler){  // Cancel asynchronous connecting before re-initializing.  It will  // close the peer and cancel the asynchronous connecting.  this->cancel_connection_connection(connection_handler);  if (connection_handler->state () != Connection_Handler::DISCONNECTING)    {      ACE_DEBUG ((LM_DEBUG,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -