// File: ecg_cdr_message_receiver.cpp
// (code-viewer residue, not part of the source: "font size" control)
// ECG_CDR_Message_Receiver.cpp,v 1.8 2003/11/24 05:21:59 bala Exp
#include "ECG_CDR_Message_Receiver.h"
#include "ECG_CDR_Message_Sender.h"
#include "tao/Exception.h"
#include "ace/SOCK_Dgram.h"
#include "ace/ACE.h"
#include "ace/OS_NS_string.h"
#if !defined(__ACE_INLINE__)
#include "ECG_CDR_Message_Receiver.i"
#endif /* __ACE_INLINE__ */
ACE_RCSID (Event,
ECG_CDR_Message_Receiver,
"ECG_CDR_Message_Receiver.cpp,v 1.8 2003/11/24 05:21:59 bala Exp")
// Out-of-line destructor for TAO_ECG_CDR_Processor.
TAO_ECG_CDR_Processor::~TAO_ECG_CDR_Processor (void)
{
  // Intentionally empty: this body releases nothing.
}
// ****************************************************************
// Destructor: frees the received-fragments bitmap, but only when this
// entry allocated it itself.
TAO_ECG_UDP_Request_Entry::~TAO_ECG_UDP_Request_Entry (void)
{
  // When the ownership flag is clear the bitmap pointer refers to
  // default_received_fragments_ (set up in the constructor) and must not
  // be passed to delete[].
  if (!this->own_received_fragments_)
    return;

  // Drop ownership first, then release the heap array.
  this->own_received_fragments_ = 0;
  delete[] this->received_fragments_;
}
// Builds the bookkeeping for one fragmented request.
//
//   byte_order     - byte order flag recorded for later fragment validation
//   request_id     - id of the request this entry tracks
//   request_size   - total size of the request; the payload buffer is grown
//                    to this size
//   fragment_count - number of fragments the request is split into
//
// The received-fragments bitmap holds one bit per fragment.  Bits at
// positions >= fragment_count in the last word are preset to 1, so that
// complete() -- which requires every word to be 0xFFFFFFFF -- is not
// blocked by bits that no fragment will ever set.
TAO_ECG_UDP_Request_Entry::
TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
                           CORBA::ULong request_id,
                           CORBA::ULong request_size,
                           CORBA::ULong fragment_count)
  : byte_order_ (byte_order)
  , request_id_ (request_id)
  , request_size_ (request_size)
  , fragment_count_ (fragment_count)
{
  // Grow the payload buffer to the full request size, then call wr_ptr()
  // with that size.
  // NOTE(review): presumably this marks the whole buffer as written so
  // fragments can be copied in at their offsets -- confirm against ACE.
  ACE_CDR::grow (&this->payload_, this->request_size_);
  this->payload_.wr_ptr (request_size_);

  // Begin with the default bitmap storage; it is replaced below only if
  // it is too small.  Both fields are set before ACE_NEW is reached.
  this->own_received_fragments_ = 0;
  this->received_fragments_ = this->default_received_fragments_;

  const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;

  // Word and bit position of the first bit that does not correspond to a
  // real fragment.
  const CORBA::ULong last_word = this->fragment_count_ / bits_per_ulong;
  const CORBA::ULong first_unused_bit =
    this->fragment_count_ % bits_per_ulong;

  this->received_fragments_size_ = last_word + 1;

  if (this->received_fragments_size_ > ECG_DEFAULT_FRAGMENT_BUFSIZ)
    {
      ACE_NEW (this->received_fragments_,
               CORBA::ULong[this->received_fragments_size_]);
      this->own_received_fragments_ = 1;
    }

  // Clear every word before the last one: nothing received yet.
  for (CORBA::ULong word = 0; word < last_word; ++word)
    this->received_fragments_[word] = 0;

  // Last word: real fragments start at 0, padding bits start at 1.
  this->received_fragments_[last_word] = (0xFFFFFFFF << first_unused_bit);
}
// Checks that a fragment header is consistent with this request entry.
//
//   byte_order      - must equal the byte order recorded for the request
//   request_size    - must equal the request size recorded for the request
//   fragment_size   - number of payload bytes carried by the fragment
//   fragment_offset - position of the fragment inside the request payload
//   (fragment_id)   - unused
//   fragment_count  - must equal the fragment count recorded for the request
//
// Returns 1 when the fragment matches the entry and lies completely inside
// [0, request_size); returns 0 otherwise.
int
TAO_ECG_UDP_Request_Entry::validate_fragment (CORBA::Boolean byte_order,
                                              CORBA::ULong request_size,
                                              CORBA::ULong fragment_size,
                                              CORBA::ULong fragment_offset,
                                              CORBA::ULong /* fragment_id */,
                                              CORBA::ULong fragment_count) const
{
  if (byte_order != this->byte_order_
      || request_size != this->request_size_
      || fragment_count != this->fragment_count_)
    return 0;

  // The fragment must start inside the request.
  if (fragment_offset >= request_size)
    return 0;

  // The fragment must also end inside the request.  The check is written
  // as a subtraction (safe because fragment_offset < request_size here)
  // rather than as "fragment_offset + fragment_size > request_size": the
  // sum of two unsigned values can wrap around and let an oversized
  // fragment slip through.
  if (fragment_size > request_size - fragment_offset)
    return 0;

  return 1;
}
// Reports whether the fragment with the given id is marked as received.
//
// Returns non-zero if the fragment's bit is set in the received-fragments
// bitmap, or if <fragment_id> is not a valid fragment id for this request;
// returns 0 otherwise.
int
TAO_ECG_UDP_Request_Entry::test_received (CORBA::ULong fragment_id) const
{
  // Treat out-of-range fragments as already received, so they are dropped.
  // Valid ids are 0 .. fragment_count_ - 1.  The id equal to
  // fragment_count_ is rejected here explicitly instead of relying on the
  // padding bits preset by the constructor.
  if (fragment_id >= this->fragment_count_)
    return 1;

  const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
  const CORBA::ULong idx = fragment_id / bits_per_ulong;
  const CORBA::ULong bit = fragment_id % bits_per_ulong;

  // Build the mask in the unsigned word type: shifting a signed 1 into the
  // top bit is not portable.
  const CORBA::ULong mask = static_cast<CORBA::ULong> (1) << bit;

  return ACE_BIT_ENABLED (this->received_fragments_[idx], mask);
}
// Records the fragment with the given id as received by setting its bit in
// the received-fragments bitmap.  Ids that are not valid for this request
// are ignored.
void
TAO_ECG_UDP_Request_Entry::mark_received (CORBA::ULong fragment_id)
{
  // Nothing to record for out-of-range fragments.  Valid ids are
  // 0 .. fragment_count_ - 1; the bit for fragment_count_ itself is a
  // padding bit that the constructor has already set.
  if (fragment_id >= this->fragment_count_)
    return;

  const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT;
  const CORBA::ULong idx = fragment_id / bits_per_ulong;
  const CORBA::ULong bit = fragment_id % bits_per_ulong;

  // Build the mask in the unsigned word type: shifting a signed 1 into the
  // top bit is not portable.
  const CORBA::ULong mask = static_cast<CORBA::ULong> (1) << bit;

  ACE_SET_BITS (this->received_fragments_[idx], mask);
}
int
TAO_ECG_UDP_Request_Entry::complete (void) const
{
for (CORBA::ULong i = 0;
i < this->received_fragments_size_;
++i)
{
if (this->received_fragments_[i] != 0xFFFFFFFF)
return 0;
}
return 1;
}
// Returns the address inside the payload buffer that lies
// <fragment_offset> bytes past the payload's read pointer.  No bounds
// checking is done here.
char*
TAO_ECG_UDP_Request_Entry::fragment_buffer (CORBA::ULong fragment_offset)
{
  char *const payload_start = this->payload_.rd_ptr ();
  return payload_start + fragment_offset;
}
// ****************************************************************
// Allocates and clears the table of request-entry pointers.
//
//   size            - number of slots in the table; must be non-zero
//   min_purge_count - stored as the minimum number of slots by which
//                     get_request() advances the id range
//
// The tracked id range is initialised to [0, size - 1].
//
// Returns 0 on success.  Returns -1 if the table has already been
// initialised, if <size> is zero, or (via ACE_NEW_RETURN) if the
// allocation fails.
int
TAO_ECG_CDR_Message_Receiver::Requests::init (size_t size,
                                              size_t min_purge_count)
{
  // Already initialized.
  if (this->fragmented_requests_)
    return -1;

  // An empty table cannot be used: get_request() and purge_requests()
  // compute "id % size_", and "size - 1" below would wrap around.
  if (size == 0)
    return -1;

  ACE_NEW_RETURN (this->fragmented_requests_,
                  TAO_ECG_UDP_Request_Entry*[size],
                  -1);

  this->size_ = size;
  this->id_range_low_ = 0;
  this->id_range_high_ = size - 1;
  this->min_purge_count_ = min_purge_count;

  // Every slot starts out empty.
  for (size_t i = 0; i < size; ++i)
    {
      this->fragmented_requests_[i] = 0;
    }

  return 0;
}
// Destructor: deletes every request entry still held in the table, frees
// the table itself and resets the bookkeeping fields.
TAO_ECG_CDR_Message_Receiver::Requests::~Requests (void)
{
  // Slots may point at the shared static Request_Completed_ object, which
  // is not owned by the table and must never be deleted.
  TAO_ECG_UDP_Request_Entry *const completed_marker =
    &TAO_ECG_CDR_Message_Receiver::Request_Completed_;

  for (size_t slot = 0; slot < this->size_; ++slot)
    {
      TAO_ECG_UDP_Request_Entry *entry = this->fragmented_requests_[slot];

      if (entry == completed_marker)
        continue;

      delete entry;
    }

  delete [] this->fragmented_requests_;

  this->id_range_high_ = 0;
  this->id_range_low_ = 0;
  this->size_ = 0;
  this->fragmented_requests_ = 0;
}
// Returns the address of the table slot that belongs to <request_id>.
//
// * If <request_id> is below the currently tracked id range, returns 0.
// * If <request_id> is above the range, the range is moved forward so that
//   it includes <request_id>; slots that fall out of the range are purged
//   first.  The range moves by at least min_purge_count_ ids.
// * Otherwise the range is left untouched.
//
// The slot is selected as <request_id> modulo the table size.
TAO_ECG_UDP_Request_Entry **
TAO_ECG_CDR_Message_Receiver::Requests::get_request (CORBA::ULong request_id)
{
  if (request_id < this->id_range_low_)
    // <request_id> is below the current range.
    {
      return 0;
    }

  if (request_id > this->id_range_high_)
    // <request_id> is above the current range - need to shift the range
    // to include it.
    {
      CORBA::ULong new_slots_needed = request_id - this->id_range_high_;
      if (new_slots_needed < this->min_purge_count_)
        new_slots_needed = this->min_purge_count_;

      if (new_slots_needed > this->size_)
        // Shifting the range by more than the size of array: nothing in
        // the table survives, and the new range ends at <request_id>.
        {
          this->purge_requests (this->id_range_low_, this->id_range_high_);
          this->id_range_high_ = request_id;
          this->id_range_low_ = request_id - this->size_ + 1;
        }
      else
        // Slide the range forward; only the oldest <new_slots_needed> ids
        // are purged.
        {
          this->purge_requests (this->id_range_low_,
                                this->id_range_low_ + new_slots_needed - 1);
          this->id_range_high_ += new_slots_needed;
          this->id_range_low_ += new_slots_needed;
        }
    }

  // Return array location for <request_id>.  The index is kept as size_t,
  // the same type purge_requests() uses, so that the result of the modulo
  // is not narrowed to int.
  const size_t index = request_id % this->size_;
  return this->fragmented_requests_ + index;
}
// Empties the table slots for all request ids in the inclusive range
// [purge_first, purge_last].  Entries found in those slots are deleted,
// except for the shared static Request_Completed_ object; every visited
// slot is reset to 0.
void
TAO_ECG_CDR_Message_Receiver::Requests::purge_requests (
    CORBA::ULong purge_first,
    CORBA::ULong purge_last)
{
  TAO_ECG_UDP_Request_Entry *const completed_marker =
    &TAO_ECG_CDR_Message_Receiver::Request_Completed_;

  for (CORBA::ULong id = purge_first; id <= purge_last; ++id)
    {
      // Ids map onto slots modulo the table size.
      TAO_ECG_UDP_Request_Entry *&slot =
        this->fragmented_requests_[id % this->size_];

      // The marker object is not owned by the table; anything else is.
      if (slot != completed_marker)
        {
          delete slot;
        }

      slot = 0;
    }
}
// ****************************************************************
// Definition of the static Request_Completed_ entry, constructed with
// byte order, request id, request size and fragment count all zero.
// Requests::~Requests and Requests::purge_requests compare slot pointers
// against the address of this object and never delete it.
// NOTE(review): presumably a slot is pointed at this object once its
// request has been fully processed -- confirm against the code that
// stores it, which is outside this view.
TAO_ECG_UDP_Request_Entry
TAO_ECG_CDR_Message_Receiver::Request_Completed_ (0, 0, 0, 0);
int
TAO_ECG_CDR_Message_Receiver::handle_input (
ACE_SOCK_Dgram& dgram,
TAO_ECG_CDR_Processor *cdr_processor)
{
char nonaligned_header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
+ ACE_CDR::MAX_ALIGNMENT];
char *header_buf = ACE_ptr_align_binary (nonaligned_header,
ACE_CDR::MAX_ALIGNMENT);
char nonaligned_data[ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
char *data_buf = ACE_ptr_align_binary (nonaligned_data,
ACE_CDR::MAX_ALIGNMENT);
// Read the message from dgram.
const int iovcnt = 2;
iovec iov[iovcnt];
iov[0].iov_base = header_buf;
iov[0].iov_len = TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
iov[1].iov_base = data_buf;
iov[1].iov_len = ACE_MAX_DGRAM_SIZE;
ACE_INET_Addr from;
ssize_t n = dgram.recv (iov, iovcnt, from);
if (n == -1)
{
if (errno == EWOULDBLOCK)
return 0;
ACE_ERROR_RETURN ((LM_ERROR, "Error reading mcast fragment (%m)."),
-1);
}
if (n == 0)
{
ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
"read 0 bytes from socket."),
0);
}
if (n < TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE)
{
ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: "
"# of bytes read < mcast header size."),
-1);
}
u_int crc = 0;
if (this->check_crc_)
{
iov[1].iov_len = n - iov[0].iov_len;
iov[0].iov_len -= 4; // don't include crc
crc = ACE::crc32 (iov, 2);
}
// Check whether the message is a loopback message.
if (this->ignore_from_.get () != 0
&& this->ignore_from_->is_loopback (from))
{
return 0;
}
// Decode and validate mcast header.
Mcast_Header header;
if (header.read (header_buf, n, this->check_crc_) == -1)
return -1;
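// [Code-viewer residue, not part of the source] Keyboard shortcuts:
//   Copy code:          Ctrl + C
//   Search code:        Ctrl + F
//   Full-screen mode:   F11
//   Toggle theme:       Ctrl + Shift + D
//   Show shortcuts:     ?
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -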