📄 qos_udp.cpp
字号:
{
ACE_QoS_Manager qos_manager =
this->get_socket ()->qos_manager ();
ACE_QoS ace_qos = this->qos_session_->qos ();
this->qos_session_->qos (this->get_socket (),
&qos_manager,
ace_qos);
}
}
return 0;
}
// Apply a new QoS specification to this handler's QoS session.
//
// When new_qos carries QoS parameters they are translated into a freshly
// allocated ACE_Flow_Spec, which is passed to
// Fill_ACE_QoS::fill_simplex_sender_qos or fill_simplex_receiver_qos
// depending on the session flags.  The resulting ACE_QoS (left as
// constructed when no parameters were supplied) is then handed to the
// session together with the socket and its QoS manager.
//
// Returns -1 if the flow spec cannot be allocated or filled; otherwise
// the value returned by the session's qos() call.
int
TAO_AV_UDP_QoS_Flow_Handler::change_qos (AVStreams::QoS new_qos)
{
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
                "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::change_qos\n"));

  // Automatic storage instead of a heap allocation that was never freed.
  // The session's qos() receives the object itself, not a pointer to it.
  ACE_QoS ace_qos;

  if (new_qos.QoSParams.length () != 0)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG,
                    "New QoS Params are not Empty\n"));

      ACE_Flow_Spec *ace_flow_spec = 0;
      ACE_NEW_RETURN (ace_flow_spec,
                      ACE_Flow_Spec,
                      -1);

      this->translate (new_qos.QoSParams,
                       ace_flow_spec);

      Fill_ACE_QoS fill_ace_qos;

      if (this->qos_session_->flags () == ACE_QoS_Session::ACE_QOS_SENDER)
        {
          if (fill_ace_qos.fill_simplex_sender_qos (ace_qos,
                                                    ace_flow_spec) != 0)
            {
              // The ACE_QoS is discarded on this path, so the flow spec
              // would otherwise be leaked.
              delete ace_flow_spec;
              ACE_ERROR_RETURN ((LM_ERROR,
                                 "Unable to fill simplex sender qos\n"),
                                -1);
            }

          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG,
                        "(%N,%l) Filled up the Sender QoS parameters\n"));
        }
      else if (this->qos_session_->flags () == ACE_QoS_Session::ACE_QOS_RECEIVER)
        {
          if (fill_ace_qos.fill_simplex_receiver_qos (ace_qos,
                                                      ace_flow_spec) != 0)
            {
              // Same as the sender path: release the unused flow spec.
              delete ace_flow_spec;
              ACE_ERROR_RETURN ((LM_ERROR,
                                 "Unable to fill simplex receiver qos\n"),
                                -1);
            }

          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG,
                        "(%N,%l) Filled up the Receiver QoS parameters\n"));
        }
      else
        {
          // Neither sender nor receiver: the flow spec was never handed
          // to the ACE_QoS, so nothing else can release it.
          delete ace_flow_spec;
        }
      // NOTE(review): on the success paths the flow spec is presumably
      // referenced by ace_qos (and by the session after qos() below), so
      // it is intentionally not deleted here -- confirm ownership.

      // qos_params is not read again in this function; the call is kept
      // because FillQoSParams is defined outside this block.
      ACE_QoS_Params qos_params;
      FillQoSParams (qos_params,
                     0,
                     &ace_qos);
    }

  ACE_QoS_Manager qos_manager =
    this->get_socket ()->qos_manager ();

  int result = this->qos_session_->qos (this->get_socket (),
                                        &qos_manager,
                                        ace_qos);
  return result;
}
// Timer callback: forwards the notification unchanged to
// TAO_AV_Flow_Handler::handle_timeout and returns its result.
int
TAO_AV_UDP_QoS_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
                                             const void *arg)
{
  const int status = TAO_AV_Flow_Handler::handle_timeout (tv, arg);
  return status;
}
// Record the remote peer address for this flow.
//
// The address must be an ACE_INET_Addr.  It is stored in peer_addr_ and
// forwarded to the associated TAO_AV_UDP_QoS_Transport.  When this
// handler's flow-spec entry has the producer role, a QoS session towards
// the remote address is opened, its source address is set from the
// socket's local address, and the QoS handler is activated.
//
// Returns -1 if either cast fails, the session cannot be opened, the
// source address cannot be allocated or activation fails; otherwise the
// result of the transport's set_remote_address().
int
TAO_AV_UDP_QoS_Flow_Handler::set_remote_address (ACE_Addr *address)
{
  // Cast once and validate before any dereference; a null or non-INET
  // address previously led to a null pointer dereference.
  ACE_INET_Addr *inet_addr =
    ACE_dynamic_cast (ACE_INET_Addr*, address);
  if (inet_addr == 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::set_remote_address: "
                       "address is not an ACE_INET_Addr\n"),
                      -1);

  TAO_AV_UDP_QoS_Transport *transport =
    ACE_dynamic_cast (TAO_AV_UDP_QoS_Transport*, this->transport_);
  if (transport == 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::set_remote_address: "
                       "transport is not a TAO_AV_UDP_QoS_Transport\n"),
                      -1);

  if (TAO_debug_level > 0)
    {
      char buf [BUFSIZ];
      inet_addr->addr_to_string (buf,
                                 BUFSIZ);
      ACE_DEBUG ((LM_DEBUG,
                  "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::set_remote_address %s\n",
                  buf));
    }

  this->peer_addr_ = *inet_addr;

  if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
    {
      TAO_AV_UDP_QoS_Session_Helper helper;

      this->qos_session_ = helper.open_qos_session (this,
                                                    *inet_addr);
      if (this->qos_session_ == 0)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "QoS Session Open Failed (%N|%l)\n"),
                          -1);

      ACE_INET_Addr local_addr;
      this->get_socket ()->get_local_addr (local_addr);

      ACE_INET_Addr* src_addr;
      ACE_NEW_RETURN (src_addr,
                      ACE_INET_Addr (local_addr.get_port_number (),
                                     local_addr.get_host_name ()),
                      -1);

      // NOTE(review): src_addr is handed to the session and not released
      // here; presumably the session takes ownership -- confirm.
      this->qos_session_->source_addr (src_addr);

      if (helper.activate_qos_handler (this->qos_session_,
                                       this) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "Activating QoS Handler Failed (%N|%l)\n"),
                          -1);
    }

  return transport->set_remote_address (*inet_addr);
}
// Return the handle of the underlying QoS datagram socket, logging it
// first when TAO debugging is enabled.
ACE_HANDLE
TAO_AV_UDP_QoS_Flow_Handler::get_handle (void) const
{
  if (TAO_debug_level <= 0)
    return this->qos_sock_dgram_.get_handle () ;

  ACE_DEBUG ((LM_DEBUG,
              "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::get_handle:%d\n",
              this->qos_sock_dgram_.get_handle ()));

  return this->qos_sock_dgram_.get_handle () ;
}
//------------------------------------------------------------
// TAO_AV_UDP_QoS_Transport
//------------------------------------------------------------
// Default constructor: creates a transport with no flow handler attached.
// addr_ is zeroed as well, matching the handler-taking constructor, so it
// never holds an indeterminate value.
TAO_AV_UDP_QoS_Transport::TAO_AV_UDP_QoS_Transport (void)
  : handler_ (0),
    addr_ (0)
{
}
// Construct a transport bound to the given flow handler; addr_ starts
// out as zero.
TAO_AV_UDP_QoS_Transport::TAO_AV_UDP_QoS_Transport (TAO_AV_UDP_QoS_Flow_Handler *handler)
  : handler_ (handler)
  , addr_ (0)
{
  // All state is set up in the initializer list.
}
// Destructor: performs no explicit cleanup.
TAO_AV_UDP_QoS_Transport::~TAO_AV_UDP_QoS_Transport (void)
{
  // Intentionally empty.
}
// Store a copy of the remote peer address in peer_addr_.
// Always returns 0.
int
TAO_AV_UDP_QoS_Transport::set_remote_address (const ACE_INET_Addr &address)
{
  const int success = 0;

  this->peer_addr_ = address;

  return success;
}
// No-op: the address argument is ignored and 0 is always returned.
int
TAO_AV_UDP_QoS_Transport::open (ACE_Addr * /*address*/)
{
  const int success = 0;
  return success;
}
// No-op: nothing is closed here and 0 is always returned.
int
TAO_AV_UDP_QoS_Transport::close (void)
{
  const int success = 0;
  return success;
}
// Report the transport's MTU, which is ACE_MAX_DGRAM_SIZE.
int
TAO_AV_UDP_QoS_Transport::mtu (void)
{
  const int max_dgram_size = ACE_MAX_DGRAM_SIZE;
  return max_dgram_size;
}
// Return a pointer to the stored peer address (the peer_addr_ member
// itself, not a copy).
ACE_Addr*
TAO_AV_UDP_QoS_Transport::get_peer_addr (void)
{
  ACE_Addr *peer = &this->peer_addr_;
  return peer;
}
ssize_t
TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk,
ACE_Time_Value *)
{
// For the most part this was copied from GIOP::send_request and
// friends.
iovec iov[IOV_MAX];
int iovcnt = 0;
ssize_t n = 0;
ssize_t nbytes = 0;
for (const ACE_Message_Block *i = mblk;
i != 0;
i = i->cont ())
{
// Make sure there is something to send!
if (i->length () > 0)
{
iov[iovcnt].iov_base = i->rd_ptr ();
iov[iovcnt].iov_len = ACE_static_cast (u_long, i->length ());
iovcnt++;
// The buffer is full make a OS call. @@ TODO this should
// be optimized on a per-platform basis, for instance, some
// platforms do not implement writev() there we should copy
// the data into a buffer and call send_n(). In other cases
// there may be some limits on the size of the iovec, there
// we should set IOV_MAX to that limit.
size_t bytes_sent = 0;
if (iovcnt == IOV_MAX)
{
if (this->handler_->get_socket ()->send (iov,
1,
bytes_sent,
0,
this->handler_->qos_session ()->dest_addr (),
0,
0) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"Error in dgram_mcast.send () (%N|%l)\n"),
-1);
else
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"Using ACE_OS::sendto () : Bytes sent : %d",
bytes_sent));
if (n < 1)
return n;
nbytes += bytes_sent;
iovcnt = 0;
}
}
}
size_t bytes_sent = 0;
// Check for remaining buffers to be sent!
if (iovcnt != 0)
{
if (this->handler_->get_socket ()->send (iov,
1,
bytes_sent,
0,
this->handler_->qos_session ()->dest_addr (),
0,
0) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"Error in dgram_mcast.send ()\n"),
-1);
else
if( TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d",
bytes_sent));
if (n < 1)
return n;
nbytes += bytes_sent;
}
return nbytes;
}
// Send a single buffer of len bytes to the QoS session's destination
// address through the handler's socket.  The timeout argument is ignored.
//
// Returns whatever the socket's send() returns.
ssize_t
TAO_AV_UDP_QoS_Transport::send (const char *buf,
                                size_t len,
                                ACE_Time_Value *)
{
  // The textual peer address is only needed for the debug trace, so it
  // is formatted only when tracing is enabled rather than on every send.
  if (TAO_debug_level > 0)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%N,%l) TAO_AV_UDP_QoS_Transport::send "));

      char addr [BUFSIZ];
      // Keep the buffer a valid string even if formatting fails.
      addr[0] = '\0';
      this->peer_addr_.addr_to_string (addr, BUFSIZ);

      // NOTE(review): the trace shows peer_addr_, while the data below is
      // sent to the session's dest_addr () -- confirm these agree.
      ACE_DEBUG ((LM_DEBUG,
                  "(%N,%l) to %s\n",
                  addr));
    }

  return this->handler_->get_socket ()->send (buf,
                                              len,
                                              this->handler_->qos_session ()->dest_addr (),
                                              0,
                                              0,
                                              0);
}
ssize_t
TAO_AV_UDP_QoS_Transport::send (const iovec *iov,
int /*iovcnt*/,
ACE_Time_Value *)
{
size_t bytes_sent = 0;
if (this->handler_->get_socket ()->send (iov,
1,
bytes_sent,
0,
this->handler_->qos_session ()->dest_addr (),
0,
0) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"Error in dgram_mcast.send ()\n"),
-1);
else
{
if( TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d",
bytes_sent));
}
return bytes_sent;
}
// Receive up to len bytes into buf from the handler's socket, passing
// peer_addr_ as the address argument.  The timeout argument is ignored.
// Returns the socket's recv() result.
ssize_t
TAO_AV_UDP_QoS_Transport::recv (char *buf,
                                size_t len,
                                ACE_Time_Value *)
{
  const ssize_t received =
    this->handler_->get_socket ()->recv (buf, len, this->peer_addr_);

  return received;
}
// Receive up to len bytes into buf from the handler's socket, passing
// peer_addr_ as the address argument and forwarding the caller's flags
// and timeout.  Returns the socket's recv() result.
ssize_t
TAO_AV_UDP_QoS_Transport::recv (char *buf,
                                size_t len,
                                int flags,
                                ACE_Time_Value *timeout)
{
  const ssize_t received =
    this->handler_->get_socket ()->recv (buf,
                                         len,
                                         this->peer_addr_,
                                         flags,
                                         timeout);

  return received;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -