⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ec_mcast.cpp

📁 这是一个被广泛使用的开源通信项目,完全能够胜任大容量、高并发的通信需求,可广泛用于网络游戏、医学图像、网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的材料。
💻 CPP
📖 第 1 页 / 共 3 页
字号:
                      argv[0]));
          return -1;
        }
    }

  if (this->event_count_ < 0
      || this->event_count_ >= ECM_Driver::MAX_EVENTS)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "%s: event count (%d) is out of range, "
                  "reset to default (%d)\n",
                  argv[0], this->event_count_,
                  100));
      this->event_count_ = 100;
    }

  return 0;
}

// Read the federation configuration and build the federation tables.
//
// Input comes from config_filename_ when it is set, otherwise from
// stdin.  The expected layout is:
//
//   <federation count>
//   then, for each federation:
//     <name> <mcast port> <supplier count> <consumer count>
//     <supplier names...>   (read by parse_name_list)
//     <consumer names...>   (read by parse_name_list)
//
// Every parsed entry is stored in all_federations_[i].  Afterwards each
// entry of local_names_ is matched by name against the parsed
// federations and an ECM_Local_Federation is created for every match;
// a name with no match is only reported, it does not fail the parse.
//
// Returns 0 on success, -1 on any I/O, format or allocation error.
//
// NOTE(review): the federation count read from the file is not checked
// against the capacity of all_federations_, which is declared outside
// this block -- confirm the bound and validate against it.
// NOTE(review): on the error paths the name strings and name arrays
// allocated for the current federation are not released.
int
ECM_Driver::parse_config_file (void)
{
  // Closes the configuration stream on every exit path, but only if
  // this function opened it; stdin is borrowed and must stay open.
  struct Config_File_Closer
  {
    Config_File_Closer (FILE* file, int owned)
      : file_ (file), owned_ (owned) {}
    ~Config_File_Closer (void)
    {
      if (this->owned_ && this->file_ != 0)
        ACE_OS::fclose (this->file_);
    }
    FILE* file_;
    int owned_;
  };

  FILE* cfg = 0;
  if (this->config_filename_ != 0)
    cfg = ACE_OS::fopen (this->config_filename_, "r");
  else
    cfg = stdin;

  if (cfg == 0)
    {
      ACE_ERROR_RETURN ((LM_ERROR, "cannot open config file <%s>\n",
                         this->config_filename_), -1);
    }

  Config_File_Closer closer (cfg, this->config_filename_ != 0);

  int s = fscanf (cfg, "%d", &this->all_federations_count_);
  if (s != 1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "problem reading federation count\n"), -1);
    }
  // ACE_DEBUG ((LM_DEBUG,
  // "total federations = %d\n",
  // this->all_federations_count_));
  for (int i = 0; i < this->all_federations_count_; ++i)
    {
      if (this->skip_blanks (cfg, "reading federation name"))
        return -1;

      // The federation name runs up to the next space character.
      ACE_Read_Buffer reader(cfg);
      char* buf = reader.read (' ', ' ', '\0');
      if (buf == 0)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "problem reading federation name (%d)\n",
                             i), -1);
        }
      char* name = CORBA::string_dup (buf);
      reader.alloc()->free (buf);

      int port = 0;
      if (this->skip_blanks (cfg, "reading federation port number"))
        return -1;
      s = fscanf (cfg, "%d", &port);
      if (s != 1)
        {
          // Without this check an unparsable port left <port>
          // uninitialized and the garbage value was used below.
          ACE_ERROR_RETURN ((LM_ERROR,
                             "problem reading port number (%d)\n",
                             i), -1);
        }
      CORBA::UShort mcast_port = ACE_static_cast(CORBA::UShort, port);

      int ns = 0;
      int nc = 0;
      if (this->skip_blanks (cfg, "reading supplier count"))
        return -1;
      s = fscanf (cfg, "%d", &ns);
      if (s != 1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "problem reading supplier count (%d)\n",
                             i), -1);
        }
      if (this->skip_blanks (cfg, "reading consumer count"))
        return -1;
      s = fscanf (cfg, "%d", &nc);
      if (s != 1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "problem reading consumer count (%d)\n",
                             i), -1);
        }
      // A negative count must never reach the array allocations below.
      if (ns < 0 || nc < 0)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "invalid supplier/consumer count "
                             "(%d/%d) for <%s>\n",
                             ns, nc, name), -1);
        }
      // ACE_DEBUG ((LM_DEBUG, "i = %d <%s> <%d> <%d> <%d>\n",
      // i, name, mcast_port, ns, nc));

      char** supplier_names;
      char** consumer_names;
      ACE_NEW_RETURN (supplier_names, char*[ns], -1);
      ACE_NEW_RETURN (consumer_names, char*[nc], -1);

      if (this->parse_name_list (cfg, ns, supplier_names,
                                 "reading supplier list"))
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "error parsing supplier list for <%s>\n",
                             name), -1);
        }

      if (this->parse_name_list (cfg, nc, consumer_names,
                                 "reading consumer list"))
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "error parsing consumer list for <%s>\n",
                             name), -1);
        }

      // The name string and both name arrays are handed to the new
      // federation object.
      ACE_NEW_RETURN (this->all_federations_[i],
                      ECM_Federation (name, mcast_port,
                                      ns, supplier_names,
                                      nc, consumer_names), -1);
    }

  // Bind every locally configured federation name to its parsed entry.
  for (int j = 0; j < this->local_federations_count_; ++j)
    {
      int k = 0;
      for (; k < this->all_federations_count_; ++k)
        {
          if (ACE_OS::strcmp (this->local_names_[j],
                              this->all_federations_[k]->name ()) == 0)
            {
              ACE_NEW_RETURN (this->local_federations_[j],
                              ECM_Local_Federation (this->all_federations_[k],
                                                    this),
                              -1);
              break;
            }
        }
      // k ran off the end: no parsed federation carries this name.
      if (k == this->all_federations_count_)
        ACE_ERROR ((LM_ERROR,
                    "Cannot find federations <%s>\n",
                    this->local_names_[j]));
    }

  return 0;
}

// Read <n> newline-terminated names from <file> into <names>.
//
// Leading whitespace before each name is skipped.  Each stored name is
// a copy made with CORBA::string_dup.  <error_msg> describes what is
// being read and is used only in diagnostics.
//
// Returns 0 on success, -1 if the input ends early or a name cannot be
// read; in that case only names[0..i-1] have been filled in.
int
ECM_Driver::parse_name_list (FILE* file,
                             int n,
                             char** names,
                             const char* error_msg)
{
  for (int i = 0; i < n; ++i)
    {
      if (this->skip_blanks (file, error_msg))
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "error on item %d while %s\n",
                             i, error_msg), -1);
        }
      ACE_Read_Buffer tmp(file);
      char* buf = tmp.read ('\n', '\n', '\0');
      if (buf == 0)
        {
          // A failed read must not be duplicated or handed back to
          // the allocator.
          ACE_ERROR_RETURN ((LM_ERROR,
                             "error on item %d while %s\n",
                             i, error_msg), -1);
        }
      names[i] = CORBA::string_dup (buf);
      tmp.alloc ()->free (buf);
    }
  return 0;
}

// Advance <file> past any run of whitespace characters.
//
// On return the stream is positioned at the first non-blank character.
// If end-of-file is reached first, an error mentioning <error_msg> is
// logged and -1 is returned; otherwise the result is 0.
int
ECM_Driver::skip_blanks (FILE* file,
                         const char* error_msg)
{
  // Read ahead until something other than whitespace (or EOF) shows up.
  int ch = fgetc (file);
  while (isspace (ch))
    ch = fgetc (file);

  if (ch == EOF)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Unexpected EOF in config file while %s\n",
                         error_msg),
                        -1);
    }

  // Give the non-blank character back so the next reader sees it.
  ungetc (ch, file);
  return 0;
}
// ****************************************************************

// Build a federation description.
//
// <name>, <supplier_names> and <consumer_names> are stored as given
// (no copies are made here).  <mcast_port> is also used to initialize
// addr_server_.  Every supplier and consumer name is resolved through
// ACE_INET_Addr and the resulting IP address is cached in
// supplier_ipaddr_ / consumer_ipaddr_.
ECM_Federation::ECM_Federation (char* name,
                                CORBA::UShort mcast_port,
                                int supplier_types,
                                char** supplier_names,
                                int consumer_types,
                                char** consumer_names)
  :  name_ (name),
     mcast_port_ (mcast_port),
     supplier_types_ (supplier_types),
     supplier_names_ (supplier_names),
     consumer_types_ (consumer_types),
     consumer_names_ (consumer_names),
     addr_server_ (mcast_port)
{
  // ACE_NEW leaves the constructor early when an allocation fails.
  // Null both arrays first so the destructor's delete[] never sees an
  // indeterminate pointer.
  this->supplier_ipaddr_ = 0;
  this->consumer_ipaddr_ = 0;

  sender_ = TAO_ECG_UDP_Sender::create (true);

  ACE_NEW (this->supplier_ipaddr_, CORBA::ULong[this->supplier_types_]);
  ACE_NEW (this->consumer_ipaddr_, CORBA::ULong[this->consumer_types_]);

  int i;
  for (i = 0; i < this->supplier_types_; ++i)
    {
      // Port 0: only the host part of the address is of interest.
      ACE_INET_Addr addr (u_short(0), this->supplier_names_[i]);
      this->supplier_ipaddr_[i] = addr.get_ip_address ();
    }
  for (i = 0; i < this->consumer_types_; ++i)
    {
      ACE_INET_Addr addr (u_short(0), this->consumer_names_[i]);
      this->consumer_ipaddr_[i] = addr.get_ip_address ();
    }
}

// Release the cached IP address tables.
// NOTE(review): name_, supplier_names_ and consumer_names_ are not
// released here -- confirm who owns them.
ECM_Federation::~ECM_Federation (void)
{
  delete[] this->supplier_ipaddr_;
  delete[] this->consumer_ipaddr_;
}

// Initialize the UDP sender and connect it to the event channel.
//
// The sender is initialized with <ec>, this federation's address
// server and <endpoint>.  It is then connected with a subscription
// made of one disjunction group holding every consumer_ipaddr(i) as an
// event type.
void
ECM_Federation::open (TAO_ECG_UDP_Out_Endpoint *endpoint,
                      RtecEventChannelAdmin::EventChannel_ptr ec
                      ACE_ENV_ARG_DECL)
{
  RtecUDPAdmin::AddrServer_var addr_server =
    this->addr_server (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->sender_->init (ec,
                       addr_server.in (),
                       endpoint
                       ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // @@ TODO Make this a parameter....
  this->sender_->mtu (64);

  // Subscribe to every event type this federation consumes.
  ACE_ConsumerQOS_Factory qos;
  qos.start_disjunction_group ();
  for (int i = 0; i < this->consumer_types (); ++i)
    {
      qos.insert_type (this->consumer_ipaddr (i), 0);
    }
  RtecEventChannelAdmin::ConsumerQOS qos_copy = qos.get_ConsumerQOS ();
  this->sender_->connect (qos_copy ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Shut this federation down by calling shutdown() on sender_.
void
ECM_Federation::close (ACE_ENV_SINGLE_ARG_DECL)
{
  this->sender_->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}

// Return the object reference obtained by calling _this() on the
// addr_server_ member.
RtecUDPAdmin::AddrServer_ptr
ECM_Federation::addr_server (ACE_ENV_SINGLE_ARG_DECL)
{
  return this->addr_server_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
}

// ****************************************************************

// Remember the local federation this supplier reports to and hand
// <this> to the consumer_ member.
ECM_Supplier::ECM_Supplier (ECM_Local_Federation* federation)
  :  federation_ (federation),
     consumer_ (this)
{
}

void
ECM_Supplier::open (const char* name,
                    RtecEventChannelAdmin::EventChannel_ptr ec
                    ACE_ENV_ARG_DECL)
{
  this->supplier_id_ = ACE::crc32 (name);
  ACE_DEBUG ((LM_DEBUG, "ID for <%s> is %04.4x\n", name,
              this->supplier_id_));

  ACE_SupplierQOS_Factory qos;
  for (int i = 0; i < this->federation_->supplier_types (); ++i)
    {
      qos.insert (this->supplier_id_,
                  this->federation_->supplier_ipaddr (i),
                  0, 1);
    }
  qos.insert (this->supplier_id_,
              ACE_ES_EVENT_SHUTDOWN,
              0, 1);

  RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
    ec->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->consumer_proxy_ =
    supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  RtecEventComm::PushSupplier_var objref = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->consumer_proxy_->connect_push_supplier (objref.in (),
                                                qos.get_SupplierQOS ()
                                                ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Disconnect from the consumer proxy, if there is one, and reset the
// stored reference.  Does nothing when consumer_proxy_ is nil.
void
ECM_Supplier::close (ACE_ENV_SINGLE_ARG_DECL)
{
  if (!CORBA::is_nil (this->consumer_proxy_.in ()))
    {
      this->consumer_proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      this->consumer_proxy_ = 0;
    }
}

// Subscribe the consumer_ member to interval timeouts on <ec>.
//
// The subscription is a single disjunction group holding one
// ACE_ES_EVENT_INTERVAL_TIMEOUT entry with the given <interval>.  The
// supplier proxy obtained from the channel is kept in supplier_proxy_.
void
ECM_Supplier::activate (RtecEventChannelAdmin::EventChannel_ptr ec,
                        RtecEventComm::Time interval
                        ACE_ENV_ARG_DECL)
{
  ACE_ConsumerQOS_Factory subscription;
  subscription.start_disjunction_group ();
  subscription.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
                            interval,
                            0);

  // Get a supplier proxy from the channel and connect consumer_ to it.
  RtecEventChannelAdmin::ConsumerAdmin_var admin =
    ec->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->supplier_proxy_ =
    admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  RtecEventComm::PushConsumer_var consumer_ref =
    this->consumer_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
                                                subscription.get_ConsumerQOS ()
                                                ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Return the value of supplier_id_.
int
ECM_Supplier::supplier_id (void) const
{
  return this->supplier_id_;
}

// Handle a batch of events delivered to this object.
//
// Each ACE_ES_EVENT_INTERVAL_TIMEOUT event is forwarded to the
// federation's supplier_timeout() together with the consumer proxy;
// events of any other type are ignored.
void
ECM_Supplier::push (const RtecEventComm::EventSet& events
                    ACE_ENV_ARG_DECL)
{
  for (u_int idx = 0; idx < events.length (); ++idx)
    {
      const RtecEventComm::Event& event = events[idx];
      if (event.header.type == ACE_ES_EVENT_INTERVAL_TIMEOUT)
        {
          this->federation_->supplier_timeout (this->consumer_proxy_.in ()
                                               ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;
        }
    }
}

// Disconnect callback for the supplier role.  Currently a no-op: the
// call on supplier_proxy_ below is commented out.
void
ECM_Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
    ACE_THROW_SPEC ((CORBA::SystemException))
{
  // this->supplier_proxy_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
}

// Disconnect callback for the consumer role.  Takes no action.
void
ECM_Supplier::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
}

// ****************************************************************

ECM_Consumer::ECM_Consumer (ECM_Local_Federation *federation)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -