📄 sfp.cpp
字号:
// sfp.cpp,v 5.40 2003/11/04 05:21:31 dhinton Exp
#include "sfp.h"
#include "ace/ARGV.h"
#include "tao/debug.h"
#include "ace/OS_NS_strings.h"
// Default arguments to pass to the ORB.
const char *TAO_SFP_Base::TAO_SFP_ORB_ARGUMENTS = "-ORBobjrefstyle URL";
// SFP magic numbers: four-character tags, one per SFP message kind.
// The TAO_SFP_Base constructor writes the same characters into the
// magic_number field of the corresponding default message.
const char *TAO_SFP_Base::TAO_SFP_MAGIC_NUMBER = "=SFP";
const char *TAO_SFP_Base::TAO_SFP_FRAGMENT_MAGIC_NUMBER = "FRAG";
const char *TAO_SFP_Base::TAO_SFP_START_MAGIC_NUMBER = "=STA";
const char *TAO_SFP_Base::TAO_SFP_CREDIT_MAGIC_NUMBER = "=CRE";
const char *TAO_SFP_Base::TAO_SFP_STARTREPLY_MAGIC_NUMBER = "=STR";
// SFP version 1.0 (copied into the default Start message by the
// TAO_SFP_Base constructor).
const unsigned char TAO_SFP_Base::TAO_SFP_MAJOR_VERSION = 1;
const unsigned char TAO_SFP_Base::TAO_SFP_MINOR_VERSION = 0;
// Lengths and offsets of various SFP headers.
// NOTE(review): units are presumably bytes within the marshalled
// message -- confirm against sfp.h and the code that uses them.
const unsigned char TAO_SFP_Base::TAO_SFP_FRAME_HEADER_LEN = 12;
const unsigned char TAO_SFP_Base::TAO_SFP_MESSAGE_SIZE_OFFSET = 8;
const unsigned char TAO_SFP_Base::TAO_SFP_FRAGMENT_SIZE_OFFSET = 16;
// Marshalled lengths of the default SFP messages.  Each one is assigned
// in the TAO_SFP_Base constructor from TAO_OutputCDR::total_length ()
// after streaming a default instance of the matching message; until
// then they hold the zero value given to objects with static storage.
u_int TAO_SFP_Base::frame_header_len;
u_int TAO_SFP_Base::start_reply_len;
u_int TAO_SFP_Base::start_len;
u_int TAO_SFP_Base::credit_len;
u_int TAO_SFP_Base::fragment_len;
// Ordering predicate for fragment nodes: a node sorts before another
// when its fragment number is strictly smaller.
// Returns 1 if <left> precedes <right>, 0 otherwise.
int
operator< (const TAO_SFP_Fragment_Node& left,
           const TAO_SFP_Fragment_Node& right)
{
  const bool left_precedes =
    left.fragment_info_.frag_number < right.fragment_info_.frag_number;
  return left_precedes ? 1 : 0;
}
//------------------------------------------------------------
// TAO_SFP_Base
//------------------------------------------------------------
// Computes the marshalled length of each kind of SFP message.
//
// For every message type (frame header, fragment, Start, StartReply,
// credit) a default instance is given its magic number (plus the few
// other fields set below), streamed into a scratch CDR buffer, and the
// resulting total length is stored in the matching static *_len member.
// If streaming any message fails, an error is logged and the
// constructor returns at once, so the lengths of the messages that
// come after it are not assigned.
TAO_SFP_Base::TAO_SFP_Base (void)
{
  TAO_OutputCDR scratch;
  flowProtocol::frameHeader header_msg;
  flowProtocol::fragment fragment_msg;
  flowProtocol::credit credit_msg;
  flowProtocol::Start start_msg;
  flowProtocol::StartReply reply_msg;

  // --- frameHeader: magic "=SFP", flags carry the local byte order.
  header_msg.magic_number [0] = '=';
  header_msg.magic_number [1] = 'S';
  header_msg.magic_number [2] = 'F';
  header_msg.magic_number [3] = 'P';
  header_msg.flags = TAO_ENCAP_BYTE_ORDER;
  scratch.reset ();
  if (!(scratch << header_msg))
    {
      ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
      return;
    }
  frame_header_len = ACE_static_cast (u_int, scratch.total_length ());

  // --- fragment: magic "FRAG".
  fragment_msg.magic_number [0] = 'F';
  fragment_msg.magic_number [1] = 'R';
  fragment_msg.magic_number [2] = 'A';
  fragment_msg.magic_number [3] = 'G';
  scratch.reset ();
  if (!(scratch << fragment_msg))
    {
      ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
      return;
    }
  fragment_len = ACE_static_cast (u_int, scratch.total_length ());

  // --- Start: magic "=STA", protocol version, no flags.
  start_msg.magic_number [0] = '=';
  start_msg.magic_number [1] = 'S';
  start_msg.magic_number [2] = 'T';
  start_msg.magic_number [3] = 'A';
  start_msg.major_version = TAO_SFP_Base::TAO_SFP_MAJOR_VERSION;
  start_msg.minor_version = TAO_SFP_Base::TAO_SFP_MINOR_VERSION;
  start_msg.flags = 0;
  scratch.reset ();
  if (!(scratch << start_msg))
    {
      ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
      return;
    }
  start_len = ACE_static_cast (u_int, scratch.total_length ());

  // --- StartReply: magic "=STR", no flags.
  reply_msg.magic_number [0] = '=';
  reply_msg.magic_number [1] = 'S';
  reply_msg.magic_number [2] = 'T';
  reply_msg.magic_number [3] = 'R';
  reply_msg.flags = 0;
  scratch.reset ();
  if (!(scratch << reply_msg))
    {
      ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
      return;
    }
  start_reply_len = ACE_static_cast (u_int, scratch.total_length ());

  // --- credit: magic "=CRE".
  credit_msg.magic_number [0] = '=';
  credit_msg.magic_number [1] = 'C';
  credit_msg.magic_number [2] = 'R';
  credit_msg.magic_number [3] = 'E';
  scratch.reset ();
  if (!(scratch << credit_msg))
    {
      ACE_ERROR ((LM_ERROR, "TAO_SFP constructor\n"));
      return;
    }
  credit_len = ACE_static_cast (u_int, scratch.total_length ());
}
// Dispatches one incoming SFP message on <transport>.
//
// The message type is peeked first; depending on it the matching
// header is peeked and the message body is read:
//   - SimpleFrame_Msg / Frame_Msg : peek_frame_header, then read_frame
//   - Fragment_Msg                : peek_fragment_header, then read_fragment
//   - EndofStream_Msg             : read_endofstream_message
// Any other message type is ignored.
//
// Returns the (negative) result of the first step that fails;
// otherwise returns 0, regardless of the non-negative values the
// individual steps produced.
int
TAO_SFP_Base::handle_input (TAO_AV_Transport *transport,
                            TAO_SFP_Frame_State &state,
                            TAO_AV_frame_info *&frame_info)
{
  flowProtocol::MsgType msg_type;
  int status = TAO_SFP_Base::peek_message_type (transport,
                                                msg_type);
  if (status < 0)
    return status;

  switch (msg_type)
    {
    case flowProtocol::SimpleFrame_Msg:
    case flowProtocol::Frame_Msg:
      status = TAO_SFP_Base::peek_frame_header (transport,
                                                state.frame_header_,
                                                state.cdr);
      // Only read the frame once its header was peeked successfully.
      if (status >= 0)
        status = TAO_SFP_Base::read_frame (transport,
                                           state.frame_header_,
                                           state,
                                           frame_info);
      break;

    case flowProtocol::Fragment_Msg:
      status = TAO_SFP_Base::peek_fragment_header (transport,
                                                   state.fragment_,
                                                   state.cdr);
      if (status >= 0)
        {
          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG,"Fragment received\n"));
          status = TAO_SFP_Base::read_fragment (transport,
                                                state.fragment_,
                                                state,
                                                frame_info);
        }
      break;

    case flowProtocol::EndofStream_Msg:
      status = TAO_SFP_Base::read_endofstream_message (transport,
                                                       state.frame_header_,
                                                       state.cdr);
      break;

    default:
      // Unhandled message types leave <status> at the non-negative
      // value returned by peek_message_type.
      break;
    }

  // Failures propagate unchanged; every successful path reports 0.
  return status < 0 ? status : 0;
}
int
TAO_SFP_Base::read_frame (TAO_AV_Transport *transport,
flowProtocol::frameHeader &frame_header,
TAO_SFP_Frame_State &state,
TAO_AV_frame_info *&frame_info)
{
ACE_Message_Block *message_block = 0;
int result = -1;
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,"Reading simple frame\n"));
// Check to see what the length of the message is.
int byte_order = frame_header.flags & 0x1;
int message_len = frame_header.message_size;
// ACE_NEW_RETURN (message_block,
// ACE_Message_Block (message_len),
// 0);
state.static_frame_.rd_ptr (state.static_frame_.base ());
state.static_frame_.wr_ptr (state.static_frame_.base ());
int n = transport->recv (state.static_frame_.rd_ptr (),message_len);
if (n == -1)
ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
else if (n==0)
ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
else if (n != message_len)
ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame:message truncated\n"),0);
message_block = &state.static_frame_;
// print the buffer.
// this->dump_buf (message,n);
// skip over the frame header.
message_block->rd_ptr (frame_header_len);
message_block->wr_ptr (n);
CORBA::ULong ssrc = 0;
TAO_SFP_Fragment_Table_Entry *fragment_entry = 0;
if (frame_header.flags & 0x2)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,"fragmented frame:0th fragment\n"));
state.more_fragments_ = 1;
ACE_Message_Block *data = 0;
switch (frame_header.message_type)
{
case flowProtocol::Frame_Msg:
{
// read the frame info.
ACE_Message_Block frame_info_mb (message_len-frame_header_len+ACE_CDR::MAX_ALIGNMENT);
ACE_CDR::mb_align (&frame_info_mb);
frame_info_mb.copy (message_block->rd_ptr (),
message_block->length ());
// print the buffer.
// this->dump_buf (message_block->rd_ptr (),16);
TAO_InputCDR frame_info_cdr (&frame_info_mb,byte_order);
frame_info_cdr >> state.frame_;
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"frame.timestamp = %d, "
"frame.synchsource = %d, "
"frame.sequence_num = %d\n",
state.frame_.timestamp,
state.frame_.synchSource,
state.frame_.sequence_num));
ssrc = state.frame_.synchSource;
// The remaining message in the CDR stream is the fragment
// data for frag.0
data = frame_info_cdr.start ()->clone ();
break;
}
case flowProtocol::SimpleFrame_Msg:
{
data = message_block->clone ();
break;
}
case flowProtocol::SequencedFrame_Msg:
break;
case flowProtocol::SpecialFrame_Msg:
break;
}
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,"Length of 0th fragment= %d\n",data->length ()));
TAO_SFP_Fragment_Table *fragment_table = 0;
result = state.fragment_table_map_.find (ssrc,fragment_table);
if (result != 0)
{
ACE_NEW_RETURN (fragment_table,
TAO_SFP_Fragment_Table,
-1);
result = state.fragment_table_map_.bind (ssrc,fragment_table);
if (result < 0)
ACE_ERROR_RETURN ((LM_ERROR,
"TAO_SFP_Base::read_frame: "
"fragment_table_map:bind failed\n"),-1);
}
TAO_SFP_Fragment_Node *new_node;
ACE_NEW_RETURN (new_node,
TAO_SFP_Fragment_Node,
0);
new_node->fragment_info_.frag_sz = ACE_static_cast (CORBA::ULong,
data->length ());
new_node->fragment_info_.frag_number = 0;
if (state.frame_.source_ids.length () > 0)
new_node->fragment_info_.source_id = state.frame_.source_ids [0];
else
new_node->fragment_info_.source_id = 0;
new_node->data_ = data;
// TAO_SFP_Base::dump_buf (data->rd_ptr (),data->length ());
if (fragment_table->find (state.frame_.sequence_num,fragment_entry) == 0)
{
// This case can happen where a nth (n > 0)fragment is
// received before the 0th fragment.
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"fragment table entry found for 0th fragment:\n"));
result = fragment_entry->fragment_set_.insert (*new_node);
if (result != 0)
ACE_ERROR_RETURN ((LM_ERROR,
"insert for 0th fragment failed\n"),0);
// enter the frame info.
// check if all the fragments have been received.
state.frame_block_ =
TAO_SFP_Base::check_all_fragments (fragment_entry);
if (state.frame_block_ != 0)
state.more_fragments_ = 0;
}
else
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"fragment table entry not found for 0th fragment\n"));
TAO_SFP_Fragment_Table_Entry *new_entry;
ACE_NEW_RETURN (new_entry,
TAO_SFP_Fragment_Table_Entry,
0);
result = new_entry->fragment_set_.insert (*new_node);
if (result != 0)
ACE_ERROR_RETURN ((LM_ERROR,"insert for 0th fragment failed\n"),0);
fragment_entry = new_entry;
// not found. so bind a new entry.
result = fragment_table->bind (state.frame_.sequence_num,new_entry);
if (result != 0)
ACE_ERROR_RETURN ((LM_ERROR,"fragment table bind failed\n"),0);
if (frame_header.message_type & 4 )
fragment_entry->frame_info.boundary_marker = 1;
switch (frame_header.message_type)
{
case flowProtocol::Frame_Msg:
fragment_entry->frame_info.ssrc = state.frame_.synchSource;
fragment_entry->frame_info.timestamp = state.frame_.timestamp;
fragment_entry->frame_info.sequence_num = state.frame_.sequence_num;
break;
case flowProtocol::SimpleFrame_Msg:
fragment_entry->frame_info.ssrc =
fragment_entry->frame_info.timestamp =
fragment_entry->frame_info.sequence_num = 0;
break;
}
return 0;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -