⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 peer.cpp

📁 ACE源码
💻 CPP
📖 第 1 页 / 共 2 页
字号:
          // Reset the pointer to indicate we've got an entire event.
          this->msg_frag_ = 0;
        }

      ACE_DEBUG ((LM_DEBUG,
                  "(%t) connection id = %d, cur len = %d, total bytes read = %d\n",
                  event->header_.connection_id_,
                  event->header_.len_,
                  data_received + header_received));
      if (Options::instance ()->enabled (Options::VERBOSE))
        ACE_DEBUG ((LM_DEBUG,
                    "data_ = %*s\n",
                    event->header_.len_ - 2,
                    event->data_));
      return data_received + header_received;
    }
}

// Receive various types of input (e.g., Peer event from the gatewayd,
// as well as stdio).

int
Peer_Handler::handle_input (ACE_HANDLE sd)
{
  // Input dispatcher.  <sd> identifies the handle that became
  // readable; the return value is whatever the selected action
  // returns.
  ACE_DEBUG ((LM_DEBUG,
              "in handle_input, sd = %d\n",
              sd));

  // Anything that did not arrive on standard input is handled by the
  // action for the state we are currently in.
  if (sd != ACE_STDIN)
    return (this->*do_action_) ();

  // Otherwise forward the event that arrived on stdin.
  return this->transmit_stdin ();
}

// Action that receives our connection id from the Gateway.

int
Peer_Handler::await_connection_id (void)
{
  // Read the connection id into <connection_id_>.  Returns 0 on
  // success and -1 if the id could not be read in full or the stdin
  // handler could not be registered.
  const ssize_t bytes_read =
    this->peer ().recv (&this->connection_id_,
                        sizeof this->connection_id_);

  // A zero-length read is reported as the gateway having gone away.
  if (bytes_read == 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "gatewayd has closed down unexpectedly\n"),
                      -1);

  // Any other count that is not exactly the size of the id (an error
  // or a partial read) is treated as a failure.
  if (bytes_read != sizeof this->connection_id_)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p, bytes received on handle %d = %d\n",
                       "recv",
                       this->get_handle (),
                       bytes_read),
                      -1);

  // The id was received in network byte order; convert it to host
  // byte order.
  this->connection_id_ = ntohl (this->connection_id_);
  ACE_DEBUG ((LM_DEBUG,
              "assigned connection id %d\n",
              this->connection_id_));

  // A Consumer subscribes for events as soon as it knows its id.
  if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR))
    this->subscribe ();

  // The id has arrived, so the timeout that would otherwise shut this
  // handler down is no longer needed.
  ACE_Reactor::instance ()->cancel_timer (this);

  // From now on, input is handled by the action that waits for Peer
  // events.
  this->do_action_ = &Peer_Handler::await_events;

  // Reset standard input.
  ACE_OS::rewind (stdin);

  // The stdin handler is registered only on the first pass through
  // this method.
  if (!first_time_)
    return 0;

  // Register this handler to receive test events on stdin.
  if (ACE_Event_Handler::register_stdin_handler
      (this,
       ACE_Reactor::instance (),
       ACE_Thread_Manager::instance ()) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "(%t) %p\n",
                       "register_stdin_handler"),
                      -1);

  // Remember that registration succeeded so that later calls skip it.
  first_time_ = 0;
  return 0;
}

int
Peer_Handler::subscribe (void)
{
  ACE_Message_Block *mb;

  ACE_NEW_RETURN (mb,
                  ACE_Message_Block (sizeof (Event)),
                  -1);

  Subscription *subscription =
    (Subscription *) ((Event *) mb->rd_ptr ())->data_;
  subscription->connection_id_ =
    Options::instance ()->connection_id ();

  return this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT);
}

// Action that receives events from the Gateway.

int
Peer_Handler::await_events (void)
{
  ACE_Message_Block *mb = 0;

  ssize_t n = this->recv (mb);

  switch (n)
    {
    case 0:
      ACE_ERROR_RETURN ((LM_ERROR,
                         "gatewayd has closed down\n"),
                        -1);
      /* NOTREACHED */
    case -1:
      if (errno == EWOULDBLOCK)
        // A short-read, we'll come back and finish it up later on!
        return 0;
      else
        ACE_ERROR_RETURN ((LM_ERROR,
                           "%p\n",
                           "recv"),
                          -1);
      /* NOTREACHED */
    default:
      {
        // We got a valid event, so let's process it now!  At the
        // moment, we just print out the event contents...

        Event *event = (Event *) mb->rd_ptr ();
        this->total_bytes_ += mb->length ();

        ACE_DEBUG ((LM_DEBUG,
                    "route id = %d, cur len = %d, total len = %d\n",
                    event->header_.connection_id_,
                    event->header_.len_,
                    this->total_bytes_));
        if (Options::instance ()->enabled (Options::VERBOSE))
          ACE_DEBUG ((LM_DEBUG,
                      "data_ = %*s\n",
                      event->header_.len_ - 2,
                      event->data_));
        mb->release ();
        return 0;
      }
    }
}

// Timer expiry hook: shuts the handler down.  (await_connection_id()
// cancels the timer once the connection id has been received.)

int
Peer_Handler::handle_timeout (const ACE_Time_Value & /* unused */,
                              const void * /* unused */)
{
  // Neither argument is consulted: when the timer fires, the handler
  // is simply shut down and the outcome of that is returned.
  const int status = this->handle_close ();
  return status;
}

Peer_Handler::~Peer_Handler (void)
{
  // Run the normal shutdown path.  A destructor has nowhere to
  // report a failure, so the result is deliberately discarded.
  (void) this->handle_close ();
}

// Handle shutdown of the Peer object.

int
Peer_Handler::handle_close (ACE_HANDLE,
                            ACE_Reactor_Mask)
{
  // Shut the Peer down: deregister from the reactor and close the
  // connection.  Both arguments are ignored.  Returns 0 on success
  // (or when there is no valid handle), and -1 if this handler could
  // not be removed from the reactor.

  // Without a valid handle there is nothing to shut down, so repeated
  // calls (e.g. from the destructor) do no further work.
  if (this->get_handle () == ACE_INVALID_HANDLE)
    return 0;

  ACE_DEBUG ((LM_DEBUG,
              "shutting down Peer on handle %d\n",
              this->get_handle ()));

  ACE_Reactor_Mask mask =
    ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK;

  // Explicitly remove ourselves for ACE_STDIN (the <ACE_Reactor>
  // removes the HANDLE.  Note that <ACE_Event_Handler::DONT_CALL>
  // instructs the ACE_Reactor *not* to call <handle_close>, which
  // would otherwise lead to infinite recursion!).  The result is
  // ignored, as in the original code.
  ACE_Reactor::instance ()->remove_handler
    (ACE_STDIN, mask);

  int result = 0;

  // Deregister this handler with the ACE_Reactor.  A failure is
  // logged right away, while the handle is still valid and errno
  // still describes the failure, but it must not stop us from
  // closing the connection below.
  if (ACE_Reactor::instance ()->remove_handler
      (this, mask) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "handle = %d: %p\n",
                  this->get_handle (),
                  "remove_handler"));
      result = -1;
    }

  // Close down the peer unconditionally so the connection is not
  // left open when deregistration fails.
  this->peer ().close ();

  return result;
}

// Allocate the <Peer_Handler> and open the acceptor on <port>.
// Returns 0 on success and -1 if allocation, opening the acceptor,
// or querying the local address fails.

int
Peer_Acceptor::start (u_short port)
{
  // This object only gets allocated once and is just recycled
  // forever.
  ACE_NEW_RETURN (peer_handler_, Peer_Handler, -1);

  this->addr_.set (port);

  ACE_DEBUG ((LM_DEBUG,
              "opening acceptor at port %d\n",
              port));

  // Call down to the <Acceptor::open> method.
  if (this->inherited::open (this->addr_) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "open"),
                      -1);

  // Read back the address the acceptor is actually bound to so that
  // the port logged below is the real one.
  if (this->acceptor ().get_local_addr (this->addr_) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "get_local_addr"),
                      -1);

  ACE_DEBUG ((LM_DEBUG,
              "accepting at port %d\n",
              this->addr_.get_port_number ()));
  return 0;
}

// Construct the acceptor with no <Peer_Handler> yet; start() is the
// method that allocates one.

Peer_Acceptor::Peer_Acceptor (void)
  : peer_handler_ (0)
{
}

int
Peer_Acceptor::close (void)
{
  // Will trigger a delete.
  if (this->peer_handler_ != 0)
    this->peer_handler_->destroy ();

  // Close down the base class.
  return this->inherited::close ();
}

// Note how this method just passes back the pre-allocated
// <Peer_Handler> instead of having the <ACE_Acceptor> allocate a new
// one each time!

int
Peer_Acceptor::make_svc_handler (Peer_Handler *&sh)
{
  // Hand the caller the handler this acceptor already owns; nothing
  // is allocated here.  Always returns 0.
  Peer_Handler *const recycled = this->peer_handler_;
  sh = recycled;
  return 0;
}

// Allocate a <Peer_Handler> into <peer_handler> and connect it to
// <port> on the configured connector host.  Returns 0 on success and
// -1 if allocation or the connection attempt fails.

int
Peer_Connector::open_connector (Peer_Handler *&peer_handler,
                                u_short port)
{
  // This object only gets allocated once and is just recycled
  // forever.
  ACE_NEW_RETURN (peer_handler,
                  Peer_Handler,
                  -1);

  ACE_INET_Addr addr (port,
                      Options::instance ()->connector_host ());

  ACE_DEBUG ((LM_DEBUG,
              "connecting to %s:%d\n",
              addr.get_host_name (),
              addr.get_port_number ()));

  // Give up as soon as the connection attempt fails.
  if (this->connect (peer_handler, addr) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "connect"),
                      -1);

  ACE_DEBUG ((LM_DEBUG,
              "connected to %s:%d\n",
              addr.get_host_name (),
              addr.get_port_number ()));
  return 0;
}

// Open the supplier and/or consumer connectors, depending on which
// are enabled in the options.  Both arguments are ignored.  Returns
// 0 on success and -1 as soon as one enabled connector fails to open.

int
Peer_Connector::open (ACE_Reactor *, int)
{
  this->supplier_peer_handler_ = 0;
  this->consumer_peer_handler_ = 0;

  // The supplier side is attempted first, and only if enabled.
  if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR))
    {
      if (this->open_connector
          (this->supplier_peer_handler_,
           Options::instance ()->supplier_connector_port ()) == -1)
        return -1;
    }

  // Then the consumer side, again only if enabled.
  if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR))
    {
      if (this->open_connector
          (this->consumer_peer_handler_,
           Options::instance ()->consumer_connector_port ()) == -1)
        return -1;
    }

  return 0;
}

// Signal hook: every signal except SIGPIPE ends the event loop.
// Always returns 0.

int
Peer_Factory::handle_signal (int signum, siginfo_t *, ucontext_t *)
{
  // SIGPIPE is ignored here and must not stop the process.
  if (signum == SIGPIPE)
    return 0;

  // Shut down the main event loop.
  ACE_DEBUG((LM_NOTICE, "Exit case signal\n")); // Why do I exit?
  ACE_Reactor::end_event_loop();

  return 0;
}

// Returns information on the currently active service.

// Describe the service in one line containing the consumer and
// supplier acceptor addresses.
//
// If <*strp> is 0, a freshly allocated copy of the description is
// stored in <*strp>.  Otherwise at most <length> bytes are copied
// into the caller's buffer, which is always NUL-terminated when
// <length> is greater than 0.
//
// Returns the length of the full description, or -1 if an address
// cannot be obtained or formatted, or if the allocation fails.

int
Peer_Factory::info (char **strp, size_t length) const
{
  char consumer_addr_str[BUFSIZ];
  char supplier_addr_str[BUFSIZ];

  // Big enough for both address strings plus the fixed text of the
  // format below (fewer than 64 characters), so the sprintf cannot
  // overrun it.
  char buf[2 * BUFSIZ + 64];

  ACE_INET_Addr addr;

  // The size handed to <addr_to_string> is the size of the
  // destination buffer, not of the address object.
  if (this->consumer_acceptor_.acceptor ().get_local_addr (addr) == -1)
    return -1;
  else if (addr.addr_to_string (consumer_addr_str,
                                sizeof consumer_addr_str) == -1)
    return -1;
  else if (this->supplier_acceptor_.acceptor ().get_local_addr (addr) == -1)
    return -1;
  else if (addr.addr_to_string (supplier_addr_str,
                                sizeof supplier_addr_str) == -1)
    return -1;

  ACE_OS::sprintf (buf,
                   "%s\t C:%s|S:%s/%s %s",
                   "peerd",
                   consumer_addr_str,
                   supplier_addr_str,
                   "tcp",
                   "# Gateway traffic generator and data sink\n");

  if (*strp == 0)
    {
      // The duplicate already holds the whole string.  It is exactly
      // strlen (buf) + 1 bytes long, so copying <length> bytes into
      // it could write past its end.
      if ((*strp = ACE_OS::strdup (buf)) == 0)
        return -1;
    }
  else
    {
      ACE_OS::strncpy (*strp, buf, length);

      // strncpy does not terminate the destination when the source
      // does not fit.
      if (length > 0)
        (*strp)[length - 1] = '\0';
    }

  return static_cast<int> (ACE_OS::strlen (buf));
}

// Hook called by the explicit dynamic linking facility to terminate
// the peer.

int
Peer_Factory::fini (void)
{
  // Close both acceptors, consumer first.  Their results are
  // deliberately discarded, and 0 is always returned.
  (void) this->consumer_acceptor_.close ();
  (void) this->supplier_acceptor_.close ();

  return 0;
}

// Hook called by the explicit dynamic linking facility to initialize
// the peer.

int
Peer_Factory::init (int argc, char *argv[])
{
  Options::instance ()->parse_args (argc, argv);

  ACE_Sig_Set sig_set;

  sig_set.sig_add (SIGINT);
  sig_set.sig_add (SIGQUIT);
  sig_set.sig_add (SIGPIPE);

  // Register ourselves to receive signals so we can shut down
  // gracefully.

  if (ACE_Reactor::instance ()->register_handler (sig_set,
                                                  this) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "register_handler"),
                      -1);

  if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR)
      && this->supplier_acceptor_.start
      (Options::instance ()->supplier_acceptor_port ()) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "Acceptor::open"),
                      -1);
  else if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR)
           && this->consumer_acceptor_.start
           (Options::instance ()->consumer_acceptor_port ()) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "Acceptor::open"),
                      -1);
  else if (this->connector_.open () == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "Connector::open"),
                      -1);
  return 0;
}

// The following is a "Factory" used by the <ACE_Service_Config> and
// svc.conf file to dynamically initialize the <Peer_Acceptor> and
// <Peer_Connector>.

ACE_SVC_FACTORY_DEFINE (Peer_Factory)

// Instantiations of the ACE templates listed below, for builds that
// request them: as explicit "template class" instantiations when
// ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION is defined, or as
// "#pragma instantiate" directives when
// ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA is defined.  Both lists name
// the same five templates.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>;
template class ACE_Connector_Base<Peer_Handler>;
template class ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>;
template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>;
template class ACE_NonBlocking_Connect_Handler<Peer_Handler>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>
#pragma instantiate ACE_Connector_Base<Peer_Handler>
#pragma instantiate ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>
#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
#pragma instantiate ACE_NonBlocking_Connect_Handler<Peer_Handler>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -