⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sfp.cpp

📁 这是一个被广泛使用的开源通信项目,完全能够胜任大容量、高并发的通信要求,可广泛用于网络游戏、医学图像网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的材料。
💻 CPP
📖 第 1 页 / 共 4 页
字号:
        }
    }
  else
    {
      state.more_fragments_ = 0;
      state.frame_block_ = message_block;
    }
  if (state.more_fragments_ == 0)
    {
      if (fragment_entry != 0)
        {
          ACE_NEW_RETURN (frame_info,
                          TAO_AV_frame_info,
                          -1);
          *frame_info = fragment_entry->frame_info;
        }
    }
  return 0;
}

// Receives the payload of one fragment from <transport>, stores it in the
// fragment table selected by fragment.source_id under the entry for
// fragment.sequence_num, and checks whether the frame is now complete.
//
// transport  - transport the fragment payload is received from.
// fragment   - fragment header; frag_number, frag_sz, source_id,
//              sequence_num and flags are read here.
// state      - frame_block_ is set to the result of check_all_fragments ()
//              (0 while fragments are outstanding); more_fragments_ is
//              cleared once that result is non-zero.
// frame_info - set to a newly allocated copy of the entry's frame_info when
//              the frame is complete; otherwise left untouched.
//
// Returns 0 on success and -1 if an allocation, the receive, an insert or a
// bind fails.
int
TAO_SFP_Base::read_fragment (TAO_AV_Transport *transport,
                             flowProtocol::fragment &fragment,
                             TAO_SFP_Frame_State &state,
                             TAO_AV_frame_info *&frame_info)
{
  TAO_SFP_Fragment_Table_Entry *fragment_entry = 0;
  int result = -1;

  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"frag_number = %d, frag_size = %d,source_id  = %d sequnce_num = %d\n",
              fragment.frag_number,fragment.frag_sz,fragment.source_id,fragment.sequence_num));

  // Buffer sized to hold the payload announced in the fragment header.
  ACE_Message_Block *data;
  ACE_NEW_RETURN (data,
                  ACE_Message_Block(fragment.frag_sz),
                  -1);

  // Read the fragment.
  int n = transport->recv (data->wr_ptr (),fragment.frag_sz);
  if ((n == -1) || (n==0))
    {
      // Log first so that %p still reports the errno left by recv (), then
      // give back the buffer: nothing else references it yet, so returning
      // without releasing it would leak it.
      ACE_ERROR ((LM_ERROR,"TAO_SFP::read_fragment:%p",""));
      data->release ();
      return -1;
    }
  // move past the fragment header.
  // NOTE(review): fragment_len is defined outside this function; presumably
  // it is the marshaled size of the fragment header - confirm.
  data->rd_ptr (fragment_len);
  data->wr_ptr (n);
  //  TAO_SFP_Base::dump_buf (data->rd_ptr (),data->length ());
  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"length of %dth fragment is: %d\n",
              fragment.frag_number,
              data->length ()));

  // NOTE(review): the set below is handed *new_node (the object, not the
  // pointer). If the set stores a copy, this heap node is never freed -
  // confirm against the FRAGMENT_SET definition before changing ownership.
  TAO_SFP_Fragment_Node *new_node;
  ACE_NEW_RETURN (new_node,
                  TAO_SFP_Fragment_Node,
                  -1);
  new_node->fragment_info_ = fragment;
  new_node->data_ = data;

  // Locate the fragment table of this source, creating it on first use.
  TAO_SFP_Fragment_Table *fragment_table = 0;
  result = state.fragment_table_map_.find (fragment.source_id,fragment_table);
  if (result != 0)
    {
      ACE_NEW_RETURN (fragment_table,
                      TAO_SFP_Fragment_Table,
                      -1);
      result = state.fragment_table_map_.bind (fragment.source_id,fragment_table);
      if (result < 0)
        {
          // The table was never published in the map, so this function is
          // its only owner; free it instead of leaking it.
          delete fragment_table;
          ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Base::read_fragment:fragment_table_map:bind failed\n"),-1);
        }
    }
  if (fragment_table->find (fragment.sequence_num,fragment_entry) == 0)
    {
      // Already an entry exists. Traverse the list and insert it at the right place.
      result = fragment_entry->fragment_set_.insert (*new_node);
      if (result != 0)
        ACE_ERROR_RETURN ((LM_ERROR,"insert for %dth node failed\n",fragment.frag_number),-1);
      // check if all the fragments have been received.
    }
  else
    {
      // First fragment seen for this sequence number: create its entry.
      ACE_NEW_RETURN (fragment_entry,
                      TAO_SFP_Fragment_Table_Entry,
                      -1);
      fragment_entry->fragment_set_.insert (*new_node);
      // bind a new entry for this sequence number.
      result = fragment_table->bind (fragment.sequence_num,fragment_entry);
      if (result != 0)
        ACE_ERROR_RETURN ((LM_ERROR,"bind for %dth fragment failed\n",
                           fragment.frag_number),-1);
    }
  if (!(fragment.flags & 0x2))
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Last fragment received\n"));
      // if bit 1 is not set then there are
      // no more fragments.
      fragment_entry->last_received_ = 1;
      // since fragment number starts from 0 to n-1 we add 1.
      fragment_entry->num_fragments_ = fragment.frag_number + 1;
    }


  // A non-zero result is the chained payload of the completed frame.
  state.frame_block_ = check_all_fragments (fragment_entry);
  if (state.frame_block_ != 0)
    {
      state.more_fragments_ = 0;
      ACE_NEW_RETURN (frame_info,
                      TAO_AV_frame_info,
                      -1);
      *frame_info = fragment_entry->frame_info;
    }
  return 0;
}

// Returns the payload blocks of <fragment_entry> linked into one chain when
// the number of stored fragments equals num_fragments_, and 0 otherwise.
// The blocks are linked through cont () in the iteration order of the
// fragment set; the first block of the chain is returned.
ACE_Message_Block*
TAO_SFP_Base::check_all_fragments (TAO_SFP_Fragment_Table_Entry *fragment_entry)
{
  if (TAO_debug_level > 0)
    {
      ACE_DEBUG ((LM_DEBUG,"table size: %d, num_fragments: %d\n",fragment_entry->fragment_set_.size (),fragment_entry->num_fragments_));
    }

  // Nothing to hand back until every fragment has been stored.
  if (fragment_entry->fragment_set_.size () != fragment_entry->num_fragments_)
    return 0;

  if (TAO_debug_level > 0)
    {
      ACE_DEBUG ((LM_DEBUG,"all fragments have been received\n"));
    }

  // Walk the set once, appending each fragment's block behind the previous
  // one so the caller receives a single chain.
  ACE_Message_Block *first = 0;
  ACE_Message_Block *tail = 0;
  TAO_SFP_Fragment_Node *current = 0;
  FRAGMENT_SET_ITERATOR cursor (fragment_entry->fragment_set_);
  while (cursor.next (current) != 0)
    {
      if (first == 0)
        first = current->data_;
      else
        tail->cont (current->data_);
      tail = current->data_;
      cursor.advance ();
    }
  return first;
}

// Resets <msg> and marshals a frame header into it.
//
// flags - value stored in the header's flags field.
// type  - value stored in the header's message_type field.
// msg   - output stream that is reset and then written to.
//
// The header carries the magic number "=SFP" and a message_size of 0.
// Returns 1 if the header was marshaled, 0 otherwise.
CORBA::Boolean
TAO_SFP_Base::start_frame (CORBA::Octet flags,
                           flowProtocol::MsgType type,
                           TAO_OutputCDR &msg)
{
  const char header_magic [] = "=SFP";

  msg.reset ();

  flowProtocol::frameHeader header;
  header.message_size = 0;
  header.message_type = type;
  header.flags = flags;
  for (int i = 0; i < 4; ++i)
    header.magic_number [i] = header_magic [i];

  if (msg << header)
    return 1;
  return 0;
}

// Marshals a Start message into <msg>.
//
// The message carries the magic number "=STA", the SFP major and minor
// version constants and a flags value of 0.
// Returns 1 if the message was marshaled, 0 otherwise.
CORBA::Boolean
TAO_SFP_Base::write_start_message (TAO_OutputCDR &msg)
{
  const char start_magic [] = "=STA";

  flowProtocol::Start start_msg;
  start_msg.flags = 0;
  start_msg.minor_version = TAO_SFP_MINOR_VERSION;
  start_msg.major_version = TAO_SFP_MAJOR_VERSION;
  for (int i = 0; i < 4; ++i)
    start_msg.magic_number [i] = start_magic [i];

  if (msg << start_msg)
    return 1;
  return 0;
}

// Marshals a StartReply message into <msg>.
//
// The message carries the magic number "=STR" and a flags value of 0.
// Returns 1 if the message was marshaled, 0 otherwise.
CORBA::Boolean
TAO_SFP_Base::write_start_reply_message (TAO_OutputCDR &msg)
{
  const char reply_magic [] = "=STR";

  flowProtocol::StartReply reply;
  reply.flags = 0;
  for (int i = 0; i < 4; ++i)
    reply.magic_number [i] = reply_magic [i];

  if (msg << reply)
    return 1;
  return 0;
}

// Marshals a credit message into <msg>.
//
// cred_num - value stored in the message's cred_num field.
// msg      - output stream the message is written to.
//
// The message carries the magic number "=CRE".
// Returns 1 if the message was marshaled, 0 otherwise.
CORBA::Boolean
TAO_SFP_Base::write_credit_message (CORBA::ULong cred_num,
                                    TAO_OutputCDR &msg)
{
  const char credit_magic [] = "=CRE";

  flowProtocol::credit credit_msg;
  credit_msg.cred_num = cred_num;
  for (int i = 0; i < 4; ++i)
    credit_msg.magic_number [i] = credit_magic [i];

  if (msg << credit_msg)
    return 1;
  return 0;
}

// Resets <msg> and marshals a fragment header into it.
//
// flags           - value stored in the header's flags field.
// fragment_number - value stored in frag_number.
// sequence_number - value stored in sequence_num.
// source_id       - value stored in source_id.
// msg             - output stream that is reset and then written to.
//
// The header carries the magic number "FRAG". Fields not listed above are
// not assigned by this function.
// Returns 1 if the header was marshaled, 0 otherwise.
CORBA::Boolean
TAO_SFP_Base::write_fragment_message (CORBA::Octet flags,
                                      CORBA::ULong fragment_number,
                                      CORBA::ULong sequence_number,
                                      CORBA::ULong source_id,
                                      TAO_OutputCDR &msg)
{
  const char fragment_magic [] = "FRAG";

  msg.reset ();

  flowProtocol::fragment header;
  header.source_id = source_id;
  header.sequence_num = sequence_number;
  header.frag_number = fragment_number;
  header.flags = flags;
  for (int i = 0; i < 4; ++i)
    header.magic_number [i] = fragment_magic [i];

  if (msg << header)
    return 1;
  return 0;
}

// Marshals a frame message into <msg>.
//
// timestamp    - value stored in the frame's timestamp field.
// synchSource  - value stored in synchSource.
// source_ids   - sequence copied into source_ids.
// sequence_num - value stored in sequence_num.
// msg          - output stream the message is written to.
//
// Returns 1 if the message was marshaled, 0 otherwise.
CORBA::Boolean
TAO_SFP_Base::write_frame_message (CORBA::ULong timestamp,
                                   CORBA::ULong synchSource,
                                   flowProtocol::my_seq_ulong source_ids,
                                   CORBA::ULong sequence_num,
                                   TAO_OutputCDR &msg)
{
  flowProtocol::frame frame_msg;
  frame_msg.sequence_num = sequence_num;
  frame_msg.source_ids = source_ids;
  frame_msg.synchSource = synchSource;
  frame_msg.timestamp = timestamp;

  if (msg << frame_msg)
    return 1;
  return 0;
}

// Sends the marshaled header in <stream>, followed by the optional data
// chain <mb>, over <transport>.
//
// transport - transport the message is sent on.
// stream    - CDR stream holding the already marshaled header.
// mb        - payload chain appended behind the header; may be 0.
//
// When <mb> is non-zero the total length (header plus every block of the
// chain) is written into the header's size field before sending.
// Returns 1 on success and -1 if the send fails or reports 0.
int
TAO_SFP_Base::send_message (TAO_AV_Transport *transport,
                            TAO_OutputCDR &stream,
                            ACE_Message_Block *mb)
{
  CORBA::ULong total_len = ACE_static_cast (CORBA::ULong,
                                            stream.total_length ());
  if (mb != 0)
    {
      // Add the length of every block in the payload chain.
      ACE_Message_Block *chunk = mb;
      while (chunk != 0)
        {
          total_len += ACE_static_cast (CORBA::ULong, chunk->length ());
          chunk = chunk->cont ();
        }

      char *header = (char *) stream.buffer ();
      // A first byte of 'F' selects the fragment size offset; any other
      // first byte selects the message size offset.
      size_t size_offset;
      if (*header == 'F')
        size_offset = TAO_SFP_FRAGMENT_SIZE_OFFSET;
      else
        size_offset = TAO_SFP_MESSAGE_SIZE_OFFSET;

      // Patch the size field in place inside the marshaled header.
      char *size_slot = header + size_offset;
#if !defined (ACE_ENABLE_SWAP_ON_WRITE)
      *ACE_reinterpret_cast (CORBA::ULong *, size_slot) = total_len;
#else
      if (stream.do_byte_swap ())
        ACE_CDR::swap_4 (ACE_reinterpret_cast (char *,
                                               &total_len),
                         size_slot);
      else
        *ACE_reinterpret_cast (CORBA::ULong *, size_slot) = total_len;
#endif /* ACE_ENABLE_SWAP_ON_WRITE */
    }

  // Hook the payload chain behind the CDR block(s); when stream.end () is 0
  // there is only one message block, so the payload goes behind begin ().
  ACE_Message_Block *last = (ACE_Message_Block *)stream.end ();
  if (last == 0)
    last = (ACE_Message_Block *)stream.begin ();
  last->cont (mb);

  const ssize_t sent = transport->send (stream.begin ());
  if (sent == -1)
    {
      if (TAO_debug_level > 0)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "TAO: (%P|%t) closing conn after fault %p\n",
                      "GIOP::send_request ()"));
        }
      return -1;
    }
  // EOF.
  if (sent == 0)
    {
      if (TAO_debug_level > 0)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "TAO: (%P|%t) GIOP::send_request () "
                      "EOF, closing conn:\n"));
        }
      return -1;
    }
  return 1;
}

int
TAO_SFP_Base::peek_message_type (TAO_AV_Transport *transport,
                                 flowProtocol::MsgType &msg_type)
{
  char peek_buffer [TAO_SFP_MAGIC_NUMBER_LEN+2];// 2 is for flags + message_type.
  int peek_len = TAO_SFP_MAGIC_NUMBER_LEN +2;
  char magic_number [TAO_SFP_MAGIC_NUMBER_LEN+1];
  ssize_t n =transport->recv (peek_buffer,
                              peek_len,
                              MSG_PEEK);
  ACE_OS::strncpy (magic_number,
                   peek_buffer,
                   TAO_SFP_MAGIC_NUMBER_LEN);
  magic_number [TAO_SFP_MAGIC_NUMBER_LEN] = 0;
  if (n == -1)
    ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1);
  else if (n==0)
    ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1);

  if (ACE_OS::strcmp (magic_number,TAO_SFP_START_MAGIC_NUMBER) == 0)
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)Start message received\n"));
      msg_type = flowProtocol::Start_Msg;
    }
  else if (ACE_OS::strcmp (magic_number,TAO_SFP_STARTREPLY_MAGIC_NUMBER) == 0)
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)StartReply message received\n"));
      msg_type = flowProtocol::StartReply_Msg;
    }
  else if (ACE_OS::strcmp (magic_number,TAO_SFP_MAGIC_NUMBER) == 0)
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) frameHeader received\n"));
      //      msg_type = flowProtocol::SimpleFrame;
      msg_type = (flowProtocol::MsgType)peek_buffer [TAO_SFP_MESSAGE_TYPE_OFFSET];
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Message Type = %d\n",msg_type));
    }
  else if (ACE_OS::strcmp (magic_number,TAO_SFP_FRAGMENT_MAGIC_NUMBER) == 0)
    {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -