⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 concrete_connection_handlers.cpp

📁 一个开源的网络开发库ACE
💻 CPP
📖 第 1 页 / 共 2 页
字号:
// Concrete_Connection_Handlers.cpp,v 4.17 2002/03/04 20:36:21 schmidt Exp

#define ACE_BUILD_SVC_DLL

#include "Event_Channel.h"
#include "Concrete_Connection_Handlers.h"

ACE_RCSID(Gateway, Concrete_Connection_Handlers, "Concrete_Connection_Handlers.cpp,v 4.17 2002/03/04 20:36:21 schmidt Exp")

Consumer_Handler::Consumer_Handler (const Connection_Config_Info &pci)
  : Connection_Handler (pci)
{
  // Set the high water mark of the message queue to the maximum
  // queue size configured in the Options singleton.
  msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());

  // Tag this handler with the 'C' connection role.
  connection_role_ = 'C';
}

// Handle input arriving on the Consumer's connection.  Input from a
// Consumer is treated as erroneous: whatever is read is logged and
// then ignored.
//
// The Connection_Handler is marked as FAILED only at the places where
// the connection has really failed (recv () error or peer shutdown),
// so that handle_close () can reconnect.  The handler is deliberately
// NOT closed when data is received successfully, which leaves room
// for a Consumer_Handler to process received data, e.g., a Consumer
// could send a reply-event to a Supplier.
//
// Returns -1 if recv () reports an error (-1) or EOF (0), and 0 for
// any other result.

int
Consumer_Handler::handle_input (ACE_HANDLE)
{
  char buf[BUFSIZ];
  const ssize_t received = this->peer ().recv (buf, sizeof buf);

  switch (received)
    {
    case -1:
      // recv () itself failed.
      this->state (Connection_Handler::FAILED);
      ACE_ERROR_RETURN ((LM_ERROR,
                        "(%t) Peer has failed unexpectedly for Consumer_Handler %d\n",
                        this->connection_id ()),
                        -1);
      /* NOTREACHED */
    case 0:
      // EOF: the peer closed the connection.
      this->state (Connection_Handler::FAILED);
      ACE_ERROR_RETURN ((LM_ERROR,
                        "(%t) Peer has shutdown unexpectedly for Consumer_Handler %d\n",
                        this->connection_id ()),
                        -1);
      /* NOTREACHED */
    default:
      // "%d" consumes an int, so narrow the ssize_t explicitly rather
      // than passing a mismatched type through the varargs.  The
      // value cannot exceed sizeof buf, so nothing is lost.
      ACE_ERROR_RETURN ((LM_ERROR,
                        "(%t) IGNORED: Consumer is erroneously sending input to Consumer_Handler %d\n"
                        "data size = %d\n",
                        this->connection_id (),
                        static_cast<int> (received)),
                        0); // Return 0 to identify received data successfully.
      /* NOTREACHED */
    }
}

// Non-blocking put() of a single event.  Whatever part of the event
// could not be sent is re-queued at the *front* of the message queue
// so that ordering is preserved.
//
// Returns -1 on failure, 0 if the event had to be queued, and
// otherwise the value returned by send ().

int
Consumer_Handler::nonblk_put (ACE_Message_Block *event)
{
  // send () reports flow control through errno (EWOULDBLOCK), so
  // errno is inspected below before anything else can overwrite it.
  const ssize_t sent = this->send (event);

  // -1 is returned only when things have really gone wrong (i.e., not
  // when flow control occurs): mark the connection as failed and let
  // handle_close () take care of shutting down and reconnecting.
  if (sent == -1)
    {
      this->state (Connection_Handler::FAILED);
      this->handle_close ();
      return -1;
    }

  // No flow control was reported, hand back what send () returned.
  if (errno != EWOULDBLOCK)
    return sent;

  // Flow control kicked in: the event (or its unsent remainder) must
  // be queued until the ACE_Reactor calls handle_output ().
  ACE_DEBUG ((LM_DEBUG,
              "(%t) queueing activated on handle %d to routing id %d\n",
              this->get_handle (),
              this->connection_id ()));

  // A zero timeout, i.e., never block on the queue.
  ACE_Time_Value *const no_wait = (ACE_Time_Value *) &ACE_Time_Value::zero;

  // Insert at the *head* of the queue to keep the events in order.
  if (this->msg_queue ()->enqueue_head (event, no_wait) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "(%t) %p\n",
                       "enqueue_head"),
                      -1);

  // Ask the ACE_Reactor for a callback once writing is possible again.
  // NOTE(review): <event> is already on the queue when this fails, yet
  // -1 is returned and handle_output () releases the event on -1 --
  // confirm this cannot leave a released block on the queue.
  if (ACE_Reactor::instance ()->schedule_wakeup
      (this, ACE_Event_Handler::WRITE_MASK) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "(%t) %p\n",
                       "schedule_wakeup"),
                      -1);

  return 0;
}

// Try to send the unread contents of <event> to the Consumer's peer.
//
// The outcome is reported through both the return value and errno,
// which nonblk_put () inspects afterwards:
//
// . peer send returned <= 0: returns 0 if errno is EWOULDBLOCK,
//   otherwise the peer's return value; <event> is left untouched.
// . partial send: the read pointer of <event> is advanced past the
//   bytes that were sent, errno is set to EWOULDBLOCK and the number
//   of bytes sent is returned.
// . complete send: <event> is released, errno is set to 0 and the
//   number of bytes sent is returned.
//
// In the last two cases the byte count is also passed to
// total_bytes ().

ssize_t
Consumer_Handler::send (ACE_Message_Block *event)
{
  const ssize_t len = event->length ();

  // "%d" consumes an int, so narrow the length explicitly instead of
  // passing a wider type through the varargs.
  ACE_DEBUG ((LM_DEBUG,
              "(%t) sending %d bytes to Consumer %d\n",
              static_cast<int> (len),
              this->connection_id ()));

  const ssize_t n = this->peer ().send (event->rd_ptr (), len);

  if (n <= 0)
    // Nothing was sent: flow control maps to 0, anything else is
    // passed through unchanged.
    return errno == EWOULDBLOCK ? 0 : n;
  else if (n < len)
    {
      // Re-adjust pointer to skip over the part we did send.
      event->rd_ptr (n);
      errno = EWOULDBLOCK;
    }
  else // if (n == length)
    {
      // The whole event is sent, we now decrement the reference count
      // (which deletes itself when it reaches 0).
      event->release ();
      errno = 0;
    }
  this->total_bytes (n);
  return n;
}

// Finish sending an event when flow control conditions abate.
// This method is automatically called by the ACE_Reactor.
//
// Events that were queued by nonblk_put () are dequeued from the head
// of the message queue and pushed out through nonblk_put () again.
// On Win32 the queue is drained in a loop until dequeue_head () fails
// or a send is flow controlled again; on all other platforms at most
// one event is dequeued per call.
//
// Returns 0, except on Win32 where -1 is returned if cancel_wakeup ()
// or schedule_wakeup () fails.

int
Consumer_Handler::handle_output (ACE_HANDLE)
{
  ACE_Message_Block *event = 0;

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT("(%t) Receiver signalled 'resume transmission' %d\n"),
              this->get_handle ()));

  // WIN32 Notes: When the receiver blocked, we started adding to the
  // consumer handler's message Q. At this time, we registered a
  // callback with the reactor to tell us when the TCP layer signalled
  // that we could continue to send messages to the consumer. However,
  // Winsock only sends this notification ONCE, so we have to assume
  // at the application level, that we can continue to send until we
  // get any subsequent blocking signals from the receiver's buffer.

#if defined (ACE_WIN32)
  // Win32 Winsock doesn't trigger multiple "You can write now"
  // signals, so we have to assume that we can continue to write until
  // we get another EWOULDBLOCK.

  // We cancel the wakeup callback we set earlier.
  if (ACE_Reactor::instance ()->cancel_wakeup
      (this, ACE_Event_Handler::WRITE_MASK) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("(%t) %p\n"),
                       ACE_TEXT ("Error in ACE_Reactor::cancel_wakeup()")),
                      -1);

  // The list had better not be empty, otherwise there's a bug!
  // Keep dequeueing (without blocking) until dequeue_head () fails.
  while (this->msg_queue ()->dequeue_head
         (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
    {
      switch (this->nonblk_put (event))
        {
        case -1:                // Error sending message to consumer.
          {
            // We are responsible for releasing an ACE_Message_Block if
            // failures occur.
            event->release ();

            ACE_ERROR ((LM_ERROR,
                        ACE_TEXT ("(%t) %p\n"),
                        ACE_TEXT ("transmission failure")));
            // This only leaves the switch; the loop carries on with
            // the next queued event.
            break;
          }
        case 0:                 // Partial Send - we got flow controlled by the receiver
          {
            // nonblk_put () returns 0 after re-queueing the event, and
            // is expected to have left errno at EWOULDBLOCK.
            ACE_ASSERT (errno == EWOULDBLOCK);
            ACE_DEBUG ((LM_DEBUG,
                        ACE_TEXT ("%D Partial Send due to flow control")
                        ACE_TEXT ("- scheduling new wakeup with reactor\n")));

            // Re-schedule a wakeup call from the reactor when the
            // flow control conditions abate.
            if (ACE_Reactor::instance ()->schedule_wakeup
                (this,
                 ACE_Event_Handler::WRITE_MASK) == -1)
              ACE_ERROR_RETURN ((LM_ERROR,
                                 ACE_TEXT ("(%t) %p\n"),
                                 ACE_TEXT ("Error in ACE_Reactor::schedule_wakeup()")),
                                -1);

            // Didn't write everything this time, come back later...
            return 0;
          }
        default:                // Sent the whole thing
          {
            ACE_DEBUG ((LM_DEBUG,
                        ACE_TEXT ("Sent message from message Q, Q size = %d\n"),
                        this->msg_queue()->message_count ()));
            break;
          }
        }
    }

  // If we drop out of the while loop, then the message Q should be
  // empty...or there's a problem in the dequeue_head() call...but
  // thats another story.
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("%D Sent all messages from consumers message Q\n")));

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"),
              this->get_handle (),
              this->connection_id ()));
#else /* !defined (ACE_WIN32) */
  // The list had better not be empty, otherwise there's a bug!
  // Only a single event is dequeued (without blocking) per call.
  if (this->msg_queue ()->dequeue_head
      (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
    {
      switch (this->nonblk_put (event))
        {
        case 0:           // Partial send.
          // nonblk_put () returns 0 after re-queueing the event, and
          // is expected to have left errno at EWOULDBLOCK.
          ACE_ASSERT (errno == EWOULDBLOCK);
          ACE_DEBUG ((LM_DEBUG,
                      ACE_TEXT ("%D Partial Send\n")));

          // Didn't write everything this time, come back later...
          break;

        case -1:
          // We are responsible for releasing an ACE_Message_Block if
          // failures occur.
          event->release ();
          ACE_ERROR ((LM_ERROR,
                      ACE_TEXT ("(%t) %p\n"),
                      ACE_TEXT ("transmission failure")));

          // A failed event is dropped, after which the queue is
          // checked exactly as it is after a successful send.
          /* FALLTHROUGH */
        default: // Sent the whole thing.

          // If we succeed in writing the entire event (or we did not
          // fail due to EWOULDBLOCK) then check if there are more
          // events on the Message_Queue.  If there aren't, tell the
          // ACE_Reactor not to notify us anymore (at least until
          // there are new events queued up).

          ACE_DEBUG ((LM_DEBUG,
                      ACE_TEXT ("QQQ::Sent Message from consumer's Q\n")));

          if (this->msg_queue ()->is_empty ())
            {
              ACE_DEBUG ((LM_DEBUG,
                          ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"),
                          this->get_handle (),
                          this->connection_id ()));

              // A failure here is only logged; 0 is still returned.
              if (ACE_Reactor::instance ()->cancel_wakeup
                  (this, ACE_Event_Handler::WRITE_MASK) == -1)
                ACE_ERROR ((LM_ERROR,
                            ACE_TEXT ("(%t) %p\n"),
                            ACE_TEXT ("cancel_wakeup")));
            }
        }
    }
  else
    ACE_ERROR ((LM_ERROR,
                ACE_TEXT ("(%t) %p\n"),
                ACE_TEXT ("dequeue_head - handle_output called by reactor but nothing in Q")));
#endif /* ACE_WIN32 */
  return 0;
}

// Hand an event over to the Consumer.  The event is sent right away
// when nothing is pending, otherwise it is appended to the message
// queue behind the events that are already waiting.  The timeout
// argument is ignored.

int
Consumer_Handler::put (ACE_Message_Block *event,
                       ACE_Time_Value *)
{
  if (!this->msg_queue ()->is_empty ())
    {
      // Events are already queued up due to flow control, so just
      // enqueue this one at the tail (without blocking) and return.
      ACE_Time_Value *const no_wait =
        (ACE_Time_Value *) &ACE_Time_Value::zero;

      return this->msg_queue ()->enqueue_tail (event, no_wait);
    }

  // Nothing is pending: try to send the event *without* blocking!
  return this->nonblk_put (event);
}

Supplier_Handler::Supplier_Handler (const Connection_Config_Info &pci)
  : Connection_Handler (pci),
    msg_frag_ (0)   // No partially received event yet.
{
  // Set the high water mark of the message queue to 0.
  msg_queue ()->high_water_mark (0);

  // Tag this handler with the 'S' connection role.
  connection_role_ = 'S';
}

// Receive an Event from a Supplier.  Handles fragmentation.
//
// The event returned from recv consists of two parts:
//
// 1. The Address part, contains the "virtual" routing id.
//
// 2. The Data part, which contains the actual data to be forwarded.
//
// The reason for having two parts is to shield the higher layers
// of software from knowledge of the event structure.

int
Supplier_Handler::recv (ACE_Message_Block *&forward_addr)
{
  if (this->msg_frag_ == 0)
    // No existing fragment...
    ACE_NEW_RETURN (this->msg_frag_,
                    ACE_Message_Block (sizeof (Event),
                                       ACE_Message_Block::MB_DATA,
                                       0,
                                       0,
                                       0,
                                       Options::instance ()->locking_strategy ()),
                    -1);

  Event *event = (Event *) this->msg_frag_->rd_ptr ();
  ssize_t header_received = 0;

  const size_t HEADER_SIZE = sizeof (Event_Header);
  ssize_t header_bytes_left_to_read =
    HEADER_SIZE - this->msg_frag_->length ();

  if (header_bytes_left_to_read > 0)
    {
      header_received = this->peer ().recv
        (this->msg_frag_->wr_ptr (), header_bytes_left_to_read);

      if (header_received == -1 /* error */
          || header_received == 0  /* EOF */)
        {
          ACE_ERROR ((LM_ERROR, "%p\n",
                      "Recv error during header read "));
          ACE_DEBUG ((LM_DEBUG,
                      "attempted to read %d\n",
                      header_bytes_left_to_read));
          this->msg_frag_ = this->msg_frag_->release ();
          return header_received;
        }

      // Bump the write pointer by the amount read.
      this->msg_frag_->wr_ptr (header_received);

      // At this point we may or may not have the ENTIRE header.
      if (this->msg_frag_->length () < HEADER_SIZE)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "Partial header received: only %d bytes\n",
                     this->msg_frag_->length ()));
          // Notify the caller that we didn't get an entire event.
          errno = EWOULDBLOCK;
          return -1;
        }

      // Convert the header into host byte order so that we can access
      // it directly without having to repeatedly muck with it...
      event->header_.decode ();

      if (event->header_.len_ > ACE_INT32 (sizeof event->data_))
        {
          // This data_ payload is too big!
          errno = EINVAL;
          ACE_DEBUG ((LM_DEBUG,
                      "Data payload is too big (%d bytes)\n",
                      event->header_.len_));
          return -1;
        }

    }

  // At this point there is a complete, valid header in Event.  Now we
  // need to get the event payload.  Due to incomplete reads this may
  // not be the first time we've read in a fragment for this message.
  // We account for this here.  Note that the first time in here
  // msg_frag_->wr_ptr() will point to event->data_.  Every time we do
  // a successful fragment read, we advance wr_ptr().  Therefore, by
  // subtracting how much we've already read from the
  // event->header_.len_ we complete the data_bytes_left_to_read...

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -