📄 ecg_cdr_message_sender.cpp
字号:
// ECG_CDR_Message_Sender.cpp,v 1.3 2003/06/24 03:32:17 bala Exp
#include "ECG_CDR_Message_Sender.h"
#include "tao/CDR.h"
#include "ace/SOCK_Dgram.h"
#include "ace/INET_Addr.h"
#include "ace/ACE.h"
#if !defined(__ACE_INLINE__)
#include "ECG_CDR_Message_Sender.i"
#endif /* __ACE_INLINE__ */
ACE_RCSID(Event, ECG_CDR_Message_Sender, "ECG_CDR_Message_Sender.cpp,v 1.3 2003/06/24 03:32:17 bala Exp")
void
TAO_ECG_CDR_Message_Sender::init (
TAO_ECG_Refcounted_Endpoint endpoint_rptr
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
if (endpoint_rptr.get () == 0
|| endpoint_rptr->dgram ().get_handle () == ACE_INVALID_HANDLE)
{
ACE_ERROR ((LM_ERROR, "TAO_ECG_CDR_Message_Sender::init(): "
"nil or unitialized endpoint argument."));
ACE_THROW (CORBA::INTERNAL ());
}
this->endpoint_rptr_ = endpoint_rptr;
}
void
TAO_ECG_CDR_Message_Sender::send_message (const TAO_OutputCDR &cdr,
const ACE_INET_Addr &addr
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
if (this->endpoint_rptr_.get () == 0)
{
ACE_ERROR ((LM_ERROR, "Attempt to invoke send_message() "
"on non-initialized sender object."));
ACE_THROW (CORBA::INTERNAL ());
}
CORBA::ULong max_fragment_payload = this->mtu () -
TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
// ACE_ASSERT (max_fragment_payload != 0);
#if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
const int TAO_WRITEV_MAX = ACE_IOV_MAX - 1;
#else
const int TAO_WRITEV_MAX = ACE_IOV_MAX;
#endif /* ACE_HAS_BROKEN_DGRAM_SENDV */
iovec iov[TAO_WRITEV_MAX];
CORBA::ULong total_length;
CORBA::ULong fragment_count =
this->compute_fragment_count (cdr.begin (),
cdr.end (),
TAO_WRITEV_MAX,
max_fragment_payload,
total_length);
CORBA::ULong request_id = this->endpoint_rptr_->next_request_id ();
// Reserve the first iovec for the header...
int iovcnt = 1;
CORBA::ULong fragment_id = 0;
CORBA::ULong fragment_offset = 0;
CORBA::ULong fragment_size = 0;
for (const ACE_Message_Block* b = cdr.begin ();
b != cdr.end ();
b = b->cont ())
{
CORBA::ULong l = b->length ();
char* rd_ptr = b->rd_ptr ();
iov[iovcnt].iov_base = rd_ptr;
iov[iovcnt].iov_len = l;
fragment_size += l;
iovcnt++;
while (fragment_size > max_fragment_payload)
{
// This fragment is full, we have to send it...
// First adjust the last iov entry:
CORBA::ULong last_mb_length =
max_fragment_payload - (fragment_size - l);
iov[iovcnt - 1].iov_len = last_mb_length;
this->send_fragment (addr,
request_id,
total_length,
max_fragment_payload,
fragment_offset,
fragment_id,
fragment_count,
iov,
iovcnt
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
fragment_id++;
fragment_offset += max_fragment_payload;
// Reset, but don't forget that the last Message_Block
// may need to be sent in multiple fragments..
l -= last_mb_length;
rd_ptr += last_mb_length;
iov[1].iov_base = rd_ptr;
iov[1].iov_len = l;
fragment_size = l;
iovcnt = 2;
}
if (fragment_size == max_fragment_payload)
{
// We filled a fragment, but this time it was filled
// exactly, the treatment is a little different from the
// loop above...
this->send_fragment (addr,
request_id,
total_length,
max_fragment_payload,
fragment_offset,
fragment_id,
fragment_count,
iov,
iovcnt
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
fragment_id++;
fragment_offset += max_fragment_payload;
iovcnt = 1;
fragment_size = 0;
}
if (iovcnt == TAO_WRITEV_MAX)
{
// Now we ran out of space in the iovec, we must send a
// fragment to work around that....
this->send_fragment (addr,
request_id,
total_length,
fragment_size,
fragment_offset,
fragment_id,
fragment_count,
iov,
iovcnt
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
fragment_id++;
fragment_offset += fragment_size;
iovcnt = 1;
fragment_size = 0;
}
}
// There is something left in the iovvec that we must send
// also...
if (iovcnt != 1)
{
// Now we ran out of space in the iovec, we must send a
// fragment to work around that....
this->send_fragment (addr,
request_id,
total_length,
fragment_size,
fragment_offset,
fragment_id,
fragment_count,
iov,
iovcnt
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
fragment_id++;
fragment_offset += fragment_size;
// reset, not needed here...
// iovcnt = 1;
// fragment_size = 0;
}
// ACE_ASSERT (total_length == fragment_offset);
// ACE_ASSERT (fragment_id == fragment_count);
}
void
TAO_ECG_CDR_Message_Sender::send_fragment (const ACE_INET_Addr &addr,
CORBA::ULong request_id,
CORBA::ULong request_size,
CORBA::ULong fragment_size,
CORBA::ULong fragment_offset,
CORBA::ULong fragment_id,
CORBA::ULong fragment_count,
iovec iov[],
int iovcnt
ACE_ENV_ARG_DECL)
{
CORBA::ULong header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
/ sizeof(CORBA::ULong)
+ ACE_CDR::MAX_ALIGNMENT];
char* buf = ACE_reinterpret_cast(char*,header);
TAO_OutputCDR cdr (buf, sizeof(header));
cdr.write_boolean (TAO_ENCAP_BYTE_ORDER);
// Insert some known values in the padding bytes, so we can smoke
// test the message on the receiving end.
cdr.write_octet ('A'); cdr.write_octet ('B'); cdr.write_octet ('C');
cdr.write_ulong (request_id);
cdr.write_ulong (request_size);
cdr.write_ulong (fragment_size);
cdr.write_ulong (fragment_offset);
cdr.write_ulong (fragment_id);
cdr.write_ulong (fragment_count);
CORBA::Octet padding[4];
// MRH
if (checksum_)
{
// Compute CRC
iov[0].iov_base = cdr.begin ()->rd_ptr ();
iov[0].iov_len = cdr.begin ()->length ();
unsigned int crc = 0;
unsigned char *crc_parts = (unsigned char *)(&crc);
if (iovcnt > 1)
{
crc = ACE::crc32 (iov, iovcnt);
crc = htonl (crc);
}
for (int cnt=0; cnt<4; ++cnt)
{
padding[cnt] = crc_parts[cnt];
}
}
else
{
for (int cnt=0; cnt<4; ++cnt)
{
padding[cnt] = 0;
}
}
//End MRH
cdr.write_octet_array (padding, 4);
iov[0].iov_base = cdr.begin ()->rd_ptr ();
iov[0].iov_len = cdr.begin ()->length ();
ssize_t n = this->dgram ().send (iov,
iovcnt,
addr);
size_t expected_n = 0;
for (int i = 0; i < iovcnt; ++i)
expected_n += iov[i].iov_len;
if (n > 0 && size_t(n) != expected_n)
{
ACE_DEBUG ((LM_ERROR, ("Sent only %d out of %d bytes "
"for mcast fragment."),
n,
expected_n));
}
if (n == -1)
{
if (errno == EWOULDBLOCK)
{
ACE_DEBUG ((LM_ERROR, "Send of mcast fragment failed (%m)."));
// @@ TODO Use a Event Channel specific exception
ACE_THROW (CORBA::COMM_FAILURE ());
}
else
{
ACE_DEBUG ((LM_WARNING, "Send of mcast fragment blocked (%m)."));
}
}
else if (n == 0)
{
ACE_DEBUG ((LM_WARNING, "EOF on send of mcast fragment (%m)."));
}
}
CORBA::ULong
TAO_ECG_CDR_Message_Sender::compute_fragment_count (const ACE_Message_Block* begin,
const ACE_Message_Block* end,
int iov_size,
CORBA::ULong max_fragment_payload,
CORBA::ULong& total_length)
{
CORBA::ULong fragment_count = 0;
total_length = 0;
CORBA::ULong fragment_size = 0;
// Reserve the first iovec for the header...
int iovcnt = 1;
for (const ACE_Message_Block* b = begin;
b != end;
b = b->cont ())
{
CORBA::ULong l = b->length ();
total_length += l;
fragment_size += l;
iovcnt++;
while (fragment_size > max_fragment_payload)
{
// Ran out of space, must create a fragment...
fragment_count++;
// The next iovector will contain what remains of this
// buffer, but also consider
iovcnt = 2;
l -= max_fragment_payload - (fragment_size - l);
fragment_size = l;
}
if (fragment_size == max_fragment_payload)
{
fragment_count++;
iovcnt = 1;
fragment_size = 0;
}
if (iovcnt >= iov_size)
{
// Ran out of space in the iovector....
fragment_count++;
iovcnt = 1;
fragment_size = 0;
}
}
if (iovcnt != 1)
{
// Send the remaining data in another fragment
fragment_count++;
}
return fragment_count;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -