📄 uipmc_transport.cpp
字号:
// This may look like C, but it's really -*- C++ -*-
// UIPMC_Transport.cpp,v 1.23 2004/01/09 11:59:08 jwillemsen Exp
#include "UIPMC_Transport.h"
#include "UIPMC_Connection_Handler.h"
#include "UIPMC_Message_Block_Data_Iterator.h"
#include "UIPMC_Acceptor.h"
#include "UIPMC_Profile.h"
#include "UIPMC_Wait_Never.h"
#include "tao/Acceptor_Registry.h"
#include "tao/operation_details.h"
#include "tao/Timeprobe.h"
#include "tao/CDR.h"
#include "tao/Transport_Mux_Strategy.h"
#include "tao/Wait_Strategy.h"
#include "tao/Sync_Strategies.h"
#include "tao/Stub.h"
#include "tao/ORB_Core.h"
#include "tao/debug.h"
#include "tao/Resume_Handle.h"
#include "tao/GIOP_Message_Base.h"
#include "tao/GIOP_Message_Lite.h"
#if !defined (__ACE_INLINE__)
# include "UIPMC_Transport.i"
#endif /* ! __ACE_INLINE__ */
ACE_RCSID (PortableGroup,
UIPMC_Transport,
"UIPMC_Transport.cpp,v 1.23 2004/01/09 11:59:08 jwillemsen Exp")
// Local MIOP Definitions:
// Note: We currently support packet fragmentation on transmit, but
// do not support reassembly.
// Limit the number of fragments that we can divide a message
// into.
#define MIOP_MAX_FRAGMENTS (1)
#define MIOP_MAX_HEADER_SIZE (272) // See MIOP Spec. Must be a multiple of 8.
#define MIOP_MAX_DGRAM_SIZE (ACE_MAX_DGRAM_SIZE)
#define MIOP_MAGIC_OFFSET (0)
#define MIOP_VERSION_OFFSET (4)
#define MIOP_FLAGS_OFFSET (5)
#define MIOP_PACKET_LENGTH_OFFSET (6)
#define MIOP_PACKET_NUMBER_OFFSET (8)
#define MIOP_NUMBER_OF_PACKETS_OFFSET (12)
#define MIOP_ID_LENGTH_OFFSET (16)
#define MIOP_MIN_LENGTH_ID (0)
#define MIOP_MAX_LENGTH_ID (252)
#define MIOP_ID_DEFAULT_LENGTH (12)
#define MIOP_ID_CONTENT_OFFSET (20)
#define MIOP_HEADER_PADDING (0) // The ID field needs to be padded to
// a multiple of 8 bytes.
#define MIOP_HEADER_SIZE (MIOP_ID_CONTENT_OFFSET \
+ MIOP_ID_DEFAULT_LENGTH \
+ MIOP_HEADER_PADDING)
#define MIOP_MIN_HEADER_SIZE (MIOP_ID_CONTENT_OFFSET \
+ MIOP_MIN_LENGTH_ID \
+ (8 - MIOP_MIN_LENGTH_ID) /* padding */)
static const CORBA::Octet miop_magic[4] = { 0x4d, 0x49, 0x4f, 0x50 }; // 'M', 'I', 'O', 'P'
struct MIOP_Packet
{
iovec iov[ACE_IOV_MAX];
int iovcnt;
int length;
};
TAO_UIPMC_Transport::TAO_UIPMC_Transport (TAO_UIPMC_Connection_Handler *handler,
TAO_ORB_Core *orb_core,
CORBA::Boolean /*flag*/)
: TAO_Transport (TAO_TAG_UIPMC_PROFILE,
orb_core)
, connection_handler_ (handler)
, messaging_object_ (0)
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
TAO_GIOP_Message_Base (orb_core,
MIOP_MAX_DGRAM_SIZE));
// Replace the default wait strategy with our own
// since we don't support waiting on anything.
delete this->ws_;
ACE_NEW (this->ws_,
TAO_UIPMC_Wait_Never (this));
}
TAO_UIPMC_Transport::~TAO_UIPMC_Transport (void)
{
delete this->messaging_object_;
}
ACE_Event_Handler *
TAO_UIPMC_Transport::event_handler_i (void)
{
return this->connection_handler_;
}
TAO_Connection_Handler *
TAO_UIPMC_Transport::connection_handler_i (void)
{
return this->connection_handler_;
}
TAO_Pluggable_Messaging *
TAO_UIPMC_Transport::messaging_object (void)
{
return this->messaging_object_;
}
void
TAO_UIPMC_Transport::write_unique_id (TAO_OutputCDR &miop_hdr, unsigned long unique)
{
// We currently construct a unique ID for each MIOP message by
// concatenating the address of the buffer to a counter. We may
// also need to use a MAC address or something more unique to
// fully comply with the MIOP specification.
static unsigned long counter = 1; // Don't worry about race conditions on counter,
// since buffer addresses can't be the same if
// this is being called simultaneously.
CORBA::Octet unique_id[MIOP_ID_DEFAULT_LENGTH];
unique_id[0] = ACE_static_cast (CORBA::Octet, unique & 0xff);
unique_id[1] = ACE_static_cast (CORBA::Octet, (unique & 0xff00) >> 8);
unique_id[2] = ACE_static_cast (CORBA::Octet, (unique & 0xff0000) >> 16);
unique_id[3] = ACE_static_cast (CORBA::Octet, (unique & 0xff000000) >> 24);
unique_id[4] = ACE_static_cast (CORBA::Octet, counter & 0xff);
unique_id[5] = ACE_static_cast (CORBA::Octet, (counter & 0xff00) >> 8);
unique_id[6] = ACE_static_cast (CORBA::Octet, (counter & 0xff0000) >> 16);
unique_id[7] = ACE_static_cast (CORBA::Octet, (counter & 0xff000000) >> 24);
unique_id[8] = 0;
unique_id[9] = 0;
unique_id[10] = 0;
unique_id[11] = 0;
miop_hdr.write_ulong (MIOP_ID_DEFAULT_LENGTH);
miop_hdr.write_octet_array (unique_id, MIOP_ID_DEFAULT_LENGTH);
}
ssize_t
TAO_UIPMC_Transport::send (iovec *iov, int iovcnt,
size_t &bytes_transferred,
const ACE_Time_Value *)
{
const ACE_INET_Addr &addr = this->connection_handler_->addr ();
bytes_transferred = 0;
// Calculate the bytes to send. This value is only used for
// error conditions to fake a good return. We do this for
// semantic consistency with DIOP, and since errors aren't
// handled correctly from send_i (our fault). If these
// semantics are not desirable, the error handling problems
// that need to be fixed can be found in
// UIPMC_Connection_Handler::decr_refcount which will need to
// deregister the connection handler from the UIPMC_Connector
// cache.
ssize_t bytes_to_send = 0;
for (int i = 0; i < iovcnt; i++)
bytes_to_send += iov[i].iov_len;
MIOP_Packet fragments[MIOP_MAX_FRAGMENTS];
MIOP_Packet *current_fragment;
int num_fragments = 1;
UIPMC_Message_Block_Data_Iterator mb_iter (iov, iovcnt);
// Initialize the first fragment
current_fragment = &fragments[0];
current_fragment->iovcnt = 1; // The MIOP Header
current_fragment->length = MIOP_HEADER_SIZE;
// Go through all of the message blocks.
while (mb_iter.next_block (MIOP_MAX_DGRAM_SIZE - current_fragment->length,
current_fragment->iov[current_fragment->iovcnt]))
{
// Increment the length and iovcnt.
current_fragment->length += current_fragment->iov[current_fragment->iovcnt].iov_len;
current_fragment->iovcnt++;
// Check if we've filled up this fragment or if we've run out of
// iov entries.
if (current_fragment->length == MIOP_MAX_DGRAM_SIZE ||
current_fragment->iovcnt == ACE_IOV_MAX)
{
// Make a new fragment.
num_fragments++;
// Check if too many fragments
if (num_fragments > MIOP_MAX_FRAGMENTS)
{
// This is an error as we do not send more.
// Silently drop the message but log an error.
// Pluggable_Messaging::transport_message only
// cares if it gets -1 or 0 so we can return a
// partial length and it will think all has gone
// well.
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("\n\nTAO (%P|%t) ")
ACE_TEXT ("UIPMC_Transport::send_i ")
ACE_TEXT ("Message of size %d needs too many MIOP fragments (max is %d).\n")
ACE_TEXT ("You may be able to increase ACE_MAX_DGRAM_SIZE.\n"),
bytes_to_send,
MIOP_MAX_FRAGMENTS));
}
// Pretend it is o.k. See note by bytes_to_send calculation.
bytes_transferred = bytes_to_send;
return 1;
}
// Otherwise, initialize another fragment.
current_fragment++;
current_fragment->iovcnt = 1; // The MIOP Header
current_fragment->length = MIOP_HEADER_SIZE;
}
}
// Build a generic MIOP Header.
// Allocate space on the stack for the header (add 8 to account for
// the possibility of adjusting for alignment).
char header_buffer[MIOP_HEADER_SIZE + 8];
TAO_OutputCDR miop_hdr (header_buffer, MIOP_HEADER_SIZE + 8);
miop_hdr.write_octet_array (miop_magic, 4); // Magic
miop_hdr.write_octet (0x10); // Version
CORBA::Octet *flags_field = ACE_reinterpret_cast (CORBA::Octet *,
miop_hdr.current ()->wr_ptr ());
// Write flags octet:
// Bit Description
// 0 Endian
// 1 Stop message flag (Assigned later)
// 2 - 7 Set to 0
miop_hdr.write_octet (TAO_ENCAP_BYTE_ORDER); // Flags
// Packet Length
// NOTE: We can save pointers and write them later without byte swapping since
// in CORBA, the sender chooses the endian.
CORBA::UShort *packet_length = ACE_reinterpret_cast (CORBA::UShort *,
miop_hdr.current ()->wr_ptr ());
miop_hdr.write_short (0);
// Packet number
CORBA::ULong *packet_number = ACE_reinterpret_cast (CORBA::ULong *,
miop_hdr.current ()->wr_ptr ());
miop_hdr.write_ulong (0);
// Number of packets field
miop_hdr.write_ulong (num_fragments);
// UniqueId
ptrdiff_t unique_id = ACE_reinterpret_cast (ptrdiff_t, iov);
this->write_unique_id (miop_hdr,
ACE_static_cast (unsigned long, unique_id));
// Send the buffers.
current_fragment = &fragments[0];
while (num_fragments > 0 &&
current_fragment->iovcnt > 1)
{
// Fill in the packet length header field.
*packet_length = current_fragment->length;
// If this is the last fragment, set the stop message flag.
if (num_fragments == 1)
{
*flags_field |= 0x02;
}
// Setup the MIOP header in the iov list.
current_fragment->iov[0].iov_base = miop_hdr.current ()->rd_ptr ();
current_fragment->iov[0].iov_len = MIOP_HEADER_SIZE;
// Send the fragment. - Need to check for errors!!
ssize_t rc = this->connection_handler_->dgram ().send (current_fragment->iov,
current_fragment->iovcnt,
addr);
if (rc <= 0)
{
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("\n\nTAO (%P|%t) ")
ACE_TEXT ("UIPMC_Transport::send")
ACE_TEXT (" %p\n\n"),
ACE_TEXT ("Error returned from transport:")));
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -