⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 concrete_connection_handlers.cpp

📁 一个开源的网络开发库ACE
💻 CPP
📖 第 1 页 / 共 2 页
字号:
  ssize_t data_bytes_left_to_read =
    ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));

  ssize_t data_received =
    !data_bytes_left_to_read
    ? 0 // peer().recv() should not be called when data_bytes_left_to_read is 0.
    : this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read);

  // Try to receive the remainder of the event.

  switch (data_received)
    {
    case -1:
      if (errno == EWOULDBLOCK)
        // This might happen if only the header came through.
        return -1;
      else
        /* FALLTHROUGH */;

    case 0: // Premature EOF.
      if (data_bytes_left_to_read)
        {
          this->msg_frag_ = this->msg_frag_->release ();
          return 0;
        }
      /* FALLTHROUGH */;

    default:
      // Set the write pointer at 1 past the end of the event.
      this->msg_frag_->wr_ptr (data_received);

      if (data_received != data_bytes_left_to_read)
        {
          errno = EWOULDBLOCK;
          // Inform caller that we didn't get the whole event.
          return -1;
        }
      else
        {
          // Set the read pointer to the beginning of the event.
          this->msg_frag_->rd_ptr (this->msg_frag_->base ());

          // Allocate an event forwarding header and chain the data
          // portion onto its continuation field.
          forward_addr = new ACE_Message_Block (sizeof (Event_Key),
                                                ACE_Message_Block::MB_PROTO,
                                                this->msg_frag_,
                                                0,
                                                0,
                                                Options::instance ()->locking_strategy ());
          if (forward_addr == 0)
            {
              this->msg_frag_ = this->msg_frag_->release ();
              errno = ENOMEM;
              return -1;
            }

          Event_Key event_addr (this->connection_id (),
                                event->header_.type_);
          // Copy the forwarding address from the Event_Key into
          // forward_addr.
          forward_addr->copy ((char *) &event_addr, sizeof (Event_Key));

          // Reset the pointer to indicate we've got an entire event.
          this->msg_frag_ = 0;
        }

      this->total_bytes (data_received + header_received);
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) connection id = %d, cur len = %d, total bytes read = %d\n",
                  event->header_.connection_id_,
                  event->header_.len_,
                  data_received + header_received));
      if (Options::instance ()->enabled (Options::VERBOSE))
        ACE_DEBUG ((LM_DEBUG,
                    "data_ = %*s\n",
                    event->header_.len_ - 2,
                    event->data_));

      // Encode before returning so that we can set things out in
      // network byte order.
      event->header_.encode ();
      return data_received + header_received;
    }
}

// Input callback for a Supplier connection.  Reads the next event via
// <recv> and reacts to the value it returns:
//
//   0         the peer closed the connection: mark this handler FAILED
//             and return -1.
//   -1        if errno is EWOULDBLOCK this was only a short read, so
//             return 0 and finish the event on a later callback;
//             otherwise mark this handler FAILED and return -1.
//   otherwise pass the received event to <process> and return whatever
//             it returns.

int
Supplier_Handler::handle_input (ACE_HANDLE)
{
  ACE_Message_Block *event_key = 0;
  const ssize_t status = this->recv (event_key);

  // Neither sentinel value: route the message to the Consumers.
  if (status != 0 && status != -1)
    return this->process (event_key);

  // A short read is not an error; we'll come back and complete the
  // event later on.  errno is inspected before anything else runs so
  // that it still reflects the outcome of <recv>.
  if (status == -1 && errno == EWOULDBLOCK)
    return 0;

  // Both remaining outcomes are failures, so flag the connection
  // first and then report which one it was.
  this->state (Connection_Handler::FAILED);

  if (status == 0)
    {
      // A peer shouldn't initiate a shutdown by closing the
      // connection, so it must have crashed.  Bail out and let the
      // higher layers reconnect.
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) Peer has closed down unexpectedly for Input Connection_Handler %d\n",
                         this->connection_id ()),
                        -1);
    }

  // Some other problem occurred: shut down and start again.
  ACE_ERROR_RETURN ((LM_ERROR,
                     "(%t) %p for Input Connection_Handler %d\n",
                     "Peer has failed unexpectedly",
                     this->connection_id ()),
                    -1);
}

// Hands <event_key> to the <Event_Channel>, which does the actual
// processing (per the original author's note, this typically forwards
// the event to the appropriate Consumer(s)).  Returns whatever the
// channel's <put> returns.

int
Supplier_Handler::process (ACE_Message_Block *event_key)
{
  const int result = this->event_channel_->put (event_key);
  return result;
}

// Builds the threaded Consumer handler on top of <Consumer_Handler>
// using the supplied connection configuration.
Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci)
  : Consumer_Handler (pci)
{
  // svc() has not started yet; it sets this flag to 1 when it does.
  this->in_thread_ = 0;
}

// Override of handle_close().  Once svc() is running (in_thread_ is
// set) the close is ignored and 0 is returned, since svc() calls
// peer().close() itself; otherwise the request is passed on to
// <Consumer_Handler::handle_close> and its result is returned.

int
Thr_Consumer_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
{
  return this->in_thread_
    ? 0
    : Consumer_Handler::handle_close (h, m);
}

// Input callback for the threaded Consumer handler.  The input is
// first given to <Consumer_Handler::handle_input>.  If that returns 0
// nothing more is done and 0 is returned.  If it returns anything
// else the connection is treated as failed: the handle is removed
// from the singleton reactor, the message queue is deactivated (which
// wakes the thread blocked on <dequeue_head> in svc()) and -1 is
// returned.

int
Thr_Consumer_Handler::handle_input (ACE_HANDLE h)
{
  // Let the <Consumer_Handler> receive normal data; success needs no
  // further action here.
  if (this->Consumer_Handler::handle_input (h) == 0)
    return 0;

  // The base class reported a failure.  Take the handle out of the
  // reactor for every event type; DONT_CALL is part of the mask that
  // is passed.
  ACE_Reactor::instance ()->remove_handler
    (h,
     ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);

  // Deactivate the queue while we try to get reconnected.
  this->msg_queue ()->deactivate ();

  return -1;
}

// Initializes the threaded Consumer handler.  In order: switches the
// connection to blocking I/O, lets the <Event_Channel> complete the
// connection, registers this handler with the singleton reactor for
// input events, and (re)activates the message queue.  A service
// thread is spawned only if the queue was already active; otherwise
// the existing thread is reused.
//
// Returns -1 (after logging) as soon as any of the first three steps
// fails, the result of <activate> when a thread is spawned, and 0
// when the existing thread is reused.

int
Thr_Consumer_Handler::open (void *)
{
  // Turn off non-blocking I/O.
  if (this->peer ().disable (ACE_NONBLOCK) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) %p\n",
                         "disable"),
                        -1);
    }

  // Call back to the <Event_Channel> to complete our initialization.
  if (this->event_channel_->complete_connection_connection (this) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) %p\n",
                         "complete_connection_connection"),
                        -1);
    }

  // Input events on this connection indicate that the Consumer has
  // shut down unexpectedly, so ask the reactor to report them.
  if (ACE_Reactor::instance ()->register_handler
        (this, ACE_Event_Handler::READ_MASK) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) %p\n",
                         "register_handler"),
                        -1);
    }

  // Reactivate the message queue.  A queue that was not active had
  // been shut off because of some earlier problem, which means a
  // service thread already exists.
  if (this->msg_queue ()->activate () != ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) reusing existing thread\n"));
      return 0;
    }

  // The queue was active, so this is the first time in: become an
  // active object by spawning a new thread to transmit events to
  // Consumers.
  ACE_DEBUG ((LM_DEBUG,
              "(%t) spawning new thread\n"));
  return this->activate (THR_NEW_LWP | THR_DETACHED);
}

// Appends <mb> to the tail of this handler's message queue for later
// transmission.  The caller-supplied timeout is ignored; a zero
// timeout is always used so that a full queue does not block the
// caller (Supplier_Handlers may be single-threaded).  Returns the
// result of <enqueue_tail>.

int
Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
  // <enqueue_tail> takes a non-const pointer, so the constness of the
  // shared zero value has to be cast away.
  ACE_Time_Value *no_wait =
    const_cast<ACE_Time_Value *> (&ACE_Time_Value::zero);

  return this->msg_queue ()->enqueue_tail (mb, no_wait);
}

// Service-thread entry point: transmits queued events to the peer.
// Note the simplification resulting from the use of threads, compared
// with the Reactive solution.
//
// The method never returns normally.  Each pass of the outer loop
//   1. drains the message queue, sending every block to the peer,
//      until <dequeue_head> fails;
//   2. closes the connection;
//   3. keeps trying to reconnect synchronously, sleeping for
//      <timeout> between attempts, until it succeeds.
//
// in_thread_ is set to 1 on entry and stays set, which makes
// <handle_close> a no-op from then on.

int
Thr_Consumer_Handler::svc (void)
{
  for (in_thread_ = 1;;)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) Thr_Consumer_Handler's handle = %d\n",
                  this->peer ().get_handle ()));

      // Since this method runs in its own thread it is OK to block on
      // output.  A failed send is only logged; the loop carries on
      // with the next queued block.

      for (ACE_Message_Block *mb = 0;
           this->msg_queue ()->dequeue_head (mb) != -1;
           )
        if (this->send (mb) == -1)
          ACE_ERROR ((LM_ERROR,
                      "(%t) %p\n",
                      "send failed"));

      // The only expected reason for <dequeue_head> to fail is that
      // the queue has been deactivated.
      ACE_ASSERT (errno == ESHUTDOWN);

      ACE_DEBUG ((LM_DEBUG,
                  "(%t) shutting down threaded Consumer_Handler %d on handle %d\n",
                  this->connection_id (),
                  this->get_handle ()));

      this->peer ().close ();

      // Re-establish the connection, using exponential backoff.
      for (this->timeout (1);
           // Default is to reconnect synchronously.
           this->event_channel_->initiate_connection_connection (this, 1) == -1;
           // Second parameter '1' means using sync mode directly,
           // regardless of Options::blocking_semantics().  Otherwise
           // async mode would be used to connect, which would not
           // satisfy the original design.
           )
        {
          ACE_Time_Value tv (this->timeout ());

          // "%d" consumes an int, but sec() is not an int on every
          // platform, so convert explicitly rather than passing a
          // mismatched variadic argument.
          ACE_ERROR ((LM_ERROR,
                      "(%t) reattempting connection, sec = %d\n",
                      static_cast<int> (tv.sec ())));

          ACE_OS::sleep (tv);
        }
    }

  ACE_NOTREACHED (return 0;)
}

// Builds the threaded Supplier handler on top of <Supplier_Handler>
// using the supplied connection configuration.
Thr_Supplier_Handler::Thr_Supplier_Handler (const Connection_Config_Info &pci)
  : Supplier_Handler (pci)
{
  // svc() has not started yet; it sets this flag to 1 when it does.
  this->in_thread_ = 0;
}

// Override of handle_close().  Once svc() is running (in_thread_ is
// set) the close is ignored and 0 is returned, since svc() calls
// peer().close() itself; otherwise the request is passed on to
// <Supplier_Handler::handle_close> and its result is returned.

int
Thr_Supplier_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
{
  return this->in_thread_
    ? 0
    : Supplier_Handler::handle_close (h, m);
}

// Initializes the threaded Supplier handler.  In order: switches the
// connection to blocking I/O, lets the <Event_Channel> complete the
// connection, and (re)activates the message queue.  A service thread
// is spawned only if the queue was already active; otherwise the
// existing thread is reused.
//
// Returns -1 (after logging) as soon as either of the first two steps
// fails, the result of <activate> when a thread is spawned, and 0
// when the existing thread is reused.

int
Thr_Supplier_Handler::open (void *)
{
  // Turn off non-blocking I/O.
  if (this->peer ().disable (ACE_NONBLOCK) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) %p\n",
                         "disable"),
                        -1);
    }

  // Call back to the <Event_Channel> to complete our initialization.
  if (this->event_channel_->complete_connection_connection (this) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) %p\n",
                         "complete_connection_connection"),
                        -1);
    }

  // Reactivate the message queue.  A queue that was not active had
  // been shut off because of some earlier problem, which means a
  // service thread already exists.
  if (this->msg_queue ()->activate () != ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE)
    {
      ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
      return 0;
    }

  // The queue was active, so this is the first time in: become an
  // active object by spawning a new thread that runs svc().
  ACE_DEBUG ((LM_DEBUG,
              "(%t) spawning new thread\n"));
  return this->activate (THR_NEW_LWP | THR_DETACHED);
}

// Service-thread entry point: receives events from a Peer in a
// separate thread (note the reuse of <Supplier_Handler::handle_input>).
//
// The method never returns normally.  Each pass of the outer loop
//   1. calls <Supplier_Handler::handle_input> repeatedly until it
//      returns -1;
//   2. closes the connection and deactivates the message queue;
//   3. keeps trying to reconnect synchronously, sleeping for
//      <timeout> between attempts, until it succeeds.
//
// in_thread_ is set to 1 on entry and stays set, which makes
// <handle_close> a no-op from then on.

int
Thr_Supplier_Handler::svc (void)
{
  for (in_thread_ = 1;;)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) Thr_Supplier_Handler's handle = %d\n",
                 this->peer ().get_handle ()));

      // Since this method runs in its own thread and processes events
      // for one connection it is OK to call down to the
      // <Supplier_Handler::handle_input> method, which blocks on
      // input.

      while (this->Supplier_Handler::handle_input () != -1)
        continue;

      ACE_DEBUG ((LM_DEBUG,
                  "(%t) shutting down threaded Supplier_Handler %d on handle %d\n",
                  this->connection_id (),
                  this->get_handle ()));

      this->peer ().close ();

      // Deactivate the queue while we try to get reconnected.
      this->msg_queue ()->deactivate ();

      // Re-establish the connection, using exponential backoff.
      for (this->timeout (1);
           // Default is to reconnect synchronously.
           this->event_channel_->initiate_connection_connection (this, 1) == -1;
           // Second parameter '1' means using sync mode directly,
           // regardless of Options::blocking_semantics().  Otherwise
           // async mode would be used to connect, which would not
           // satisfy the original design.
           )
        {
          ACE_Time_Value tv (this->timeout ());

          // "%d" consumes an int, but sec() is not an int on every
          // platform, so convert explicitly rather than passing a
          // mismatched variadic argument.
          ACE_ERROR ((LM_ERROR,
                      "(%t) reattempting connection, sec = %d\n",
                      static_cast<int> (tv.sec ())));
          ACE_OS::sleep (tv);
        }
    }
  ACE_NOTREACHED(return 0);
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -