📄 sfp.cpp
字号:
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) fragment Header received\n"));
msg_type = flowProtocol::Fragment_Msg;
}
else if (ACE_OS::strcmp (magic_number,TAO_SFP_CREDIT_MAGIC_NUMBER) == 0)
{
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) credit message received\n"));
msg_type = flowProtocol::Credit_Msg;
}
else
ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP:Invalid magic number\n"),-1);
return 0;
}
int
TAO_SFP_Base::read_start_message (TAO_AV_Transport *transport,
flowProtocol::Start &start,
TAO_InputCDR &input)
{
input.grow (start_len);
char *buf = input.rd_ptr ();
int n = transport->recv (buf,
start_len);
if (n != ACE_static_cast (int, start_len))
ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_start\n"),0);
else
{
if (!(input >> start))
return -1;
}
return 0;
}
int
TAO_SFP_Base::read_start_reply_message (TAO_AV_Transport *transport,
flowProtocol::StartReply &start_reply,
TAO_InputCDR &input)
{
input.grow (start_len);
char *buf = input.rd_ptr ();
int n = transport->recv (buf,
start_reply_len);
if (n != ACE_static_cast (int, start_len))
ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_start_reply_message"),0);
else
{
if (!(input >> start_reply))
return -1;
}
return 0;
}
int
TAO_SFP_Base::read_credit_message (TAO_AV_Transport *transport,
flowProtocol::credit &credit,
TAO_InputCDR &input)
{
input.grow (start_len);
char *buf = input.rd_ptr ();
int n = transport->recv (buf,
credit_len);
if (n != ACE_static_cast (int, credit_len))
ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_credit_message"),0);
else
{
if (!(input >> credit))
return -1;
}
return 0;
}
int
TAO_SFP_Base::read_endofstream_message (TAO_AV_Transport *transport,
flowProtocol::frameHeader &endofstream,
TAO_InputCDR &input)
{
input.grow (start_len);
char *buf = input.rd_ptr ();
int n = transport->recv (buf,
frame_header_len);
if (n != ACE_static_cast (int, frame_header_len))
ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_endofstream_message"),0);
else
{
if (!(input >> endofstream))
return -1;
}
return 0;
}
int
TAO_SFP_Base::peek_frame_header (TAO_AV_Transport *transport,
flowProtocol::frameHeader &header,
TAO_InputCDR &input)
{
input.grow (frame_header_len);
char *buf = input.rd_ptr ();
int n = transport->recv (buf,
frame_header_len,
MSG_PEEK);
if (n != ACE_static_cast (int, frame_header_len))
ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_endofstream_message"),0);
else
{
if (!(input >> header))
return -1;
}
return 0;
}
int
TAO_SFP_Base::peek_fragment_header (TAO_AV_Transport *transport,
flowProtocol::fragment &fragment,
TAO_InputCDR &input)
{
input.grow (fragment_len);
char *buf = input.rd_ptr ();
int n = transport->recv (buf,
fragment_len,
MSG_PEEK);
if (n != ACE_static_cast (int, fragment_len))
ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_endofstream_message"),0);
else
{
if (!(input >> fragment))
return -1;
}
return 0;
}
void
TAO_SFP_Base::dump_buf (char *buffer,int size)
{
char *buf = buffer;
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"\n========================================\n"));
for (int i=0;i<size;i++)
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"%d ",buf[i]));
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"\n========================================\n"));
}
//------------------------------------------------------------
// TAO_SFP_Object
//------------------------------------------------------------
TAO_SFP_Object::TAO_SFP_Object (TAO_AV_Callback *callback,
TAO_AV_Transport *transport)
:TAO_AV_Protocol_Object (callback,transport),
source_id_ (10),
max_credit_ (-1),
current_credit_ (-1)
{
TAO_SFP_BASE::instance ();
this->state_.static_frame_.size (2* this->transport_->mtu ());
}
TAO_SFP_Object::~TAO_SFP_Object (void)
{
//no-op
}
int
TAO_SFP_Object::destroy (void)
{
int result = -1;
TAO_OutputCDR out_stream;
result = TAO_SFP_Base::start_frame (TAO_ENCAP_BYTE_ORDER,
flowProtocol::EndofStream_Msg,
out_stream);
if (result < 0)
return result;
result = TAO_SFP_Base::send_message (this->transport_,
out_stream);
if (result < 0)
return result;
this->callback_->handle_destroy ();
return 0;
}
int
TAO_SFP_Object::send_frame (ACE_Message_Block *frame,
TAO_AV_frame_info *frame_info)
{
TAO_OutputCDR out_stream;
CORBA::Boolean result = 0;
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_SFP_Object::send_frame\n"));
CORBA::Octet flags = TAO_ENCAP_BYTE_ORDER;
if (this->transport_ == 0)
ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Object::send_frame: transport is null\n"),-1);
if (this->current_credit_ != 0)
{
// if we have enough credit then we send.
size_t total_length = 0;
for (ACE_Message_Block *temp = frame;temp != 0;temp = temp->cont ())
total_length += temp->length ();
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"total_length of frame=%d\n",total_length));
if (total_length < (TAO_SFP_MAX_PACKET_SIZE -TAO_SFP_Base::frame_header_len))
{
if (frame_info != 0)
{
if (frame_info->boundary_marker)
flags |= 4;
CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
flowProtocol::Frame_Msg,
out_stream);
if (result == 0)
return 0;
flowProtocol::my_seq_ulong source_ids;
source_ids.length (1);
source_ids [0] = 0;
TAO_SFP_Base::write_frame_message (frame_info->timestamp,
frame_info->ssrc,
source_ids,
this->sequence_num_,
out_stream);
}
else
{
CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
flowProtocol::SimpleFrame_Msg,
out_stream);
if (result == 0)
return 0;
}
TAO_SFP_Base::send_message (this->transport_,
out_stream,
frame);
}
else // larger frame,fragment and send it.
{
flags = flags | 2;
if (frame_info != 0)
{
if (frame_info->boundary_marker)
flags |= 4;
result = TAO_SFP_Base::start_frame (flags,
flowProtocol::Frame_Msg,
out_stream);
if (result == 0)
return result;
flowProtocol::my_seq_ulong source_ids;
source_ids.length (1);
source_ids [0] = 0;
TAO_SFP_Base::write_frame_message (frame_info->timestamp,
frame_info->ssrc,
source_ids,
this->sequence_num_,
out_stream);
}
else
{
CORBA::Boolean result = TAO_SFP_Base::start_frame (flags,
flowProtocol::SimpleFrame_Msg,
out_stream);
if (result == 0)
return 0;
}
size_t last_len,current_len;
int message_len = ACE_static_cast (int,
out_stream.total_length ());
ACE_Message_Block *mb = frame;
ACE_Message_Block *fragment_mb =
this->get_fragment (mb,
message_len,
last_len,
current_len);
// This can be either a simpleframe or a sequenced frame,other types of frames.
TAO_SFP_Base::send_message (this->transport_,
out_stream,
fragment_mb);
out_stream.reset ();
int frag_number = 1;
mb->length (last_len);
mb->rd_ptr (current_len);
// If there is any more data send those as fragments.
while (mb != 0)
{
message_len = TAO_SFP_Base::fragment_len;
fragment_mb = this->get_fragment (mb,
message_len,
last_len,
current_len);
if (mb == 0)
{
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"sending the last fragment\n"));
// This is the last fragment so clear the fragments bit.
flags = TAO_ENCAP_BYTE_ORDER;
}
if (fragment_mb == 0)
break;
if (frame_info != 0)
{
TAO_SFP_Base::write_fragment_message (flags,
frag_number++,
this->sequence_num_,
frame_info->ssrc,
out_stream);
}
else
{
TAO_SFP_Base::write_fragment_message (flags,
frag_number++,
this->sequence_num_,
0,
out_stream);
}
// send the fragment now.
// without the sleep the fragments gets lost!
// probably because the UDP buffer queue on the sender side
// is overflown it drops the packets.
// XXX: This is a hack.
ACE_OS::sleep (1);
result = TAO_SFP_Base::send_message (this->transport_,
out_stream,
fragment_mb);
if (mb != 0)
{
mb->length (last_len);
mb->rd_ptr (current_len);
}
}
// Increment the sequence_num after sending the message.
this->sequence_num_++;
// Also reduce the number of credits.
if (this->max_credit_ > 0)
this->current_credit_--;
}
}
else
{
// flow controlled so wait.
// A greater than 0 value indicates that flow control is being exercised.
return 1;
}
return 0;
}
int
TAO_SFP_Object::send_frame (const iovec * /*iov*/,
int /*iovcnt*/,
TAO_AV_frame_info * /*frame_info*/)
{
ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SFP_Object::send_frame"),-1);
}
int
TAO_SFP_Object::send_frame (const char* /*buf*/,
size_t /*len*/)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -