⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 uipmc_transport.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
📖 第 1 页 / 共 2 页
字号:
// This may look like C, but it's really -*- C++ -*-
// UIPMC_Transport.cpp,v 1.23 2004/01/09 11:59:08 jwillemsen Exp

#include "UIPMC_Transport.h"

#include "UIPMC_Connection_Handler.h"
#include "UIPMC_Message_Block_Data_Iterator.h"
#include "UIPMC_Acceptor.h"
#include "UIPMC_Profile.h"
#include "UIPMC_Wait_Never.h"
#include "tao/Acceptor_Registry.h"
#include "tao/operation_details.h"
#include "tao/Timeprobe.h"
#include "tao/CDR.h"
#include "tao/Transport_Mux_Strategy.h"
#include "tao/Wait_Strategy.h"
#include "tao/Sync_Strategies.h"
#include "tao/Stub.h"
#include "tao/ORB_Core.h"
#include "tao/debug.h"
#include "tao/Resume_Handle.h"
#include "tao/GIOP_Message_Base.h"
#include "tao/GIOP_Message_Lite.h"

#if !defined (__ACE_INLINE__)
# include "UIPMC_Transport.i"
#endif /* ! __ACE_INLINE__ */


ACE_RCSID (PortableGroup,
           UIPMC_Transport,
           "UIPMC_Transport.cpp,v 1.23 2004/01/09 11:59:08 jwillemsen Exp")


// Local MIOP Definitions:

// Note: We currently support packet fragmentation on transmit, but
//       do not support reassembly.

// Limit the number of fragments that we can divide a message
// into.
#define MIOP_MAX_FRAGMENTS    (1)
#define MIOP_MAX_HEADER_SIZE  (272) // See MIOP Spec.  Must be a multiple of 8.
#define MIOP_MAX_DGRAM_SIZE   (ACE_MAX_DGRAM_SIZE)

#define MIOP_MAGIC_OFFSET             (0)
#define MIOP_VERSION_OFFSET           (4)
#define MIOP_FLAGS_OFFSET             (5)
#define MIOP_PACKET_LENGTH_OFFSET     (6)
#define MIOP_PACKET_NUMBER_OFFSET     (8)
#define MIOP_NUMBER_OF_PACKETS_OFFSET (12)
#define MIOP_ID_LENGTH_OFFSET         (16)
#define MIOP_MIN_LENGTH_ID            (0)
#define MIOP_MAX_LENGTH_ID            (252)
#define MIOP_ID_DEFAULT_LENGTH        (12)
#define MIOP_ID_CONTENT_OFFSET        (20)
#define MIOP_HEADER_PADDING           (0)   // The ID field needs to be padded to
                                            // a multiple of 8 bytes.
#define MIOP_HEADER_SIZE              (MIOP_ID_CONTENT_OFFSET   \
                                       + MIOP_ID_DEFAULT_LENGTH \
                                       + MIOP_HEADER_PADDING)
#define MIOP_MIN_HEADER_SIZE          (MIOP_ID_CONTENT_OFFSET   \
                                       + MIOP_MIN_LENGTH_ID     \
                                       + (8 - MIOP_MIN_LENGTH_ID) /* padding */)

static const CORBA::Octet miop_magic[4] = { 0x4d, 0x49, 0x4f, 0x50 }; // 'M', 'I', 'O', 'P'

struct MIOP_Packet
{
  iovec iov[ACE_IOV_MAX];
  int iovcnt;
  int length;
};

TAO_UIPMC_Transport::TAO_UIPMC_Transport (TAO_UIPMC_Connection_Handler *handler,
                                          TAO_ORB_Core *orb_core,
                                          CORBA::Boolean /*flag*/)
  : TAO_Transport (TAO_TAG_UIPMC_PROFILE,
                   orb_core)
  , connection_handler_ (handler)
  , messaging_object_ (0)
{
  // Use the normal GIOP object
  ACE_NEW (this->messaging_object_,
           TAO_GIOP_Message_Base (orb_core,
                                  MIOP_MAX_DGRAM_SIZE));

  // Replace the default wait strategy with our own
  // since we don't support waiting on anything.
  delete this->ws_;
  ACE_NEW (this->ws_,
           TAO_UIPMC_Wait_Never (this));
}

TAO_UIPMC_Transport::~TAO_UIPMC_Transport (void)
{
  delete this->messaging_object_;
}

ACE_Event_Handler *
TAO_UIPMC_Transport::event_handler_i (void)
{
  return this->connection_handler_;
}

TAO_Connection_Handler *
TAO_UIPMC_Transport::connection_handler_i (void)
{
  return this->connection_handler_;
}

TAO_Pluggable_Messaging *
TAO_UIPMC_Transport::messaging_object (void)
{
  return this->messaging_object_;
}


void
TAO_UIPMC_Transport::write_unique_id (TAO_OutputCDR &miop_hdr, unsigned long unique)
{
  // We currently construct a unique ID for each MIOP message by
  // concatenating the address of the buffer to a counter.  We may
  // also need to use a MAC address or something more unique to
  // fully comply with the MIOP specification.

  static unsigned long counter = 1;  // Don't worry about race conditions on counter,
                                     // since buffer addresses can't be the same if
                                     // this is being called simultaneously.

  CORBA::Octet unique_id[MIOP_ID_DEFAULT_LENGTH];

  unique_id[0] = ACE_static_cast (CORBA::Octet, unique & 0xff);
  unique_id[1] = ACE_static_cast (CORBA::Octet, (unique & 0xff00) >> 8);
  unique_id[2] = ACE_static_cast (CORBA::Octet, (unique & 0xff0000) >> 16);
  unique_id[3] = ACE_static_cast (CORBA::Octet, (unique & 0xff000000) >> 24);

  unique_id[4] = ACE_static_cast (CORBA::Octet, counter & 0xff);
  unique_id[5] = ACE_static_cast (CORBA::Octet, (counter & 0xff00) >> 8);
  unique_id[6] = ACE_static_cast (CORBA::Octet, (counter & 0xff0000) >> 16);
  unique_id[7] = ACE_static_cast (CORBA::Octet, (counter & 0xff000000) >> 24);

  unique_id[8] = 0;
  unique_id[9] = 0;
  unique_id[10] = 0;
  unique_id[11] = 0;

  miop_hdr.write_ulong (MIOP_ID_DEFAULT_LENGTH);
  miop_hdr.write_octet_array (unique_id, MIOP_ID_DEFAULT_LENGTH);
}

ssize_t
TAO_UIPMC_Transport::send (iovec *iov, int iovcnt,
                           size_t &bytes_transferred,
                           const ACE_Time_Value *)
{
  const ACE_INET_Addr &addr = this->connection_handler_->addr ();
  bytes_transferred = 0;

  // Calculate the bytes to send.  This value is only used for
  // error conditions to fake a good return.  We do this for
  // semantic consistency with DIOP, and since errors aren't
  // handled correctly from send_i (our fault).  If these
  // semantics are not desirable, the error handling problems
  // that need to be fixed can be found in
  // UIPMC_Connection_Handler::decr_refcount which will need to
  // deregister the connection handler from the UIPMC_Connector
  // cache.
  ssize_t bytes_to_send = 0;
  for (int i = 0; i < iovcnt; i++)
     bytes_to_send += iov[i].iov_len;

  MIOP_Packet fragments[MIOP_MAX_FRAGMENTS];
  MIOP_Packet *current_fragment;
  int num_fragments = 1;

  UIPMC_Message_Block_Data_Iterator mb_iter (iov, iovcnt);

  // Initialize the first fragment
  current_fragment = &fragments[0];
  current_fragment->iovcnt = 1;  // The MIOP Header
  current_fragment->length = MIOP_HEADER_SIZE;

  // Go through all of the message blocks.
  while (mb_iter.next_block (MIOP_MAX_DGRAM_SIZE - current_fragment->length,
                             current_fragment->iov[current_fragment->iovcnt]))
    {
      // Increment the length and iovcnt.
      current_fragment->length += current_fragment->iov[current_fragment->iovcnt].iov_len;
      current_fragment->iovcnt++;

      // Check if we've filled up this fragment or if we've run out of
      // iov entries.
      if (current_fragment->length == MIOP_MAX_DGRAM_SIZE ||
          current_fragment->iovcnt == ACE_IOV_MAX)
        {
          // Make a new fragment.
          num_fragments++;

          // Check if too many fragments
          if (num_fragments > MIOP_MAX_FRAGMENTS)
            {
              // This is an error as we do not send more.
              // Silently drop the message but log an error.

              // Pluggable_Messaging::transport_message only
              // cares if it gets -1 or 0 so we can return a
              // partial length and it will think all has gone
              // well.
              if (TAO_debug_level > 0)
                {
                  ACE_DEBUG ((LM_DEBUG,
                              ACE_TEXT ("\n\nTAO (%P|%t) ")
                              ACE_TEXT ("UIPMC_Transport::send_i ")
                              ACE_TEXT ("Message of size %d needs too many MIOP fragments (max is %d).\n")
                              ACE_TEXT ("You may be able to increase ACE_MAX_DGRAM_SIZE.\n"),
                              bytes_to_send,
                              MIOP_MAX_FRAGMENTS));
                }

              // Pretend it is o.k.  See note by bytes_to_send calculation.
              bytes_transferred = bytes_to_send;
              return 1;
            }

          // Otherwise, initialize another fragment.
          current_fragment++;
          current_fragment->iovcnt = 1;  // The MIOP Header
          current_fragment->length = MIOP_HEADER_SIZE;
        }
    }

  // Build a generic MIOP Header.

  // Allocate space on the stack for the header (add 8 to account for
  // the possibility of adjusting for alignment).
  char header_buffer[MIOP_HEADER_SIZE + 8];
  TAO_OutputCDR miop_hdr (header_buffer, MIOP_HEADER_SIZE + 8);

  miop_hdr.write_octet_array (miop_magic, 4);   // Magic
  miop_hdr.write_octet (0x10);                  // Version
  CORBA::Octet *flags_field = ACE_reinterpret_cast (CORBA::Octet *,
                                                    miop_hdr.current ()->wr_ptr ());

  // Write flags octet:
  //  Bit        Description
  //   0         Endian
  //   1         Stop message flag (Assigned later)
  //   2 - 7     Set to 0
  miop_hdr.write_octet (TAO_ENCAP_BYTE_ORDER);  // Flags

  // Packet Length
  // NOTE: We can save pointers and write them later without byte swapping since
  //       in CORBA, the sender chooses the endian.
  CORBA::UShort *packet_length = ACE_reinterpret_cast (CORBA::UShort *,
                                                       miop_hdr.current ()->wr_ptr ());
  miop_hdr.write_short (0);

  // Packet number
  CORBA::ULong *packet_number = ACE_reinterpret_cast (CORBA::ULong *,
                                                      miop_hdr.current ()->wr_ptr ());
  miop_hdr.write_ulong (0);

  // Number of packets field
  miop_hdr.write_ulong (num_fragments);

  // UniqueId
  ptrdiff_t unique_id = ACE_reinterpret_cast (ptrdiff_t, iov);
  this->write_unique_id (miop_hdr,
                         ACE_static_cast (unsigned long, unique_id));

  // Send the buffers.
  current_fragment = &fragments[0];
  while (num_fragments > 0 &&
         current_fragment->iovcnt > 1)
    {
      // Fill in the packet length header field.
      *packet_length = current_fragment->length;

      // If this is the last fragment, set the stop message flag.
      if (num_fragments == 1)
        {
          *flags_field |= 0x02;
        }

      // Setup the MIOP header in the iov list.
      current_fragment->iov[0].iov_base = miop_hdr.current ()->rd_ptr ();
      current_fragment->iov[0].iov_len = MIOP_HEADER_SIZE;

      // Send the fragment. - Need to check for errors!!
      ssize_t rc = this->connection_handler_->dgram ().send (current_fragment->iov,
                                                             current_fragment->iovcnt,
                                                             addr);

      if (rc <= 0)
        {
          if (TAO_debug_level > 0)
            {
              ACE_DEBUG ((LM_DEBUG,
                          ACE_TEXT ("\n\nTAO (%P|%t) ")
                          ACE_TEXT ("UIPMC_Transport::send")
                          ACE_TEXT (" %p\n\n"),
                          ACE_TEXT ("Error returned from transport:")));
            }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -