⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_channel.cpp

📁 一个开源的网络开发库ACE
💻 CPP
📖 第 1 页 / 共 2 页
字号:
// Event_Channel.cpp,v 4.34 2001/03/21 11:59:40 schmidt Exp

#define ACE_BUILD_SVC_DLL

#include "Connection_Handler_Connector.h"
#include "Event_Channel.h"

ACE_RCSID(Gateway, Event_Channel, "Event_Channel.cpp,v 4.34 2001/03/21 11:59:40 schmidt Exp")

Event_Channel::~Event_Channel (void)
{
}

Event_Channel::Event_Channel (void)
  : supplier_acceptor_ (*this, 'S'),
    consumer_acceptor_ (*this, 'C')
{
}

int
Event_Channel::compute_performance_statistics (void)
{
  ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n"));
  CONNECTION_MAP_ITERATOR cmi (this->connection_map_);

  // If we've got a <ACE_Thread_Manager> then use it to suspend all
  // the threads.  This will enable us to get an accurate count.

  if (Options::instance ()->threading_strategy ()
      != Options::REACTIVE)
    {
      if (ACE_Thread_Manager::instance ()->suspend_all () == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "(%t) %p\n",
                           "suspend_all"),
                          -1);
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) suspending all threads..."));
    }

  size_t total_bytes_in = 0;
  size_t total_bytes_out = 0;

  // Iterate through the connection map summing up the number of bytes
  // sent/received.

  for (CONNECTION_MAP_ENTRY *me = 0;
       cmi.next (me) != 0;
       cmi.advance ())
    {
      Connection_Handler *connection_handler = me->int_id_;

      if (connection_handler->connection_role () == 'C')
        total_bytes_out += connection_handler->total_bytes ();
      else // connection_handler->connection_role () == 'S'
        total_bytes_in += connection_handler->total_bytes ();
    }

  ACE_DEBUG ((LM_DEBUG,
              "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
              Options::instance ()->performance_window (),
              total_bytes_in,
              total_bytes_out));

  ACE_DEBUG ((LM_DEBUG,
              "(%t) %f Mbits/sec received.\n",
              (float) (total_bytes_in * 8 /
                       (float) (1024 * 1024 * Options::instance ()->performance_window ()))));

  ACE_DEBUG ((LM_DEBUG,
              "(%t) %f Mbits/sec sent.\n",
              (float) (total_bytes_out * 8 /
                       (float) (1024 * 1024 * Options::instance ()->performance_window ()))));

  // Resume all the threads again.

  if (Options::instance ()->threading_strategy ()
      != Options::REACTIVE)
    {
      if (ACE_Thread_Manager::instance ()->resume_all () == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "(%t) %p\n",
                           "resume_all"),
                          -1);
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) resuming all threads..."));
    }


  return 0;
}

int
Event_Channel::handle_timeout (const ACE_Time_Value &,
                               const void *)
{
  // This is called periodically to compute performance statistics.
  return this->compute_performance_statistics ();
}

int
Event_Channel::put (ACE_Message_Block *event,
                    ACE_Time_Value *)
{
  // We got a valid event, so determine its type, which is stored in
  // the first of the two <ACE_Message_Block>s, which are chained
  // together by <ACE::recv>.
  Event_Key *event_key = (Event_Key *) event->rd_ptr ();

  // Skip over the address portion and get the data, which is in the
  // second <ACE_Message_Block>.
  ACE_Message_Block *data = event->cont ();

  switch (event_key->type_)
    {
    case ROUTING_EVENT:
      this->routing_event (event_key,
                           data);
      break;
    case SUBSCRIPTION_EVENT:
      this->subscription_event (data);
      break;
    }

  // Release the memory in the message block.
  event->release ();
  return 0;
}

void
Event_Channel::subscription_event (ACE_Message_Block *data)
{
  Event *event = (Event *) data->rd_ptr ();

  ACE_DEBUG ((LM_DEBUG,
              "(%t) received a subscription with %d bytes from connection id %d\n",
              event->header_.len_,
              event->header_.connection_id_));
  Subscription *subscription = (Subscription *) event->data_;
  // Convert the subscription into host byte order so that we can
  // access it directly without having to repeatedly muck with it...
  subscription->decode ();

  ACE_DEBUG ((LM_DEBUG,
              "(%t) connection_id_ = %d, total_consumers_ = %d\n",
              subscription->connection_id_,
              subscription->total_consumers_));

  for (ACE_INT32 i = 0;
       i < subscription->total_consumers_;
       i++)
    ACE_DEBUG ((LM_DEBUG,
                "(%t) consumers_[%d] = %d\n",
                i,
                subscription->consumers_[i]));

}

void
Event_Channel::routing_event (Event_Key *forwarding_address,
                              ACE_Message_Block *data)
{
  Consumer_Dispatch_Set *dispatch_set = 0;

  // Initialize the <dispatch_set> to points to the set of Consumers
  // associated with this forwarding address.

  if (this->efd_.find (*forwarding_address,
                       dispatch_set) == -1)
    // Failure.
    ACE_ERROR ((LM_DEBUG,
                "(%t) find failed on connection id = %d, type = %d\n",
                forwarding_address->connection_id_,
                forwarding_address->type_));
  else
    {
      // Check to see if there are any consumers.
      if (dispatch_set->size () == 0)
        ACE_DEBUG ((LM_WARNING,
                    "there are no active consumers for this event currently\n"));

      else // There are consumers, so forward the event.
        {
          // Initialize the interator.
          Consumer_Dispatch_Set_Iterator dsi (*dispatch_set);

          // At this point, we should assign a thread-safe locking
          // strategy to the <ACE_Message_Block> is we're running in a
          // multi-threaded configuration.
          data->locking_strategy (Options::instance ()->locking_strategy ());

          for (Connection_Handler **connection_handler = 0;
               dsi.next (connection_handler) != 0;
               dsi.advance ())
            {
              // Only process active connection_handlers.
              if ((*connection_handler)->state () == Connection_Handler::ESTABLISHED)
                {
                  // Duplicate the event portion via reference
                  // counting.
                  ACE_Message_Block *dup_msg = data->duplicate ();

                  ACE_DEBUG ((LM_DEBUG,
                              "(%t) forwarding to Consumer %d\n",
                              (*connection_handler)->connection_id ()));

                  if ((*connection_handler)->put (dup_msg) == -1)
                    {
                      if (errno == EWOULDBLOCK) // The queue has filled up!
                        ACE_ERROR ((LM_ERROR,
                                    "(%t) %p\n",
                                    "gateway is flow controlled, so we're dropping events"));
                      else
                        ACE_ERROR ((LM_ERROR,
                                    "(%t) %p transmission error to peer %d\n",
                                    "put",
                                    (*connection_handler)->connection_id ()));

                      // We are responsible for releasing an
                      // ACE_Message_Block if failures occur.
                      dup_msg->release ();
                    }
                }
            }
        }
    }
}

int
Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler,
                                               int sync_directly)
{
  ACE_Synch_Options synch_options;

  if (sync_directly)
    // In separated connection handler thread, connection can be
    // initiated by block mode (synch mode) directly.
    synch_options = ACE_Synch_Options::synch;
  else if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK)
    synch_options = ACE_Synch_Options::asynch;
  else
    synch_options = ACE_Synch_Options::synch;

  return this->connector_.initiate_connection (connection_handler,
                                               synch_options);
}

int
Event_Channel::complete_connection_connection (Connection_Handler *connection_handler)
{
  int option = connection_handler->connection_role () == 'S' 
    ? SO_RCVBUF 
    : SO_SNDBUF;
  int socket_queue_size =
    Options::instance ()->socket_queue_size ();

  if (socket_queue_size > 0)
    if (connection_handler->peer ().set_option (SOL_SOCKET,
                                                option,
                                                &socket_queue_size,
                                                sizeof (int)) == -1)
      ACE_ERROR ((LM_ERROR,
                  "(%t) %p\n",
                  "set_option"));

  connection_handler->thr_mgr (ACE_Thread_Manager::instance ());

  // Our state is now "established."
  connection_handler->state (Connection_Handler::ESTABLISHED);

  // Restart the timeout to 1.
  connection_handler->timeout (1);

  ACE_INT32 id = htonl (connection_handler->connection_id ());

  // Send the connection id to the peerd.

  ssize_t n = connection_handler->peer ().send ((const void *) &id,
                                                sizeof id);

  if (n != sizeof id)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "(%t) %p\n",
                      n == 0 ? "peer has closed down unexpectedly" : "send"),
                      -1);
  return 0;
}

// Restart connection (blocking_semantics dicates whether we restart
// synchronously or asynchronously).

int
Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler)
{
  // Cancel asynchronous connecting before re-initializing.  It will
  // close the peer and cancel the asynchronous connecting.
  this->cancel_connection_connection(connection_handler);

  if (connection_handler->state () != Connection_Handler::DISCONNECTING)
    {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -