⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sfp.cpp

📁 这是一个被广泛使用的开源通信项目，完全能够胜任大容量、高并发的通讯需求，可广泛用于网络游戏、医学图像、网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的材料。
💻 CPP
📖 第 1 页 / 共 4 页
字号:
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) fragment Header received\n"));
      msg_type = flowProtocol::Fragment_Msg;
    }
  else if (ACE_OS::strcmp (magic_number,TAO_SFP_CREDIT_MAGIC_NUMBER) == 0)
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t) credit message received\n"));
      msg_type = flowProtocol::Credit_Msg;
    }
  else
    ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP:Invalid magic number\n"),-1);
  return 0;
}

// Receive a Start message from the transport and demarshal it.
//
// The CDR stream is grown to start_len bytes and exactly that many bytes are
// requested from the transport, directly into the stream's read position.
//
// Returns 0 once the message has been extracted into `start`, and -1 when
// extraction from the CDR stream fails.
// NOTE(review): a short or failed recv() is logged but also returns 0, so
// callers cannot tell it apart from success -- confirm this is intended.
int
TAO_SFP_Base::read_start_message (TAO_AV_Transport *transport,
                                  flowProtocol::Start &start,
                                  TAO_InputCDR &input)
{
  input.grow (start_len);

  const int received = transport->recv (input.rd_ptr (), start_len);
  if (received != ACE_static_cast (int, start_len))
    ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_start\n"),0);

  // Only a complete message is worth demarshaling.
  return (input >> start) ? 0 : -1;
}


// Receive a StartReply message from the transport and demarshal it.
//
// The CDR stream is grown to start_reply_len bytes -- the same number of
// bytes that is requested from the transport -- and the received byte count
// is checked against that same length.  (Previously the buffer size and the
// check both used start_len, so a complete StartReply was reported as an
// error whenever the two lengths differ.)
//
// Returns 0 once the message has been extracted into `start_reply`, and -1
// when extraction from the CDR stream fails.
// NOTE(review): a short or failed recv() is logged but also returns 0, like
// the other read_*/peek_* helpers in this file -- confirm this is intended.
int
TAO_SFP_Base::read_start_reply_message (TAO_AV_Transport *transport,
                                        flowProtocol::StartReply &start_reply,
                                        TAO_InputCDR &input)
{
  input.grow (start_reply_len);
  char *buf = input.rd_ptr ();
  int n = transport->recv (buf,
                           start_reply_len);
  if (n != ACE_static_cast (int, start_reply_len))
    ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_start_reply_message"),0);
  else
    {
      if (!(input >> start_reply))
        return -1;
    }
  return 0;
}

// Receive a credit message from the transport and demarshal it.
//
// The CDR stream is grown to credit_len bytes so that the buffer matches
// the number of bytes requested from the transport, as the peek_* helpers
// already do.  (Previously it was grown to start_len, which is unrelated to
// the size of a credit message.)
//
// Returns 0 once the message has been extracted into `credit`, and -1 when
// extraction from the CDR stream fails.
// NOTE(review): a short or failed recv() is logged but also returns 0, like
// the other read_*/peek_* helpers in this file -- confirm this is intended.
int
TAO_SFP_Base::read_credit_message (TAO_AV_Transport *transport,
                                   flowProtocol::credit &credit,
                                   TAO_InputCDR &input)
{
  input.grow (credit_len);
  char *buf = input.rd_ptr ();
  int n = transport->recv (buf,
                           credit_len);
  if (n != ACE_static_cast (int, credit_len))
    ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_credit_message"),0);
  else
    {
      if (!(input >> credit))
        return -1;
    }
  return 0;
}

// Receive an end-of-stream message (a bare frame header) from the transport
// and demarshal it.
//
// The CDR stream is grown to frame_header_len bytes so that the buffer
// matches the number of bytes requested from the transport, exactly as
// peek_frame_header() does for the same message size.  (Previously it was
// grown to start_len.)
//
// Returns 0 once the header has been extracted into `endofstream`, and -1
// when extraction from the CDR stream fails.
// NOTE(review): a short or failed recv() is logged but also returns 0, like
// the other read_*/peek_* helpers in this file -- confirm this is intended.
int
TAO_SFP_Base::read_endofstream_message (TAO_AV_Transport *transport,
                                        flowProtocol::frameHeader &endofstream,
                                        TAO_InputCDR &input)
{
  input.grow (frame_header_len);
  char *buf = input.rd_ptr ();
  int n = transport->recv (buf,
                           frame_header_len);
  if (n != ACE_static_cast (int, frame_header_len))
    ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::read_endofstream_message"),0);
  else
    {
      if (!(input >> endofstream))
        return -1;
    }
  return 0;
}

// Peek at the next frame header on the transport and demarshal it.
//
// The recv() is issued with MSG_PEEK, and frame_header_len bytes are
// requested into a CDR stream grown to that same size.
//
// Returns 0 once the header has been extracted into `header`, and -1 when
// extraction from the CDR stream fails.  The error log now names this
// function; it used to report "read_endofstream_message".
// NOTE(review): a short or failed recv() is logged but also returns 0, like
// the other read_*/peek_* helpers in this file -- confirm this is intended.
int
TAO_SFP_Base::peek_frame_header (TAO_AV_Transport *transport,
                                 flowProtocol::frameHeader &header,
                                 TAO_InputCDR &input)
{
  input.grow (frame_header_len);
  char *buf = input.rd_ptr ();
  int n = transport->recv (buf,
                           frame_header_len,
                           MSG_PEEK);
  if (n != ACE_static_cast (int, frame_header_len))
    ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::peek_frame_header"),0);
  else
    {
      if (!(input >> header))
        return -1;
    }
  return 0;
}

// Peek at the next fragment header on the transport and demarshal it.
//
// The recv() is issued with MSG_PEEK, and fragment_len bytes are requested
// into a CDR stream grown to that same size.
//
// Returns 0 once the header has been extracted into `fragment`, and -1 when
// extraction from the CDR stream fails.  The error log now names this
// function; it used to report "read_endofstream_message".
// NOTE(review): a short or failed recv() is logged but also returns 0, like
// the other read_*/peek_* helpers in this file -- confirm this is intended.
int
TAO_SFP_Base::peek_fragment_header (TAO_AV_Transport *transport,
                                    flowProtocol::fragment &fragment,
                                    TAO_InputCDR &input)
{
  input.grow (fragment_len);
  char *buf = input.rd_ptr ();
  int n = transport->recv (buf,
                           fragment_len,
                           MSG_PEEK);
  if (n != ACE_static_cast (int, fragment_len))
    ACE_ERROR_RETURN ((LM_ERROR,"%p","TAO_SFP_Base::peek_fragment_header"),0);
  else
    {
      if (!(input >> fragment))
        return -1;
    }
  return 0;
}

// Debug helper: log `size` bytes of `buffer` as decimal numbers, framed by
// two separator lines.  Nothing is logged unless TAO_debug_level is above 0.
void
TAO_SFP_Base::dump_buf (char *buffer,int size)
{
  // Every statement below is debug-only output, so test the level once.
  if (!(TAO_debug_level > 0))
    return;

  ACE_DEBUG ((LM_DEBUG,"\n========================================\n"));
  const char *cursor = buffer;
  for (int remaining = size; remaining > 0; --remaining, ++cursor)
    ACE_DEBUG ((LM_DEBUG,"%d ",*cursor));
  ACE_DEBUG ((LM_DEBUG,"\n========================================\n"));
}

//------------------------------------------------------------
// TAO_SFP_Object
//------------------------------------------------------------

// Construct an SFP protocol object for the given callback and transport.
//
// Both arguments are forwarded to the TAO_AV_Protocol_Object base.  The
// source id starts at 10 and both credit counters start at -1; with those
// values send_frame() never blocks on credit (it only blocks when
// current_credit_ is 0 and only decrements when max_credit_ is positive).
// NOTE(review): sequence_num_, which send_frame() reads and increments, is
// not initialized in this constructor -- confirm it is set elsewhere.
TAO_SFP_Object::TAO_SFP_Object (TAO_AV_Callback *callback,
                                TAO_AV_Transport *transport)
  :TAO_AV_Protocol_Object (callback,transport),
   source_id_ (10),
   max_credit_ (-1),
   current_credit_ (-1)
{
  // Touch the TAO_SFP_BASE singleton; the result is discarded (presumably
  // this just forces the instance to exist -- confirm).
  TAO_SFP_BASE::instance ();
  // Size the static frame buffer to twice the transport's MTU.
  // NOTE(review): transport_ is dereferenced here without a null check,
  // whereas send_frame() allows for transport_ being null.
  this->state_.static_frame_.size (2* this->transport_->mtu ());
}

// Destructor.  Intentionally empty: this body performs no cleanup of its own.
TAO_SFP_Object::~TAO_SFP_Object ()
{
}

// Tell the peer the stream is over and notify the application callback.
//
// An EndofStream_Msg header is built, sent on this object's transport, and
// then handle_destroy() is invoked on the callback.
//
// Returns 0 on success.  Returns -1 if the header could not be started, or
// the negative value reported by start_frame()/send_message() otherwise.
// The callback is only notified when the message was sent.
int
TAO_SFP_Object::destroy (void)
{
  TAO_OutputCDR out_stream;

  int result = TAO_SFP_Base::start_frame (TAO_ENCAP_BYTE_ORDER,
                                          flowProtocol::EndofStream_Msg,
                                          out_stream);
  // start_frame() signals failure with 0 (send_frame() tests it the same
  // way), so checking only for "< 0" would let a failed header through and
  // send a half-built message.
  if (result == 0)
    return -1;
  if (result < 0)
    return result;

  result = TAO_SFP_Base::send_message (this->transport_,
                                       out_stream);
  if (result < 0)
    return result;

  this->callback_->handle_destroy ();
  return 0;
}

// Send one frame to the peer, fragmenting it when it is too large for a
// single SFP packet.
//
// frame       Message block chain holding the payload; the chain is walked
//             through cont() to compute the total payload length.
// frame_info  Optional frame metadata.  When non-null a Frame_Msg header
//             carrying its timestamp, ssrc and the current sequence number
//             is written; when null a SimpleFrame_Msg header is used.
//
// Returns  0  when the frame was handed to the transport,
//          1  when no credit is available (flow control; nothing is sent),
//         -1  when the transport is null or a message header could not be
//             started (the latter used to return 0, which callers could not
//             tell apart from success).
//
// NOTE(review): the results of the send_message() calls are not checked.
// NOTE(review): sequence_num_ is advanced and credit consumed only on the
// fragmented path; unfragmented frames leave both untouched -- confirm that
// this is intended.
int
TAO_SFP_Object::send_frame (ACE_Message_Block *frame,
                            TAO_AV_frame_info *frame_info)
{
  TAO_OutputCDR out_stream;
  CORBA::Boolean result = 0;
  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_SFP_Object::send_frame\n"));
  CORBA::Octet flags = TAO_ENCAP_BYTE_ORDER;
  if (this->transport_ == 0)
    ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Object::send_frame: transport is null\n"),-1);
  if (this->current_credit_ != 0)
    {
      // if we have enough credit then we send.
      size_t total_length = 0;
      for (ACE_Message_Block *temp = frame;temp != 0;temp = temp->cont ())
        total_length += temp->length ();
      // The value is logged with %d, so hand the logger an int rather than
      // a size_t.
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG,"total_length of frame=%d\n",
                    ACE_static_cast (int, total_length)));
      if (total_length < (TAO_SFP_MAX_PACKET_SIZE -TAO_SFP_Base::frame_header_len))
        {
          // The whole frame fits in one packet: header plus payload.
          if (frame_info != 0)
            {
              if (frame_info->boundary_marker)
                flags |= 4;
              result = TAO_SFP_Base::start_frame (flags,
                                                  flowProtocol::Frame_Msg,
                                                  out_stream);
              // A header that could not be started is an error, not success.
              if (result == 0)
                return -1;
              flowProtocol::my_seq_ulong source_ids;
              source_ids.length (1);
              source_ids [0] = 0;
              TAO_SFP_Base::write_frame_message (frame_info->timestamp,
                                                 frame_info->ssrc,
                                                 source_ids,
                                                 this->sequence_num_,
                                                 out_stream);
            }
          else
            {
              result = TAO_SFP_Base::start_frame (flags,
                                                  flowProtocol::SimpleFrame_Msg,
                                                  out_stream);
              if (result == 0)
                return -1;
            }
          TAO_SFP_Base::send_message (this->transport_,
                                      out_stream,
                                      frame);
        }
      else // larger frame,fragment and send it.
        {
          // Bit 2 marks the message as having further fragments; it is
          // cleared again for the last fragment below.
          flags = flags | 2;
          if (frame_info != 0)
            {
              if (frame_info->boundary_marker)
                flags |= 4;
              result = TAO_SFP_Base::start_frame (flags,
                                                  flowProtocol::Frame_Msg,
                                                  out_stream);
              if (result == 0)
                return -1;
              flowProtocol::my_seq_ulong source_ids;
              source_ids.length (1);
              source_ids [0] = 0;
              TAO_SFP_Base::write_frame_message (frame_info->timestamp,
                                                 frame_info->ssrc,
                                                 source_ids,
                                                 this->sequence_num_,
                                                 out_stream);
            }
          else
            {
              result = TAO_SFP_Base::start_frame (flags,
                                                  flowProtocol::SimpleFrame_Msg,
                                                  out_stream);
              if (result == 0)
                return -1;
            }
          size_t last_len,current_len;
          int message_len = ACE_static_cast (int,
                                             out_stream.total_length ());
          ACE_Message_Block *mb = frame;
          ACE_Message_Block *fragment_mb =
            this->get_fragment (mb,
                                message_len,
                                last_len,
                                current_len);
          //  This can be either a simpleframe or a sequenced frame,other types of frames.
          TAO_SFP_Base::send_message (this->transport_,
                                      out_stream,
                                      fragment_mb);
          out_stream.reset ();
          int frag_number = 1;
          // get_fragment() may leave mb null once the chain is exhausted
          // (the loop below checks for exactly that), so guard here too
          // before touching the block.
          if (mb != 0)
            {
              mb->length (last_len);
              mb->rd_ptr (current_len);
            }
          // If there is any more data send those as fragments.
          while (mb != 0)
            {
              message_len = TAO_SFP_Base::fragment_len;
              fragment_mb = this->get_fragment (mb,
                                                message_len,
                                                last_len,
                                                current_len);
              if (mb == 0)
                {
                  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"sending the last fragment\n"));
                  // This is the last fragment so clear the fragments bit.
                  flags = TAO_ENCAP_BYTE_ORDER;
                }
              if (fragment_mb == 0)
                break;
              if (frame_info != 0)
                {
                  TAO_SFP_Base::write_fragment_message (flags,
                                                        frag_number++,
                                                        this->sequence_num_,
                                                        frame_info->ssrc,
                                                        out_stream);
                }
              else
                {
                  TAO_SFP_Base::write_fragment_message (flags,
                                                        frag_number++,
                                                        this->sequence_num_,
                                                        0,
                                                        out_stream);
                }
              //   send the fragment now.
              // without the sleep the fragments gets lost!
              // probably because the UDP buffer queue on the sender side
              // is overflown it drops the packets.
              // XXX: This is a hack.
              ACE_OS::sleep (1);
              result = TAO_SFP_Base::send_message (this->transport_,
                                                   out_stream,
                                                   fragment_mb);
              if (mb != 0)
                {
                  mb->length (last_len);
                  mb->rd_ptr (current_len);
                }
            }
          // Increment the sequence_num after sending the message.
          this->sequence_num_++;
          // Also reduce the number of credits.
          if (this->max_credit_ > 0)
            this->current_credit_--;
        }
    }
  else
    {
      // flow controlled so wait.
      // A greater than 0 value indicates that flow control is being exercised.
      return 1;
    }
  return 0;
}

// iovec-based overload of send_frame.  Not implemented: it ignores all of
// its arguments, logs an error and returns -1.
int
TAO_SFP_Object::send_frame (const iovec * /*iov*/,
                            int /*iovcnt*/,
                            TAO_AV_frame_info * /*frame_info*/)
{
  ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SFP_Object::send_frame"),-1);
}

int
TAO_SFP_Object::send_frame (const char* /*buf*/,
                               size_t /*len*/)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -