📄 uipmc_transport.cpp
字号:
// Pretend it is o.k. See note by bytes_to_send calculation.
bytes_transferred = bytes_to_send;
return 1;
}
// Increment the number of bytes transferred, but don't
// count the MIOP header that we added.
bytes_transferred += rc - MIOP_HEADER_SIZE;
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
"TAO_UIPMC_Transport::send_i: sent %d bytes to %s:%d\n",
rc,
addr.get_host_addr (),
addr.get_port_number ()));
}
// Go to the next fragment.
(*packet_number)++;
++current_fragment;
--num_fragments;
}
// Return total bytes transferred.
return bytes_transferred;
}
ssize_t
TAO_UIPMC_Transport::recv (char *buf,
size_t len,
const ACE_Time_Value * /*max_wait_time*/)
{
ACE_INET_Addr from_addr;
ssize_t n = this->connection_handler_->mcast_dgram ().recv (buf,
len,
from_addr);
if (TAO_debug_level > 5)
{
ACE_DEBUG ((LM_DEBUG,
"TAO_UIPMC_Transport::recv_i: received %d bytes from %s:%d\n",
n,
from_addr.get_host_addr (),
from_addr.get_port_number ()));
}
// Make sure that we at least have a MIOP header.
if (n < MIOP_MIN_HEADER_SIZE)
{
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
"TAO_UIPMC_Transport::recv_i: packet of size %d is too small from %s:%d\n",
n,
from_addr.get_host_addr (),
from_addr.get_port_number ()));
}
return 0;
}
// Check for MIOP magic bytes.
if (buf[MIOP_MAGIC_OFFSET] != miop_magic [0] ||
buf[MIOP_MAGIC_OFFSET + 1] != miop_magic [1] ||
buf[MIOP_MAGIC_OFFSET + 2] != miop_magic [2] ||
buf[MIOP_MAGIC_OFFSET + 3] != miop_magic [3])
{
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
"TAO_UIPMC_Transport::recv_i: UIPMC packet didn't contain magic bytes.\n"));
}
return 0;
}
// Retrieve the byte order.
// 0 = Big endian
// 1 = Small endian
CORBA::Octet byte_order = buf[MIOP_FLAGS_OFFSET] & 0x01;
// Ignore the header version, other flags, packet length and number of packets.
// Get the length of the ID.
CORBA::ULong id_length;
#if !defined (ACE_DISABLE_SWAP_ON_READ)
if (byte_order == ACE_CDR_BYTE_ORDER)
{
id_length = *ACE_reinterpret_cast (ACE_CDR::ULong*, &buf[MIOP_ID_LENGTH_OFFSET]);
}
else
{
ACE_CDR::swap_4 (&buf[MIOP_ID_LENGTH_OFFSET],
ACE_reinterpret_cast (char*, &id_length));
}
#else
id_length = *ACE_reinterpret_cast (ACE_CDR::ULong*, &buf[MIOP_ID_LENGTH_OFFSET]);
#endif /* ACE_DISABLE_SWAP_ON_READ */
// Make sure that the length field is legal.
if (id_length > MIOP_MAX_LENGTH_ID ||
ACE_static_cast (ssize_t, MIOP_ID_CONTENT_OFFSET + id_length) > n)
{
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
"TAO_UIPMC_Transport::recv_i: Invalid ID length.\n"));
}
return 0;
}
// Trim off the header for now.
ssize_t miop_header_size = (MIOP_ID_CONTENT_OFFSET + id_length + 7) & ~0x7;
if (miop_header_size > n)
{
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
"TAO_UIPMC_Transport::recv_i: MIOP packet not large enough for padding.\n"));
}
return 0;
}
n -= miop_header_size;
ACE_OS::memmove (buf, buf + miop_header_size, n);
return n;
}
int
TAO_UIPMC_Transport::handle_input (TAO_Resume_Handle &rh,
ACE_Time_Value *max_wait_time,
int /*block*/)
{
// If there are no messages then we can go ahead to read from the
// handle for further reading..
// The buffer on the stack which will be used to hold the input
// messages
char buf [MIOP_MAX_DGRAM_SIZE];
#if defined (ACE_HAS_PURIFY)
(void) ACE_OS::memset (buf,
'\0',
sizeof buf);
#endif /* ACE_HAS_PURIFY */
// Create a data block
ACE_Data_Block db (sizeof (buf),
ACE_Message_Block::MB_DATA,
buf,
this->orb_core_->input_cdr_buffer_allocator (),
this->orb_core_->locking_strategy (),
ACE_Message_Block::DONT_DELETE,
this->orb_core_->input_cdr_dblock_allocator ());
// Create a message block
ACE_Message_Block message_block (&db,
ACE_Message_Block::DONT_DELETE,
this->orb_core_->input_cdr_msgblock_allocator ());
// Align the message block
ACE_CDR::mb_align (&message_block);
// Read the message into the message block that we have created on
// the stack.
ssize_t n = this->recv (message_block.rd_ptr (),
message_block.space (),
max_wait_time);
// If there is an error return to the reactor..
if (n <= 0)
{
if (TAO_debug_level)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO: (%P|%t|%N|%l) recv returned error on transport %d after fault %p\n"),
this->id (),
ACE_TEXT ("handle_input_i ()\n")));
}
if (n == -1)
this->tms_->connection_closed ();
return n;
}
// Set the write pointer in the stack buffer.
message_block.wr_ptr (n);
// Parse the incoming message for validity. The check needs to be
// performed by the messaging objects.
if (this->parse_incoming_messages (message_block) == -1)
{
if (TAO_debug_level)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO: (%P|%t|%N|%l) parse_incoming_messages failed on transport %d after fault %p\n"),
this->id (),
ACE_TEXT ("handle_input_i ()\n")));
}
return -1;
}
// NOTE: We are not performing any queueing nor any checking for
// missing data. We are assuming that ALL the data would be got in a
// single read.
// Make a node of the message block..
TAO_Queued_Data qd (&message_block);
// Extract the data for the node..
this->messaging_object ()->get_message_data (&qd);
// Process the message
return this->process_parsed_messages (&qd, rh);
}
int
TAO_UIPMC_Transport::register_handler (void)
{
// We never register register the handler with the reactor
// as we never need to be informed about any incoming data,
// assuming we only use one-ways.
// If we would register and ICMP Messages would arrive, e.g
// due to a not reachable server, we would get informed - as this
// disturbs the general MIOP assumptions of not being
// interested in any network failures, we ignore ICMP messages.
return 0;
}
int
TAO_UIPMC_Transport::send_request (TAO_Stub *stub,
TAO_ORB_Core *orb_core,
TAO_OutputCDR &stream,
int message_semantics,
ACE_Time_Value *max_wait_time)
{
if (this->ws_->sending_request (orb_core,
message_semantics) == -1)
return -1;
if (this->send_message (stream,
stub,
message_semantics,
max_wait_time) == -1)
return -1;
return 0;
}
int
TAO_UIPMC_Transport::send_message (TAO_OutputCDR &stream,
TAO_Stub *stub,
int message_semantics,
ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
if (this->messaging_object_->format_message (stream) != 0)
return -1;
// Strictly speaking, should not need to loop here because the
// socket never gets set to a nonblocking mode ... some Linux
// versions seem to need it though. Leaving it costs little.
// This guarantees to send all data (bytes) or return an error.
ssize_t n = this->send_message_shared (stub,
message_semantics,
stream.begin (),
max_wait_time);
if (n == -1)
{
if (TAO_debug_level)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"),
this->id (),
ACE_TEXT ("send_message ()\n")));
return -1;
}
return 1;
}
int
TAO_UIPMC_Transport::messaging_init (CORBA::Octet major,
CORBA::Octet minor)
{
this->messaging_object_->init (major,
minor);
return 1;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -