⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ecg_cdr_message_sender.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
字号:
// ECG_CDR_Message_Sender.cpp,v 1.3 2003/06/24 03:32:17 bala Exp

#include "ECG_CDR_Message_Sender.h"
#include "tao/CDR.h"
#include "ace/SOCK_Dgram.h"
#include "ace/INET_Addr.h"
#include "ace/ACE.h"

#if !defined(__ACE_INLINE__)
#include "ECG_CDR_Message_Sender.i"
#endif /* __ACE_INLINE__ */

ACE_RCSID(Event, ECG_CDR_Message_Sender, "ECG_CDR_Message_Sender.cpp,v 1.3 2003/06/24 03:32:17 bala Exp")

void
TAO_ECG_CDR_Message_Sender::init (
      TAO_ECG_Refcounted_Endpoint endpoint_rptr
      ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  if (endpoint_rptr.get () == 0
      || endpoint_rptr->dgram ().get_handle () == ACE_INVALID_HANDLE)
    {
      ACE_ERROR ((LM_ERROR, "TAO_ECG_CDR_Message_Sender::init(): "
                            "nil or unitialized endpoint argument."));
      ACE_THROW (CORBA::INTERNAL ());
    }

  this->endpoint_rptr_ = endpoint_rptr;
}

void
TAO_ECG_CDR_Message_Sender::send_message  (const TAO_OutputCDR &cdr,
                                           const ACE_INET_Addr &addr
                                           ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  if (this->endpoint_rptr_.get () == 0)
    {
      ACE_ERROR ((LM_ERROR, "Attempt to invoke send_message() "
                            "on non-initialized sender object."));
      ACE_THROW (CORBA::INTERNAL ());
    }

  CORBA::ULong max_fragment_payload = this->mtu () -
    TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
  // ACE_ASSERT (max_fragment_payload != 0);

#if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
  const int TAO_WRITEV_MAX = ACE_IOV_MAX - 1;
#else
  const int TAO_WRITEV_MAX = ACE_IOV_MAX;
#endif /* ACE_HAS_BROKEN_DGRAM_SENDV */
  iovec iov[TAO_WRITEV_MAX];

  CORBA::ULong total_length;
  CORBA::ULong fragment_count =
    this->compute_fragment_count (cdr.begin (),
                                  cdr.end (),
                                  TAO_WRITEV_MAX,
                                  max_fragment_payload,
                                  total_length);

  CORBA::ULong request_id = this->endpoint_rptr_->next_request_id ();

  // Reserve the first iovec for the header...
  int iovcnt = 1;
  CORBA::ULong fragment_id = 0;
  CORBA::ULong fragment_offset = 0;
  CORBA::ULong fragment_size = 0;
  for (const ACE_Message_Block* b = cdr.begin ();
       b != cdr.end ();
       b = b->cont ())
    {
      CORBA::ULong l = b->length ();

      char* rd_ptr = b->rd_ptr ();

      iov[iovcnt].iov_base = rd_ptr;
      iov[iovcnt].iov_len  = l;
      fragment_size += l;
      iovcnt++;
      while (fragment_size > max_fragment_payload)
        {
          // This fragment is full, we have to send it...

          // First adjust the last iov entry:
          CORBA::ULong last_mb_length =
            max_fragment_payload - (fragment_size - l);
          iov[iovcnt - 1].iov_len = last_mb_length;

          this->send_fragment (addr,
                               request_id,
                               total_length,
                               max_fragment_payload,
                               fragment_offset,
                               fragment_id,
                               fragment_count,
                               iov,
                               iovcnt
                               ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;
          fragment_id++;
          fragment_offset += max_fragment_payload;

          // Reset, but don't forget that the last Message_Block
              // may need to be sent in multiple fragments..
          l -= last_mb_length;
          rd_ptr += last_mb_length;
          iov[1].iov_base = rd_ptr;
          iov[1].iov_len = l;
          fragment_size = l;
          iovcnt = 2;
        }
      if (fragment_size == max_fragment_payload)
        {
          // We filled a fragment, but this time it was filled
          // exactly, the treatment is a little different from the
          // loop above...
          this->send_fragment (addr,
                               request_id,
                               total_length,
                               max_fragment_payload,
                               fragment_offset,
                               fragment_id,
                               fragment_count,
                               iov,
                               iovcnt
                               ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;
          fragment_id++;
          fragment_offset += max_fragment_payload;

          iovcnt = 1;
          fragment_size = 0;
        }
      if (iovcnt == TAO_WRITEV_MAX)
        {
          // Now we ran out of space in the iovec, we must send a
          // fragment to work around that....
          this->send_fragment (addr,
                               request_id,
                               total_length,
                               fragment_size,
                               fragment_offset,
                               fragment_id,
                               fragment_count,
                               iov,
                               iovcnt
                               ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;
          fragment_id++;
          fragment_offset += fragment_size;

          iovcnt = 1;
          fragment_size = 0;
        }
    }
  // There is something left in the iovvec that we must send
  // also...
  if (iovcnt != 1)
    {
      // Now we ran out of space in the iovec, we must send a
      // fragment to work around that....
      this->send_fragment (addr,
                           request_id,
                           total_length,
                           fragment_size,
                           fragment_offset,
                           fragment_id,
                           fragment_count,
                           iov,
                           iovcnt
                           ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      fragment_id++;
      fragment_offset += fragment_size;

      // reset, not needed here...
      // iovcnt = 1;
      // fragment_size = 0;
    }
  // ACE_ASSERT (total_length == fragment_offset);
  // ACE_ASSERT (fragment_id == fragment_count);

}


void
TAO_ECG_CDR_Message_Sender::send_fragment (const ACE_INET_Addr &addr,
                                           CORBA::ULong request_id,
                                           CORBA::ULong request_size,
                                           CORBA::ULong fragment_size,
                                           CORBA::ULong fragment_offset,
                                           CORBA::ULong fragment_id,
                                           CORBA::ULong fragment_count,
                                           iovec iov[],
                                           int iovcnt
                                           ACE_ENV_ARG_DECL)
{
  CORBA::ULong header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
                     / sizeof(CORBA::ULong)
                     + ACE_CDR::MAX_ALIGNMENT];
  char* buf = ACE_reinterpret_cast(char*,header);
  TAO_OutputCDR cdr (buf, sizeof(header));
  cdr.write_boolean (TAO_ENCAP_BYTE_ORDER);
  // Insert some known values in the padding bytes, so we can smoke
  // test the message on the receiving end.
  cdr.write_octet ('A'); cdr.write_octet ('B'); cdr.write_octet ('C');
  cdr.write_ulong (request_id);
  cdr.write_ulong (request_size);
  cdr.write_ulong (fragment_size);
  cdr.write_ulong (fragment_offset);
  cdr.write_ulong (fragment_id);
  cdr.write_ulong (fragment_count);
  CORBA::Octet padding[4];


  // MRH
  if (checksum_)
  {
    // Compute CRC
     iov[0].iov_base = cdr.begin ()->rd_ptr ();
     iov[0].iov_len  = cdr.begin ()->length ();
     unsigned int crc = 0;
     unsigned char *crc_parts = (unsigned char *)(&crc);
     if (iovcnt > 1)
       {
         crc = ACE::crc32 (iov, iovcnt);
         crc = htonl (crc);
       }
     for (int cnt=0; cnt<4; ++cnt)
       {
         padding[cnt] = crc_parts[cnt];
       }
   }
   else
   {
     for (int cnt=0; cnt<4; ++cnt)
     {
       padding[cnt] = 0;
     }
   }
   //End MRH
  cdr.write_octet_array (padding, 4);

  iov[0].iov_base = cdr.begin ()->rd_ptr ();
  iov[0].iov_len  = cdr.begin ()->length ();

  ssize_t n = this->dgram ().send (iov,
                                   iovcnt,
                                   addr);
  size_t expected_n = 0;
  for (int i = 0; i < iovcnt; ++i)
    expected_n += iov[i].iov_len;
  if (n > 0 && size_t(n) != expected_n)
    {
      ACE_DEBUG ((LM_ERROR, ("Sent only %d out of %d bytes "
                              "for mcast fragment."),
                  n,
                  expected_n));
    }

  if (n == -1)
    {
      if (errno == EWOULDBLOCK)
        {
          ACE_DEBUG ((LM_ERROR, "Send of mcast fragment failed (%m)."));
          // @@ TODO Use a Event Channel specific exception
          ACE_THROW (CORBA::COMM_FAILURE ());
        }
      else
        {
          ACE_DEBUG ((LM_WARNING, "Send of mcast fragment blocked (%m)."));
        }
    }
  else if (n == 0)
    {
      ACE_DEBUG ((LM_WARNING, "EOF on send of mcast fragment (%m)."));
    }
}


CORBA::ULong
TAO_ECG_CDR_Message_Sender::compute_fragment_count (const ACE_Message_Block* begin,
                                                    const ACE_Message_Block* end,
                                                    int iov_size,
                                                    CORBA::ULong max_fragment_payload,
                                                    CORBA::ULong& total_length)
{
  CORBA::ULong fragment_count = 0;
  total_length = 0;

  CORBA::ULong fragment_size = 0;
  // Reserve the first iovec for the header...
  int iovcnt = 1;
  for (const ACE_Message_Block* b = begin;
       b != end;
       b = b->cont ())
    {
      CORBA::ULong l = b->length ();
      total_length += l;
      fragment_size += l;
      iovcnt++;
      while (fragment_size > max_fragment_payload)
        {
          // Ran out of space, must create a fragment...
          fragment_count++;

          // The next iovector will contain what remains of this
          // buffer, but also consider
          iovcnt = 2;
          l -= max_fragment_payload - (fragment_size - l);
          fragment_size = l;
        }
      if (fragment_size == max_fragment_payload)
        {
          fragment_count++;
          iovcnt = 1;
          fragment_size = 0;
        }
      if (iovcnt >= iov_size)
        {
          // Ran out of space in the iovector....
          fragment_count++;
          iovcnt = 1;
          fragment_size = 0;
        }
    }
  if (iovcnt != 1)
    {
      // Send the remaining data in another fragment
      fragment_count++;
    }
  return fragment_count;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -