shmiop_transport.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 262 行
CPP
262 行
// This may look like C, but it's really -*- C++ -*-
// SHMIOP_Transport.cpp,v 1.35 2003/12/14 16:03:49 bala Exp
#include "SHMIOP_Transport.h"
#if defined (TAO_HAS_SHMIOP) && (TAO_HAS_SHMIOP != 0)
#include "SHMIOP_Connection_Handler.h"
#include "SHMIOP_Profile.h"
#include "tao/Timeprobe.h"
#include "tao/CDR.h"
#include "tao/Transport_Mux_Strategy.h"
#include "tao/Wait_Strategy.h"
#include "tao/Sync_Strategies.h"
#include "tao/Stub.h"
#include "tao/ORB_Core.h"
#include "tao/debug.h"
#include "tao/Resume_Handle.h"
#include "tao/GIOP_Message_Base.h"
#include "tao/GIOP_Message_Lite.h"
#if !defined (__ACE_INLINE__)
# include "SHMIOP_Transport.i"
#endif /* ! __ACE_INLINE__ */
ACE_RCSID (Strategies, SHMIOP_Transport, "SHMIOP_Transport.cpp,v 1.35 2003/12/14 16:03:49 bala Exp")
// Constructs a SHMIOP transport.
//
// The TAO_Transport base is initialised with TAO_TAG_SHMEM_PROFILE and
// <orb_core>; <handler> is stored as the connection handler.
//
// <flag> selects the messaging object that is allocated:
//   - non-zero : TAO_GIOP_Message_Lite
//   - zero     : TAO_GIOP_Message_Base
//
// NOTE(review): ACE_NEW presumably leaves <messaging_object_> null and
// returns from the constructor when the allocation fails -- confirm
// against the macro definition.
TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handler,
                                            TAO_ORB_Core *orb_core,
                                            CORBA::Boolean flag)
  : TAO_Transport (TAO_TAG_SHMEM_PROFILE,
                   orb_core),
    connection_handler_ (handler),
    messaging_object_ (0)
{
  if (flag)
    {
      // The caller asked for the lite flavour of the protocol.
      ACE_NEW (this->messaging_object_,
               TAO_GIOP_Message_Lite (orb_core));
      return;
    }

  // Otherwise fall back to the regular GIOP messaging object.
  ACE_NEW (this->messaging_object_,
           TAO_GIOP_Message_Base (orb_core));
}
// Destroys the transport and releases the messaging object that the
// constructor allocated.  Deleting a null pointer is a no-op, so this
// is safe even if no messaging object was ever created.
TAO_SHMIOP_Transport::~TAO_SHMIOP_Transport (void)
{
  delete messaging_object_;
}
// Returns the connection handler given to the constructor, viewed as
// an ACE_Event_Handler (the pointer conversion is implicit).
ACE_Event_Handler *
TAO_SHMIOP_Transport::event_handler_i (void)
{
  ACE_Event_Handler *as_event_handler = this->connection_handler_;
  return as_event_handler;
}
// Returns the connection handler given to the constructor, viewed as
// a TAO_Connection_Handler (the pointer conversion is implicit).
TAO_Connection_Handler *
TAO_SHMIOP_Transport::connection_handler_i (void)
{
  TAO_Connection_Handler *as_connection_handler = this->connection_handler_;
  return as_connection_handler;
}
// Returns the messaging object selected in the constructor (the Lite
// or the Base GIOP implementation).  The transport keeps ownership;
// the destructor deletes it.
TAO_Pluggable_Messaging *
TAO_SHMIOP_Transport::messaging_object (void)
{
  TAO_Pluggable_Messaging *messaging = this->messaging_object_;
  return messaging;
}
// Sends the <iovcnt> buffers described by <iov> over the peer stream,
// one buffer per peer send () call.
//
// <bytes_transferred> is reset to zero on entry and incremented by the
// result of every successful send.  <max_wait_time> is forwarded
// unchanged to each call.
//
// Returns the total number of bytes sent when every buffer went out.
// As soon as one send reports a result <= 0 that result is returned
// as-is and the remaining buffers are not attempted;
// <bytes_transferred> then holds what was sent before the failure.
ssize_t
TAO_SHMIOP_Transport::send (iovec *iov, int iovcnt,
                            size_t &bytes_transferred,
                            const ACE_Time_Value *max_wait_time)
{
  bytes_transferred = 0;

  for (int idx = 0; idx < iovcnt; ++idx)
    {
      iovec &chunk = iov[idx];

      ssize_t sent =
        this->connection_handler_->peer ().send (chunk.iov_base,
                                                 chunk.iov_len,
                                                 max_wait_time);

      // Stop at the first buffer that could not be sent.
      if (sent <= 0)
        return sent;

      bytes_transferred += sent;
    }

  return bytes_transferred;
}
// Reads up to <len> bytes from the peer stream into <buf>.
//
// A read that fails with EAGAIN or EWOULDBLOCK is simply retried; any
// other outcome ends the retry loop.  <max_wait_time> is forwarded to
// every attempt.
//
// Returns the number of bytes read, or -1 on failure.  A peer read
// that yields 0 bytes is also reported to the caller as -1.
ssize_t
TAO_SHMIOP_Transport::recv (char *buf,
                            size_t len,
                            const ACE_Time_Value *max_wait_time)
{
  ssize_t n = 0;

  // Keep reading for as long as the peer reports EAGAIN/EWOULDBLOCK.
  do
    {
      n = this->connection_handler_->peer ().recv (buf,
                                                   len,
                                                   max_wait_time);
    }
  while (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK));

  // Zero bytes read is treated as a failure by the callers.
  if (n == 0)
    return -1;

  // Report genuine errors (but not timeouts) when debugging is verbose.
  if (n == -1 && TAO_debug_level > 3 && errno != ETIME)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("TAO (%P|%t) - %p \n"),
                  ACE_TEXT ("TAO - read message failure ")
                  ACE_TEXT ("recv_i () \n")));
    }

  return n;
}
// Completes a partially read message and processes it.
//
// <incoming> holds the part of the message read so far; it is grown
// in place so that it can hold the whole message, and the outstanding
// <missing_data> bytes are then read into it through recv ().
// <max_wait_time> is forwarded to every recv () call and <rh> is
// passed through to process_parsed_messages ().
//
// Returns -1 if the buffer cannot be grown or if any read fails;
// otherwise returns whatever process_parsed_messages () returns.
int
TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming,
                                           ssize_t missing_data,
                                           TAO_Resume_Handle &rh,
                                           ACE_Time_Value *max_wait_time)
{
  // Calculate the actual length of the load that we are supposed to
  // read which is equal to the <missing_data> + length of the buffer
  // that we have..
  size_t payload = missing_data + incoming.length ();

  // Grow the buffer to the size of the message.  If the grow fails the
  // block is still too small for the payload, and the reads below
  // would write past its end -- so give up right here.
  if (ACE_CDR::grow (&incoming,
                     payload) == -1)
    {
      return -1;
    }

  // .. do a read on the socket again.
  ssize_t bytes = 0;

  // As this used for transports where things are available in one
  // shot this looping should not create any problems.
  for (size_t n = missing_data;
       n != 0;
       n -= bytes)
    {
      // We would have liked to use something like a recv_n ()
      // here. But at the time when the code was written, the MEM_Stream
      // classes had poor support for recv_n (). Till a day when we
      // get proper recv_n (), let us stick with this. The other
      // argument that can be said against this is that, this is the
      // bad layer in which this is being done ie. recv_n is
      // simulated. But...
      bytes = this->recv (incoming.wr_ptr (),
                          n,
                          max_wait_time);

      if (bytes == 0 ||
          bytes == -1)
        {
          return -1;
        }

      // Account for the bytes that were just appended.
      incoming.wr_ptr (bytes);
    }

  TAO_Queued_Data pqd (&incoming);

  // With SHMIOP we would not have any missing data...
  pqd.missing_data_ = 0;

  this->messaging_object ()->get_message_data (&pqd);

  // Now we have a full message in our buffer. Just go ahead and
  // process that
  return this->process_parsed_messages (&pqd, rh);
}
// Sends a request held in <stream>.
//
// The wait strategy is first told that a request is about to go out,
// then the message itself is sent.  If either step reports -1 the
// function returns -1 immediately (the message is not sent when the
// wait strategy refuses).  On success first_request_sent () is called
// and 0 is returned.
int
TAO_SHMIOP_Transport::send_request (TAO_Stub *stub,
                                    TAO_ORB_Core *orb_core,
                                    TAO_OutputCDR &stream,
                                    int message_semantics,
                                    ACE_Time_Value *max_wait_time)
{
  // Short-circuit evaluation keeps the original ordering: the message
  // is only sent once the wait strategy has accepted the request.
  if (this->ws_->sending_request (orb_core, message_semantics) == -1
      || this->send_message (stream,
                             stub,
                             message_semantics,
                             max_wait_time) == -1)
    {
      return -1;
    }

  this->first_request_sent();

  return 0;
}
// Formats the message in <stream> with the messaging object and then
// sends it through send_message_shared ().
//
// <stub>, <message_semantics> and <max_wait_time> are forwarded to
// send_message_shared () unchanged.
//
// Returns 1 on success and -1 if either the formatting or the send
// fails.  A failed send is logged when TAO_debug_level is non-zero.
int
TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream,
                                    TAO_Stub *stub,
                                    int message_semantics,
                                    ACE_Time_Value *max_wait_time)
{
  // The message has to be formatted in the stream before it goes out.
  const int format_status = this->messaging_object_->format_message (stream);
  if (format_status != 0)
    return -1;

  // Strictly speaking, should not need to loop here because the
  // socket never gets set to a nonblocking mode ... some Linux
  // versions seem to need it though. Leaving it costs little.
  // This guarantees to send all data (bytes) or return an error.
  const ssize_t n = this->send_message_shared (stub,
                                               message_semantics,
                                               stream.begin (),
                                               max_wait_time);

  if (n != -1)
    return 1;

  // The send failed: report it if debugging is on.
  if (TAO_debug_level)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"),
                  this->id (),
                  ACE_TEXT ("send_message ()\n")));
    }

  return -1;
}
// Initialises the messaging object with the given protocol version
// (<major>.<minor>).  Always returns 1.
int
TAO_SHMIOP_Transport::messaging_init (CORBA::Octet major,
                                      CORBA::Octet minor)
{
  TAO_Pluggable_Messaging *const messaging = this->messaging_object_;
  messaging->init (major, minor);

  return 1;
}
#endif /* TAO_HAS_SHMIOP && TAO_HAS_SHMIOP != 0 */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?