📄 ecg_cdr_message_receiver.cpp

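📁 This is a widely used open-source communication project. It is fully capable of handling high-volume, high-concurrency communication, and it is broadly applicable to the high-QoS requirements of online games and medical imaging gateways. See the corresponding materials for more details.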
// ECG_CDR_Message_Receiver.cpp,v 1.8 2003/11/24 05:21:59 bala Exp

#include "ECG_CDR_Message_Receiver.h"
#include "ECG_CDR_Message_Sender.h"

#include "tao/Exception.h"

#include "ace/SOCK_Dgram.h"
#include "ace/ACE.h"
#include "ace/OS_NS_string.h"

#if !defined(__ACE_INLINE__)
#include "ECG_CDR_Message_Receiver.i"
#endif /* __ACE_INLINE__ */

ACE_RCSID (Event, 
           ECG_CDR_Message_Receiver, 
           "ECG_CDR_Message_Receiver.cpp,v 1.8 2003/11/24 05:21:59 bala Exp")


TAO_ECG_CDR_Processor::~TAO_ECG_CDR_Processor (void)
{
}
// ****************************************************************

TAO_ECG_UDP_Request_Entry::~TAO_ECG_UDP_Request_Entry (void)
{
  if (this->own_received_fragments_)
    {
      this->own_received_fragments_ = 0;
      delete[] this->received_fragments_;
    }
}

TAO_ECG_UDP_Request_Entry::
TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
                           CORBA::ULong request_id,
                           CORBA::ULong request_size,
                           CORBA::ULong fragment_count)
  : byte_order_ (byte_order)
  , request_id_ (request_id)
  , request_size_ (request_size)
  , fragment_count_ (fragment_count)
{
  ACE_CDR::grow (&this->payload_, this->request_size_);
  this->payload_.wr_ptr (request_size_);

  this->received_fragments_ = this->default_received_fragments_;
  this->own_received_fragments_ = 0;
  const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
  this->received_fragments_size_ =
    this->fragment_count_ / bits_per_ulong + 1;
  if (this->received_fragments_size_ > ECG_DEFAULT_FRAGMENT_BUFSIZ)
    {
      ACE_NEW (this->received_fragments_,
               CORBA::ULong[this->received_fragments_size_]);
      this->own_received_fragments_ = 1;
    }

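  // Clear the bitmap: one bit per fragment, zero means "not yet received".
  for (CORBA::ULong i = 0; i < this->received_fragments_size_; ++i)
    this->received_fragments_[i] = 0;
  // Preset the unused bits at positions >= fragment_count_ in the last
  // word, so that complete() can compare every word against 0xFFFFFFFF.
  CORBA::ULong idx = this->fragment_count_ / bits_per_ulong;
  CORBA::ULong bit = this->fragment_count_ % bits_per_ulong;
  this->received_fragments_[idx] = (0xFFFFFFFF << bit);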
}

int
TAO_ECG_UDP_Request_Entry::validate_fragment (CORBA::Boolean byte_order,
                                              CORBA::ULong request_size,
                                              CORBA::ULong fragment_size,
                                              CORBA::ULong fragment_offset,
                                              CORBA::ULong /* fragment_id */,
                                              CORBA::ULong fragment_count) const
{
  if (byte_order != this->byte_order_
      || request_size != this->request_size_
      || fragment_count != this->fragment_count_)
    return 0;

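  // Compare against the remaining space instead of computing
  // fragment_offset + fragment_size, which could wrap around.
  if (fragment_offset >= request_size
      || fragment_size > request_size - fragment_offset)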
    return 0;

  return 1;
}
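
// Illustrative sketch, not part of the original file: a standalone version
// of the range check performed by validate_fragment, written so that
// offset + size is never computed and therefore cannot wrap around in
// 32-bit unsigned arithmetic. The helper name fragment_in_bounds and the
// use of std::uint32_t in place of CORBA::ULong are assumptions made for
// this example only.

```cpp
#include <cstdint>

// Hypothetical helper: returns true when the byte range
// [fragment_offset, fragment_offset + fragment_size) lies entirely inside
// a request of request_size bytes.
inline bool fragment_in_bounds (std::uint32_t request_size,
                                std::uint32_t fragment_size,
                                std::uint32_t fragment_offset)
{
  // A fragment cannot start at or beyond the end of the request.
  if (fragment_offset >= request_size)
    return false;

  // After the check above, request_size - fragment_offset cannot
  // underflow, so this comparison is safe for any fragment_size.
  return fragment_size <= request_size - fragment_offset;
}
```

// With request_size = 100, a fragment of size 0xFFFFFFF0 at offset 0x20 is
// rejected here, whereas a check based on the 32-bit sum would see the sum
// wrap to 0x10 and accept it.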

int
TAO_ECG_UDP_Request_Entry::test_received (CORBA::ULong fragment_id) const
{
  // Treat out-of-range fragments as already received, so they are dropped.
  if (fragment_id > this->fragment_count_)
    return 1;

  const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
  CORBA::ULong idx = fragment_id / bits_per_ulong;
  CORBA::ULong bit = fragment_id % bits_per_ulong;
  // Use an unsigned 1 so that shifting into bit 31 is well defined.
  return ACE_BIT_ENABLED (this->received_fragments_[idx], 1u << bit);
}

void
TAO_ECG_UDP_Request_Entry::mark_received (CORBA::ULong fragment_id)
{
  // Ignore out-of-range fragments; there is no bit to set for them.
  if (fragment_id > this->fragment_count_)
    return;

  const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
  CORBA::ULong idx = fragment_id / bits_per_ulong;
  CORBA::ULong bit = fragment_id % bits_per_ulong;
  // Use an unsigned 1 so that shifting into bit 31 is well defined.
  ACE_SET_BITS (this->received_fragments_[idx], 1u << bit);
}

int
TAO_ECG_UDP_Request_Entry::complete (void) const
{
  for (CORBA::ULong i = 0;
       i < this->received_fragments_size_;
       ++i)
    {
      if (this->received_fragments_[i] != 0xFFFFFFFF)
        return 0;
    }
  return 1;
}
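
// Illustrative sketch, not part of the original file: a minimal standalone
// model of the received-fragments bitmap used by the constructor,
// test_received, mark_received and complete above. The class name
// FragmentBitmap, the std::vector storage and std::uint32_t (in place of
// CORBA::ULong) are assumptions for this example. The sketch rejects ids
// with >=, which should behave like the original's > because the bit at
// position fragment_count is preset by the constructor.

```cpp
#include <climits>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the bitmap kept by TAO_ECG_UDP_Request_Entry.
class FragmentBitmap
{
public:
  explicit FragmentBitmap (std::uint32_t fragment_count)
    : fragment_count_ (fragment_count)
    , words_ (static_cast<std::size_t> (fragment_count / bits_per_word) + 1,
              0u)
  {
    // Preset every bit at position >= fragment_count in the last word, so
    // complete() can compare whole words against all ones.
    words_.back () = all_ones << (fragment_count % bits_per_word);
  }

  void mark_received (std::uint32_t id)
  {
    if (id >= fragment_count_)
      return;  // out of range: nothing to record
    words_[id / bits_per_word] |=
      static_cast<std::uint32_t> (1) << (id % bits_per_word);
  }

  bool test_received (std::uint32_t id) const
  {
    if (id >= fragment_count_)
      return true;  // report as received so the caller drops it
    return ((words_[id / bits_per_word] >> (id % bits_per_word)) & 1u) != 0;
  }

  bool complete () const
  {
    for (std::size_t i = 0; i < words_.size (); ++i)
      if (words_[i] != all_ones)
        return false;
    return true;
  }

private:
  static constexpr std::uint32_t bits_per_word =
    sizeof (std::uint32_t) * CHAR_BIT;
  static constexpr std::uint32_t all_ones = 0xFFFFFFFFu;

  std::uint32_t fragment_count_;
  std::vector<std::uint32_t> words_;
};
```

// Presetting the padding bits keeps complete() free of any special case
// for the last, partially used word.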

char*
TAO_ECG_UDP_Request_Entry::fragment_buffer (CORBA::ULong fragment_offset)
{
  return this->payload_.rd_ptr () + fragment_offset;
}
// ****************************************************************

int
TAO_ECG_CDR_Message_Receiver::Requests::init (size_t size,
                                              size_t min_purge_count)
{
  // Already initialized.
  if (this->fragmented_requests_)
    return -1;

  ACE_NEW_RETURN (this->fragmented_requests_,
                  TAO_ECG_UDP_Request_Entry*[size],
                  -1);

  this->size_ = size;
  this->id_range_low_ = 0;
  this->id_range_high_ = size - 1;
  this->min_purge_count_ = min_purge_count;

  for (size_t i = 0; i < size; ++i)
    {
      this->fragmented_requests_[i] = 0;
    }

  return 0;
}

TAO_ECG_CDR_Message_Receiver::Requests::~Requests (void)
{
  for (size_t i = 0; i < this->size_; ++i)
    {
      TAO_ECG_UDP_Request_Entry* request =
        this->fragmented_requests_[i];

      if (request != &TAO_ECG_CDR_Message_Receiver::Request_Completed_)
        delete request;
    }

  delete [] this->fragmented_requests_;

  this->fragmented_requests_ = 0;
  this->size_ = 0;
  this->id_range_low_ = 0;
  this->id_range_high_ = 0;
}

TAO_ECG_UDP_Request_Entry **
TAO_ECG_CDR_Message_Receiver::Requests::get_request (CORBA::ULong request_id)
{
  if (request_id < this->id_range_low_)
    // <request_id> is below the current range.
    {
      return 0;
    }

  if (request_id > this->id_range_high_)
    // <request_id> is above the current range - need to shift the range
    // to include it.
    {
      CORBA::ULong new_slots_needed = request_id - this->id_range_high_;

      if (new_slots_needed < this->min_purge_count_)
        new_slots_needed = this->min_purge_count_;

      if (new_slots_needed > this->size_)
        // Shifting the range by more than the size of array.
        {
          this->purge_requests (this->id_range_low_, this->id_range_high_);
          this->id_range_high_ = request_id;
          this->id_range_low_ = request_id - this->size_ + 1;
        }
      else
        {
          this->purge_requests (this->id_range_low_,
                                this->id_range_low_ + new_slots_needed - 1);
          this->id_range_high_ += new_slots_needed;
          this->id_range_low_ += new_slots_needed;
        }
    }

  // Return array location for <request_id>.
  size_t index = request_id % this->size_;
  return this->fragmented_requests_ + index;
}


void
TAO_ECG_CDR_Message_Receiver::Requests::purge_requests (
                                            CORBA::ULong purge_first,
                                            CORBA::ULong purge_last)
{
  for (CORBA::ULong i = purge_first; i <= purge_last; ++i)
    {
      size_t index = i % this->size_;
      if (this->fragmented_requests_[index]
          != &TAO_ECG_CDR_Message_Receiver::Request_Completed_)
        {
          delete this->fragmented_requests_[index];
        }
      this->fragmented_requests_[index] = 0;
    }
}
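
// Illustrative sketch, not part of the original file: a simplified model of
// the sliding id window that get_request and purge_requests maintain over a
// fixed-size circular array. The class name SlidingWindow, its accessors,
// and the use of plain int slots (instead of TAO_ECG_UDP_Request_Entry
// pointers) are assumptions for this example; a slot value of 0 stands for
// "empty".

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical model of TAO_ECG_CDR_Message_Receiver::Requests.
class SlidingWindow
{
public:
  SlidingWindow (std::uint32_t size, std::uint32_t min_purge_count)
    : slots_ (static_cast<std::size_t> (size), 0)
    , size_ (size)
    , low_ (0)
    , high_ (size - 1)
    , min_purge_ (min_purge_count)
  {
  }

  // Returns the slot for id, or a null pointer when id is below the window.
  int *get (std::uint32_t id)
  {
    if (id < low_)
      return nullptr;

    if (id > high_)
      {
        // Shift the window up far enough to include id, but never by
        // fewer than min_purge_ slots.
        std::uint32_t shift = id - high_;
        if (shift < min_purge_)
          shift = min_purge_;

        if (shift > size_)
          {
            // Every current slot falls out of the window.
            purge (low_, high_);
            high_ = id;
            low_ = id - size_ + 1;
          }
        else
          {
            // Only the oldest shift slots fall out of the window.
            purge (low_, low_ + shift - 1);
            low_ += shift;
            high_ += shift;
          }
      }

    return &slots_[id % size_];
  }

  std::uint32_t low () const { return low_; }
  std::uint32_t high () const { return high_; }

private:
  // Empties the slots for ids in [first, last].
  void purge (std::uint32_t first, std::uint32_t last)
  {
    for (std::uint32_t i = first; i <= last; ++i)
      slots_[i % size_] = 0;
  }

  std::vector<int> slots_;
  std::uint32_t size_;
  std::uint32_t low_;
  std::uint32_t high_;
  std::uint32_t min_purge_;
};
```

// Because slots are addressed by id % size, shifting the window only
// requires emptying the slots that leave it; surviving entries stay in
// place.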

// ****************************************************************

TAO_ECG_UDP_Request_Entry
TAO_ECG_CDR_Message_Receiver::Request_Completed_ (0, 0, 0, 0);

int
TAO_ECG_CDR_Message_Receiver::handle_input (
                                 ACE_SOCK_Dgram& dgram,
                                 TAO_ECG_CDR_Processor *cdr_processor)
{
  char nonaligned_header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
                        + ACE_CDR::MAX_ALIGNMENT];
  char *header_buf = ACE_ptr_align_binary (nonaligned_header,
                                           ACE_CDR::MAX_ALIGNMENT);

  char nonaligned_data[ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
  char *data_buf = ACE_ptr_align_binary (nonaligned_data,
                                         ACE_CDR::MAX_ALIGNMENT);

  // Read the message from dgram.

  const int iovcnt = 2;
  iovec iov[iovcnt];
  iov[0].iov_base = header_buf;
  iov[0].iov_len  = TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
  iov[1].iov_base = data_buf;
  iov[1].iov_len  = ACE_MAX_DGRAM_SIZE;

  ACE_INET_Addr from;
  ssize_t n = dgram.recv (iov, iovcnt, from);

  if (n == -1)
    {
      if (errno == EWOULDBLOCK)
        return 0;

      ACE_ERROR_RETURN ((LM_ERROR, "Error reading mcast fragment (%m)."),
                        -1);
    }

  if (n == 0)
    {
      ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
                                  "read 0 bytes from socket."),
                        0);
    }

  if (n < TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE)
    {
      ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
                                   "# of bytes read < mcast header size."),
                         -1);
    }

  u_int crc = 0;

  if (this->check_crc_)
    {
      iov[1].iov_len = n - iov[0].iov_len;
      iov[0].iov_len -= 4;  // don't include crc

      crc = ACE::crc32 (iov, 2);
    }
  // Check whether the message is a loopback message.
  if (this->ignore_from_.get () != 0
      && this->ignore_from_->is_loopback (from))
    {
      return 0;
    }

  // Decode and validate mcast header.
  Mcast_Header header;
  if (header.read (header_buf, n, this->check_crc_) == -1)
    return -1;
