⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 uipmc_transport.cpp

📁 这是一个被广泛使用的开源通信项目，完全能够胜任大容量、高并发的通信要求，可广泛用于网络游戏、医学图像、网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的资料。
💻 CPP
📖 第 1 页 / 共 2 页
字号:

            // Pretend it is o.k.  See note by bytes_to_send calculation.
            bytes_transferred = bytes_to_send;
            return 1;
        }

      // Increment the number of bytes transferred, but don't
      // count the MIOP header that we added.
      bytes_transferred += rc - MIOP_HEADER_SIZE;

      if (TAO_debug_level > 0)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "TAO_UIPMC_Transport::send_i: sent %d bytes to %s:%d\n",
                      rc,
                      addr.get_host_addr (),
                      addr.get_port_number ()));
        }

      // Go to the next fragment.
      (*packet_number)++;
      ++current_fragment;
      --num_fragments;
    }

  // Return total bytes transferred.
  return bytes_transferred;
}


// Receive one datagram from the multicast socket into <buf> and strip
// the MIOP packet header from the front of it.
//
// buf            - destination buffer; on success it holds only the
//                  bytes that followed the (padded) MIOP header.
// len            - capacity of <buf> in bytes.
// max_wait_time  - unused by this implementation.
//
// Returns the number of bytes left in <buf> after the header has been
// removed, or 0 when the packet is rejected (too short, wrong magic
// bytes, illegal ID length, or not enough room for the header padding).
ssize_t
TAO_UIPMC_Transport::recv (char *buf,
                           size_t len,
                           const ACE_Time_Value * /*max_wait_time*/)
{
  ACE_INET_Addr from_addr;

  ssize_t n = this->connection_handler_->mcast_dgram ().recv (buf,
                                                              len,
                                                              from_addr);
  if (TAO_debug_level > 5)
    {
      // ssize_t can be wider than int, so convert explicitly before
      // handing the value to a "%d" conversion.
      ACE_DEBUG ((LM_DEBUG,
                  "TAO_UIPMC_Transport::recv_i: received %d bytes from %s:%d\n",
                  ACE_static_cast (int, n),
                  from_addr.get_host_addr (),
                  from_addr.get_port_number ()));
    }

  // Make sure that we at least have a MIOP header.
  // NOTE(review): a negative result from the underlying recv presumably
  // also ends up in this branch and is reported to the caller as 0
  // rather than as an error - confirm that this is intended.
  if (n < MIOP_MIN_HEADER_SIZE)
    {
      if (TAO_debug_level > 0)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "TAO_UIPMC_Transport::recv_i: packet of size %d is too small from %s:%d\n",
                      ACE_static_cast (int, n),
                      from_addr.get_host_addr (),
                      from_addr.get_port_number ()));
        }
      return 0;
    }

  // Check for MIOP magic bytes.
  if (buf[MIOP_MAGIC_OFFSET] != miop_magic [0] ||
      buf[MIOP_MAGIC_OFFSET + 1] != miop_magic [1] ||
      buf[MIOP_MAGIC_OFFSET + 2] != miop_magic [2] ||
      buf[MIOP_MAGIC_OFFSET + 3] != miop_magic [3])
    {
      if (TAO_debug_level > 0)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "TAO_UIPMC_Transport::recv_i: UIPMC packet didn't contain magic bytes.\n"));
        }

      return 0;
    }

  // Retrieve the byte order from the lowest bit of the flags octet.
  // 0 = Big endian
  // 1 = Little endian
  CORBA::Octet byte_order = buf[MIOP_FLAGS_OFFSET] & 0x01;

  // Ignore the header version, other flags, packet length and number of packets.

  // Get the length of the ID, swapping bytes when the sender's byte
  // order differs from ours (unless swapping on read is disabled).
  // NOTE(review): the native-order path reads through a ULong pointer,
  // which assumes buf + MIOP_ID_LENGTH_OFFSET is suitably aligned -
  // TODO confirm against the callers.
  CORBA::ULong id_length;
#if !defined (ACE_DISABLE_SWAP_ON_READ)
  if (byte_order == ACE_CDR_BYTE_ORDER)
    {
      id_length = *ACE_reinterpret_cast (ACE_CDR::ULong*, &buf[MIOP_ID_LENGTH_OFFSET]);
    }
  else
    {
      ACE_CDR::swap_4 (&buf[MIOP_ID_LENGTH_OFFSET],
                       ACE_reinterpret_cast (char*, &id_length));
    }
#else
  id_length = *ACE_reinterpret_cast (ACE_CDR::ULong*, &buf[MIOP_ID_LENGTH_OFFSET]);
#endif /* ACE_DISABLE_SWAP_ON_READ */

  // Make sure that the length field is legal.  The upper-bound test
  // comes first so the addition below is only evaluated for ID lengths
  // that already passed it.
  if (id_length > MIOP_MAX_LENGTH_ID ||
      ACE_static_cast (ssize_t, MIOP_ID_CONTENT_OFFSET + id_length) > n)
    {
      if (TAO_debug_level > 0)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "TAO_UIPMC_Transport::recv_i: Invalid ID length.\n"));
        }

      return 0;
    }

  // Trim off the header for now.  The header size is the end of the ID
  // content rounded up to the next multiple of 8 bytes.
  ssize_t miop_header_size = (MIOP_ID_CONTENT_OFFSET + id_length + 7) & ~0x7;
  if (miop_header_size > n)
    {
      if (TAO_debug_level > 0)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "TAO_UIPMC_Transport::recv_i: MIOP packet not large enough for padding.\n"));
        }

      return 0;
    }

  // Slide the remaining bytes down to the start of the buffer so the
  // caller sees them at <buf>.
  n -= miop_header_size;
  ACE_OS::memmove (buf, buf + miop_header_size, n);

  return n;
}

// Read a single datagram from the transport, hand it to the messaging
// layer for parsing and then dispatch the parsed message.
//
// rh             - resume handle forwarded to process_parsed_messages.
// max_wait_time  - forwarded to recv.
// (third arg)    - unused.
//
// Returns the (non-positive) value produced by recv when nothing usable
// was read, -1 when parsing fails, and otherwise whatever
// process_parsed_messages returns.
int
TAO_UIPMC_Transport::handle_input (TAO_Resume_Handle &rh,
                                   ACE_Time_Value *max_wait_time,
                                   int /*block*/)
{
  // Stack storage for the incoming datagram.
  char stack_buffer [MIOP_MAX_DGRAM_SIZE];

#if defined (ACE_HAS_PURIFY)
  (void) ACE_OS::memset (stack_buffer, '\0', sizeof stack_buffer);
#endif /* ACE_HAS_PURIFY */

  // Wrap the stack storage in a data block; DONT_DELETE is passed so
  // the wrapper is told not to release the buffer.
  ACE_Data_Block data_block (sizeof (stack_buffer),
                             ACE_Message_Block::MB_DATA,
                             stack_buffer,
                             this->orb_core_->input_cdr_buffer_allocator (),
                             this->orb_core_->locking_strategy (),
                             ACE_Message_Block::DONT_DELETE,
                             this->orb_core_->input_cdr_dblock_allocator ());

  // Message block layered on top of the data block.
  ACE_Message_Block message_block (&data_block,
                                   ACE_Message_Block::DONT_DELETE,
                                   this->orb_core_->input_cdr_msgblock_allocator ());

  // Align the message block before anything is read into it.
  ACE_CDR::mb_align (&message_block);

  // Pull the datagram into the space available behind the read pointer.
  const ssize_t bytes_read = this->recv (message_block.rd_ptr (),
                                         message_block.space (),
                                         max_wait_time);

  // Nothing usable arrived: report and go back to the reactor.
  if (bytes_read <= 0)
    {
      if (TAO_debug_level != 0)
        {
          ACE_DEBUG ((LM_DEBUG,
                      ACE_TEXT ("TAO: (%P|%t|%N|%l) recv returned error on transport %d after fault %p\n"),
                      this->id (),
                      ACE_TEXT ("handle_input_i ()\n")));
        }

      // Only a hard failure (-1) is reported to the transport mux
      // strategy as a closed connection.
      if (bytes_read == -1)
        {
          this->tms_->connection_closed ();
        }

      return bytes_read;
    }

  // Mark the received bytes as written in the message block.
  message_block.wr_ptr (bytes_read);

  // Validation of the incoming message is delegated to the messaging
  // objects.
  const int parse_status = this->parse_incoming_messages (message_block);
  if (parse_status == -1)
    {
      if (TAO_debug_level != 0)
        {
          ACE_DEBUG ((LM_DEBUG,
                      ACE_TEXT ("TAO: (%P|%t|%N|%l) parse_incoming_messages failed on transport %d after fault %p\n"),
                      this->id (),
                      ACE_TEXT ("handle_input_i ()\n")));
        }

      return -1;
    }

  // NOTE: no queueing and no check for missing data is done here; the
  // code relies on the complete message having arrived in one read.

  // Build a queue node around the message block and let the messaging
  // object fill in its details.
  TAO_Queued_Data queued_message (&message_block);
  this->messaging_object ()->get_message_data (&queued_message);

  // Dispatch the message.
  return this->process_parsed_messages (&queued_message, rh);
}

// Intentionally a no-op that always reports success (0).
int
TAO_UIPMC_Transport::register_handler (void)
{
  // The handler is deliberately never registered with the reactor:
  // under the assumption that only one-way requests are used, there is
  // no incoming data we need to be told about.
  //
  // Registering would also cause ICMP messages (for example from an
  // unreachable server) to be reported to us.  MIOP is meant to be
  // indifferent to network failures, so those messages are ignored by
  // simply not registering.
  return 0;
}

// Notify the wait strategy that a request is about to go out and then
// send the marshaled request.
//
// Returns -1 if either step reports -1, otherwise 0.
int
TAO_UIPMC_Transport::send_request (TAO_Stub *stub,
                                   TAO_ORB_Core *orb_core,
                                   TAO_OutputCDR &stream,
                                   int message_semantics,
                                   ACE_Time_Value *max_wait_time)
{
  // Short-circuit evaluation keeps the order: the message is only sent
  // when the wait strategy did not fail.
  if (this->ws_->sending_request (orb_core, message_semantics) == -1
      || this->send_message (stream,
                             stub,
                             message_semantics,
                             max_wait_time) == -1)
    {
      return -1;
    }

  return 0;
}

// Let the messaging object format the message held in <stream> and
// push it out through send_message_shared.
//
// Returns -1 when formatting or sending fails, 1 on success.
int
TAO_UIPMC_Transport::send_message (TAO_OutputCDR &stream,
                                   TAO_Stub *stub,
                                   int message_semantics,
                                   ACE_Time_Value *max_wait_time)
{
  // The message has to be formatted in the stream before it is sent.
  if (this->messaging_object_->format_message (stream) != 0)
    {
      return -1;
    }

  // A single call is made, with no retry loop.  According to the
  // original author's note, send_message_shared either sends all of
  // the bytes or returns an error.
  const ssize_t send_result =
    this->send_message_shared (stub,
                               message_semantics,
                               stream.begin (),
                               max_wait_time);

  if (send_result != -1)
    {
      return 1;
    }

  // Failure path: optionally log, then report the error.
  if (TAO_debug_level)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"),
                  this->id (),
                  ACE_TEXT ("send_message ()\n")));
    }

  return -1;
}



// Forward the protocol version (<major>.<minor>) to the messaging
// object.  Always returns 1; the result of init () is not inspected.
int
TAO_UIPMC_Transport::messaging_init (CORBA::Octet major,
                                    CORBA::Octet minor)
{
  this->messaging_object_->init (major, minor);

  return 1;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -