📄 sfp.cpp
字号:
}
}
else
{
state.more_fragments_ = 0;
state.frame_block_ = message_block;
}
if (state.more_fragments_ == 0)
{
if (fragment_entry != 0)
{
ACE_NEW_RETURN (frame_info,
TAO_AV_frame_info,
-1);
*frame_info = fragment_entry->frame_info;
}
}
return 0;
}
// Read the body of one SFP fragment from <transport> and file it in
// the reassembly tables held by <state>.
//
// A message block of fragment.frag_sz bytes is allocated and filled
// with a single recv () call.  The block is wrapped in a
// TAO_SFP_Fragment_Node and inserted into the fragment set stored
// under fragment.source_id / fragment.sequence_num; missing table
// levels are created and bound on first use.  If bit 1 of
// fragment.flags is clear the fragment is treated as the last one of
// its frame and the expected fragment count is recorded.
//
// state.frame_block_ always receives the result of
// check_all_fragments ().  When that result is non-null,
// state.more_fragments_ is cleared and <frame_info> is pointed at a
// newly allocated copy of the entry's frame_info.
//
// Returns 0 on success (whether or not the frame is complete yet) and
// -1 on allocation, receive, insert or bind failure.
int
TAO_SFP_Base::read_fragment (TAO_AV_Transport *transport,
                             flowProtocol::fragment &fragment,
                             TAO_SFP_Frame_State &state,
                             TAO_AV_frame_info *&frame_info)
{
  TAO_SFP_Fragment_Table_Entry *fragment_entry = 0;
  int result = -1;

  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"frag_number = %d, frag_size = %d,source_id = %d sequnce_num = %d\n",
                                       fragment.frag_number,fragment.frag_sz,fragment.source_id,fragment.sequence_num));

  ACE_Message_Block *data = 0;
  ACE_NEW_RETURN (data,
                  ACE_Message_Block(fragment.frag_sz),
                  -1);

  // Read the fragment.
  int n = transport->recv (data->wr_ptr (),fragment.frag_sz);
  if ((n == -1) || (n==0))
    {
      // Log before cleaning up so that %p still reports the errno left
      // behind by recv (), then hand the unused buffer back instead of
      // leaking it on every failed or closed connection.
      ACE_ERROR ((LM_ERROR,"TAO_SFP::read_fragment:%p",""));
      data->release ();
      return -1;
    }

  // move past the fragment header.
  data->rd_ptr (fragment_len);
  data->wr_ptr (n);
  //  TAO_SFP_Base::dump_buf (data->rd_ptr (),data->length ());
  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"length of %dth fragment is: %d\n",
                                       fragment.frag_number,
                                       data->length ()));

  // NOTE(review): the node is heap allocated but handed to insert ()
  // as *new_node.  If the set stores a copy, the heap node itself is
  // never freed (and <data> stays owned by whichever copy survives) --
  // confirm against the fragment set's type before changing this.
  TAO_SFP_Fragment_Node *new_node = 0;
  ACE_NEW_RETURN (new_node,
                  TAO_SFP_Fragment_Node,
                  -1);
  new_node->fragment_info_ = fragment;
  new_node->data_ = data;

  // First level: one fragment table per source id.
  TAO_SFP_Fragment_Table *fragment_table = 0;
  result = state.fragment_table_map_.find (fragment.source_id,fragment_table);
  if (result != 0)
    {
      ACE_NEW_RETURN (fragment_table,
                      TAO_SFP_Fragment_Table,
                      -1);
      result = state.fragment_table_map_.bind (fragment.source_id,fragment_table);
      if (result < 0)
        {
          ACE_ERROR ((LM_ERROR,"TAO_SFP_Base::read_fragment:fragment_table_map:bind failed\n"));
          // The table was created above and never made it into the
          // map, so nothing else refers to it.
          delete fragment_table;
          return -1;
        }
    }

  // Second level: one entry per sequence number within that source.
  if (fragment_table->find (fragment.sequence_num,fragment_entry) == 0)
    {
      // Already an entry exists. Traverse the list and insert it at the right place.
      result = fragment_entry->fragment_set_.insert (*new_node);
      if (result != 0)
        ACE_ERROR_RETURN ((LM_ERROR,"insert for %dth node failed\n",fragment.frag_number),-1);
      // check if all the fragments have been received.
    }
  else
    {
      ACE_NEW_RETURN (fragment_entry,
                      TAO_SFP_Fragment_Table_Entry,
                      -1);
      // Check the insert here exactly as the branch above does rather
      // than silently binding an entry that lacks its first fragment.
      result = fragment_entry->fragment_set_.insert (*new_node);
      if (result != 0)
        {
          ACE_ERROR ((LM_ERROR,"insert for %dth node failed\n",fragment.frag_number));
          delete fragment_entry;
          return -1;
        }
      // bind a new entry for this sequence number.
      result = fragment_table->bind (fragment.sequence_num,fragment_entry);
      if (result != 0)
        {
          ACE_ERROR ((LM_ERROR,"bind for %dth fragment failed\n",
                      fragment.frag_number));
          // The entry was created above and is not reachable through
          // the table, so free it here.
          delete fragment_entry;
          return -1;
        }
    }

  if (!(fragment.flags & 0x2))
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Last fragment received\n"));
      // if bit 1 is not set then there are
      // no more fragments.
      fragment_entry->last_received_ = 1;
      // since fragment number starts from 0 to n-1 we add 1.
      fragment_entry->num_fragments_ = fragment.frag_number + 1;
    }

  // A non-null result means the frame is complete and chained.
  state.frame_block_ = check_all_fragments (fragment_entry);
  if (state.frame_block_ != 0)
    {
      state.more_fragments_ = 0;
      ACE_NEW_RETURN (frame_info,
                      TAO_AV_frame_info,
                      -1);
      *frame_info = fragment_entry->frame_info;
    }
  return 0;
}
// Decide whether every fragment of a frame has arrived and, if so,
// link the fragments' payload blocks into a single chain.
//
// The frame counts as complete when the number of nodes held in
// fragment_entry->fragment_set_ equals fragment_entry->num_fragments_.
// In that case each node's data_ block is attached to the previous
// one through cont (), in the set's iteration order, and the first
// block is returned.  Returns 0 while the two counts differ.
ACE_Message_Block*
TAO_SFP_Base::check_all_fragments (TAO_SFP_Fragment_Table_Entry *fragment_entry)
{
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
                "table size: %d, num_fragments: %d\n",
                fragment_entry->fragment_set_.size (),
                fragment_entry->num_fragments_));

  // Still waiting for fragments: nothing to hand back yet.
  if (fragment_entry->fragment_set_.size () != fragment_entry->num_fragments_)
    return 0;

  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,"all fragments have been received\n"));

  // Walk the set once, remembering the first block (the return value)
  // and the most recently visited block (the attachment point).
  ACE_Message_Block *first = 0;
  ACE_Message_Block *last = 0;
  TAO_SFP_Fragment_Node *current;
  FRAGMENT_SET_ITERATOR walker (fragment_entry->fragment_set_);
  while (walker.next (current) != 0)
    {
      ACE_Message_Block *block = current->data_;
      if (!first)
        first = block;
      else
        last->cont (block);
      last = block;
      walker.advance ();
    }
  return first;
}
// Reset <msg> and marshal a frame header into it.
//
// The header carries the magic number "=SFP", the caller's <flags>
// and message <type>, and a message_size of 0.
// Returns 1 if the header was marshalled, 0 otherwise.
CORBA::Boolean
TAO_SFP_Base::start_frame (CORBA::Octet flags,
                           flowProtocol::MsgType type,
                           TAO_OutputCDR &msg)
{
  const char magic [] = { '=', 'S', 'F', 'P' };
  const int magic_len = sizeof (magic) / sizeof (magic [0]);

  msg.reset ();

  flowProtocol::frameHeader header;
  for (int i = 0; i < magic_len; ++i)
    header.magic_number [i] = magic [i];
  header.flags = flags;
  header.message_type = type;
  header.message_size = 0;

  return (msg << header) ? 1 : 0;
}
// Marshal a Start message into <msg>.
//
// The message carries the magic number "=STA", the protocol version
// taken from TAO_SFP_MAJOR_VERSION / TAO_SFP_MINOR_VERSION, and flags
// of 0.  <msg> is appended to as-is; it is not reset here.
// Returns 1 if the message was marshalled, 0 otherwise.
CORBA::Boolean
TAO_SFP_Base::write_start_message (TAO_OutputCDR &msg)
{
  const char magic [] = { '=', 'S', 'T', 'A' };
  const int magic_len = sizeof (magic) / sizeof (magic [0]);

  flowProtocol::Start start_msg;
  for (int i = 0; i < magic_len; ++i)
    start_msg.magic_number [i] = magic [i];
  start_msg.major_version = TAO_SFP_MAJOR_VERSION;
  start_msg.minor_version = TAO_SFP_MINOR_VERSION;
  start_msg.flags = 0;

  return (msg << start_msg) ? 1 : 0;
}
// Marshal a StartReply message into <msg>.
//
// The message carries the magic number "=STR" and flags of 0.
// <msg> is appended to as-is; it is not reset here.
// Returns 1 if the message was marshalled, 0 otherwise.
CORBA::Boolean
TAO_SFP_Base::write_start_reply_message (TAO_OutputCDR &msg)
{
  const char magic [] = { '=', 'S', 'T', 'R' };
  const int magic_len = sizeof (magic) / sizeof (magic [0]);

  flowProtocol::StartReply reply;
  for (int i = 0; i < magic_len; ++i)
    reply.magic_number [i] = magic [i];
  reply.flags = 0;

  return (msg << reply) ? 1 : 0;
}
// Marshal a credit message into <msg>.
//
// The message carries the magic number "=CRE" and the caller's
// <cred_num>.  <msg> is appended to as-is; it is not reset here.
// Returns 1 if the message was marshalled, 0 otherwise.
CORBA::Boolean
TAO_SFP_Base::write_credit_message (CORBA::ULong cred_num,
                                    TAO_OutputCDR &msg)
{
  const char magic [] = { '=', 'C', 'R', 'E' };
  const int magic_len = sizeof (magic) / sizeof (magic [0]);

  flowProtocol::credit credit_msg;
  for (int i = 0; i < magic_len; ++i)
    credit_msg.magic_number [i] = magic [i];
  credit_msg.cred_num = cred_num;

  return (msg << credit_msg) ? 1 : 0;
}
// Reset <msg> and marshal a fragment header into it.
//
// The header carries the magic number "FRAG" together with the
// caller's <flags>, <fragment_number>, <sequence_number> and
// <source_id>.  The size field frag_sz is written as 0 so that no
// indeterminate value is marshalled.
// Returns 1 if the header was marshalled, 0 otherwise.
CORBA::Boolean
TAO_SFP_Base::write_fragment_message (CORBA::Octet flags,
                                      CORBA::ULong fragment_number,
                                      CORBA::ULong sequence_number,
                                      CORBA::ULong source_id,
                                      TAO_OutputCDR &msg)
{
  msg.reset ();
  flowProtocol::fragment fragment;
  fragment.magic_number [0] = 'F';
  fragment.magic_number [1] = 'R';
  fragment.magic_number [2] = 'A';
  fragment.magic_number [3] = 'G';
  fragment.flags = flags;
  fragment.frag_number = fragment_number;
  fragment.sequence_num = sequence_number;
  fragment.source_id = source_id;
  // Give the size field a defined value before it is marshalled.
  fragment.frag_sz = 0;
  if (!(msg << fragment))
    return 0;
  return 1;
}
// Marshal a frame message into <msg>.
//
// The message is filled from the caller's <timestamp>, <synchSource>,
// <source_ids> and <sequence_num>.  <msg> is appended to as-is; it is
// not reset here.
// Returns 1 if the message was marshalled, 0 otherwise.
CORBA::Boolean
TAO_SFP_Base::write_frame_message (CORBA::ULong timestamp,
                                   CORBA::ULong synchSource,
                                   flowProtocol::my_seq_ulong source_ids,
                                   CORBA::ULong sequence_num,
                                   TAO_OutputCDR &msg)
{
  flowProtocol::frame frame_msg;
  frame_msg.sequence_num = sequence_num;
  frame_msg.source_ids = source_ids;
  frame_msg.synchSource = synchSource;
  frame_msg.timestamp = timestamp;

  return (msg << frame_msg) ? 1 : 0;
}
// Send the CDR-encoded header held in <stream>, followed by the
// optional payload chain <mb>, over <transport>.
//
// When <mb> is non-null, the combined length of the header and of
// every block in the <mb> chain is written into the header's size
// field: at TAO_SFP_FRAGMENT_SIZE_OFFSET if the header buffer starts
// with 'F', otherwise at TAO_SFP_MESSAGE_SIZE_OFFSET.  When <mb> is
// null the header is sent exactly as it was marshalled.
//
// <mb> is linked behind the stream's message block(s) with cont ()
// and the link is not undone here, so <stream> keeps pointing at <mb>
// after this call returns.
//
// Returns 1 on success and -1 if the transport reports an error (-1)
// or EOF (0).
int
TAO_SFP_Base::send_message (TAO_AV_Transport *transport,
                            TAO_OutputCDR &stream,
                            ACE_Message_Block *mb)
{
  CORBA::ULong total_len = ACE_static_cast (CORBA::ULong,
                                            stream.total_length ());
  if (mb != 0)
    {
      for (ACE_Message_Block *temp = mb;temp != 0;temp = temp->cont ())
        total_len += ACE_static_cast (CORBA::ULong, temp->length ());

      char *buf = (char *) stream.buffer ();
      size_t offset = TAO_SFP_MESSAGE_SIZE_OFFSET;
      // The first character tells an "=SFP" frame header apart from a
      // "FRAG" fragment header.
      if (*(buf) == 'F')
        {
          // Fragment message.
          offset = TAO_SFP_FRAGMENT_SIZE_OFFSET;
        }
      // Patch the size in place, in the byte order the stream itself
      // is writing.
#if !defined (ACE_ENABLE_SWAP_ON_WRITE)
      *ACE_reinterpret_cast (CORBA::ULong *, buf + offset) = total_len;
#else
      if (!stream.do_byte_swap ())
        *ACE_reinterpret_cast (CORBA::ULong *,
                               buf + offset) = total_len;
      else
        ACE_CDR::swap_4 (ACE_reinterpret_cast (char *,
                                               &total_len),
                         buf + offset);
#endif /* ACE_ENABLE_SWAP_ON_WRITE */
    }

  // we join the data block with the cdr block.
  ACE_Message_Block *end = (ACE_Message_Block *)stream.end ();
  if (end == 0)
    {
      // There is only one message block.
      end = (ACE_Message_Block *)stream.begin ();
      //      TAO_SFP_Base::dump_buf (end->rd_ptr (),end->length ());
    }
  end->cont (mb);

  ssize_t n = transport->send (stream.begin ());
  if (n == -1)
    {
      // Name this function in the diagnostic; the text used to point
      // at GIOP::send_request (), which is not involved here.
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
                                           "TAO: (%P|%t) closing conn after fault %p\n",
                                           "TAO_SFP_Base::send_message ()"));
      return -1;
    }
  // EOF.
  if (n == 0)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG,
                    "TAO: (%P|%t) TAO_SFP_Base::send_message () "
                    "EOF, closing conn:\n"));
      return -1;
    }
  return 1;
}
int
TAO_SFP_Base::peek_message_type (TAO_AV_Transport *transport,
flowProtocol::MsgType &msg_type)
{
char peek_buffer [TAO_SFP_MAGIC_NUMBER_LEN+2];// 2 is for flags + message_type.
int peek_len = TAO_SFP_MAGIC_NUMBER_LEN +2;
char magic_number [TAO_SFP_MAGIC_NUMBER_LEN+1];
ssize_t n =transport->recv (peek_buffer,
peek_len,
MSG_PEEK);
ACE_OS::strncpy (magic_number,
peek_buffer,
TAO_SFP_MAGIC_NUMBER_LEN);
magic_number [TAO_SFP_MAGIC_NUMBER_LEN] = 0;
if (n == -1)
ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1);
else if (n==0)
ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1);
if (ACE_OS::strcmp (magic_number,TAO_SFP_START_MAGIC_NUMBER) == 0)
{
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)Start message received\n"));
msg_type = flowProtocol::Start_Msg;
}
else if (ACE_OS::strcmp (magic_number,TAO_SFP_STARTREPLY_MAGIC_NUMBER) == 0)
{
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)StartReply message received\n"));
msg_type = flowProtocol::StartReply_Msg;
}
else if (ACE_OS::strcmp (magic_number,TAO_SFP_MAGIC_NUMBER) == 0)
{
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) frameHeader received\n"));
// msg_type = flowProtocol::SimpleFrame;
msg_type = (flowProtocol::MsgType)peek_buffer [TAO_SFP_MESSAGE_TYPE_OFFSET];
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Message Type = %d\n",msg_type));
}
else if (ACE_OS::strcmp (magic_number,TAO_SFP_FRAGMENT_MAGIC_NUMBER) == 0)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -