📄 ec_mcast.cpp
字号:
argv[0]));
return -1;
}
}
if (this->event_count_ < 0
|| this->event_count_ >= ECM_Driver::MAX_EVENTS)
{
ACE_DEBUG ((LM_DEBUG,
"%s: event count (%d) is out of range, "
"reset to default (%d)\n",
argv[0], this->event_count_,
100));
this->event_count_ = 100;
}
return 0;
}
/**
 * Parse the federation configuration and populate the federation tables.
 *
 * Input comes from the file named by config_filename_, or from stdin
 * when no file name is set.  As consumed below, the layout is a
 * federation count followed by one record per federation:
 *
 *   name (terminated by a space), multicast port, supplier count,
 *   consumer count, supplier names, consumer names
 *
 * where the name lists are read by parse_name_list().
 *
 * After all records are read, every entry of local_names_ is looked up
 * by name among the parsed federations and an ECM_Local_Federation is
 * created for each match.  A name with no match is logged but does not
 * make the function fail.
 *
 * @return 0 on success; -1 if the input cannot be opened, is truncated
 *         or malformed, or an allocation fails.
 */
int
ECM_Driver::parse_config_file (void)
{
  FILE* cfg = 0;
  if (this->config_filename_ != 0)
    cfg = ACE_OS::fopen (this->config_filename_, "r");
  else
    cfg = stdin;
  if (cfg == 0)
    {
      ACE_ERROR_RETURN ((LM_ERROR, "cannot open config file <%s>\n",
                         this->config_filename_), -1);
    }

  // Closes the stream on every way out of this function, so the early
  // error returns below no longer leave it open.
  struct Stream_Closer
  {
    Stream_Closer (FILE* stream) : stream_ (stream) {}
    ~Stream_Closer (void) { ACE_OS::fclose (this->stream_); }
    FILE* stream_;
  };
  Stream_Closer cfg_closer (cfg);

  int s = fscanf (cfg, "%d", &this->all_federations_count_);
  if (s == 0 || s == EOF)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "problem reading federation count\n"), -1);
    }
  // NOTE(review): the count is not checked against the capacity of
  // all_federations_, which is declared outside this block -- confirm.

  // ACE_DEBUG ((LM_DEBUG,
  //             "total federations = %d\n",
  //             this->all_federations_count_));

  for (int i = 0; i < this->all_federations_count_; ++i)
    {
      // NOTE(review): strings and arrays allocated earlier in an
      // iteration are not reclaimed when a later step of the same
      // iteration fails.

      if (this->skip_blanks (cfg, "reading federation name"))
        return -1;
      ACE_Read_Buffer reader (cfg);
      char* buf = reader.read (' ', ' ', '\0');
      if (buf == 0)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "problem reading federation name (%d)\n",
                             i), -1);
        }
      char* name = CORBA::string_dup (buf);
      reader.alloc ()->free (buf);

      int port = 0;
      if (this->skip_blanks (cfg, "reading federation port number"))
        return -1;
      s = fscanf (cfg, "%d", &port);
      // Reject an unreadable port and one that would be silently
      // truncated by the conversion to CORBA::UShort.
      if (s == 0 || s == EOF || port < 0 || port > 65535)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "problem reading federation port number (%d)\n",
                             i), -1);
        }
      CORBA::UShort mcast_port = ACE_static_cast(CORBA::UShort, port);

      int ns = 0;
      int nc = 0;
      if (this->skip_blanks (cfg, "reading supplier count"))
        return -1;
      s = fscanf (cfg, "%d", &ns);
      // A negative count must not reach the array allocation below.
      if (s == 0 || s == EOF || ns < 0)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "problem reading supplier count (%d)\n",
                             i), -1);
        }
      if (this->skip_blanks (cfg, "reading consumer count"))
        return -1;
      s = fscanf (cfg, "%d", &nc);
      if (s == 0 || s == EOF || nc < 0)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "problem reading consumer count (%d)\n",
                             i), -1);
        }

      // ACE_DEBUG ((LM_DEBUG, "i = %d <%s> <%d> <%d> <%d>\n",
      //             i, name, mcast_port, ns, nc));

      char** supplier_names;
      char** consumer_names;
      ACE_NEW_RETURN (supplier_names, char*[ns], -1);
      ACE_NEW_RETURN (consumer_names, char*[nc], -1);

      if (this->parse_name_list (cfg, ns, supplier_names,
                                 "reading supplier list"))
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "error parsing supplier list for <%s>\n",
                             name), -1);
        }
      if (this->parse_name_list (cfg, nc, consumer_names,
                                 "reading consumer list"))
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "error parsing consumer list for <%s>\n",
                             name), -1);
        }

      // The federation object receives the name and both name arrays.
      ACE_NEW_RETURN (this->all_federations_[i],
                      ECM_Federation (name, mcast_port,
                                      ns, supplier_names,
                                      nc, consumer_names), -1);
    }

  // Bind each locally requested federation name to its parsed entry.
  for (int j = 0; j < this->local_federations_count_; ++j)
    {
      int k = 0;
      for (; k < this->all_federations_count_; ++k)
        {
          if (ACE_OS::strcmp (this->local_names_[j],
                              this->all_federations_[k]->name ()) == 0)
            {
              ACE_NEW_RETURN (this->local_federations_[j],
                              ECM_Local_Federation (this->all_federations_[k],
                                                    this),
                              -1);
              break;
            }
        }
      // k ran off the end: no federation carries this name.
      if (k == this->all_federations_count_)
        ACE_ERROR ((LM_ERROR,
                    "Cannot find federations <%s>\n",
                    this->local_names_[j]));
    }
  return 0;
}
/**
 * Read @a n names from @a file into @a names.
 *
 * For each entry, leading whitespace is skipped and the text up to the
 * next newline is read; a copy made with CORBA::string_dup is stored in
 * the corresponding slot of @a names.
 *
 * @param file       Stream to read from.
 * @param n          Number of names to read; @a names must have room
 *                   for that many entries.
 * @param names      Output array, filled from index 0.
 * @param error_msg  Description of the current activity, used in
 *                   diagnostics.
 * @return 0 on success, -1 if the input ends early or a name cannot be
 *         read.  On failure the remaining slots are left unset.
 */
int
ECM_Driver::parse_name_list (FILE* file,
                             int n,
                             char** names,
                             const char* error_msg)
{
  for (int i = 0; i < n; ++i)
    {
      if (this->skip_blanks (file, error_msg))
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "error on item %d while %s\n",
                             i, error_msg), -1);
        }
      ACE_Read_Buffer tmp (file);
      char* buf = tmp.read ('\n', '\n', '\0');
      // Report a failed read instead of storing a null name that the
      // caller would later treat as a valid string.
      if (buf == 0)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "error on item %d while %s\n",
                             i, error_msg), -1);
        }
      names[i] = CORBA::string_dup (buf);
      // The buffer came from the reader's allocator and goes back to it.
      tmp.alloc ()->free (buf);
    }
  return 0;
}
/**
 * Advance @a file past any whitespace.
 *
 * On success the first non-whitespace character is pushed back so the
 * next read sees it.
 *
 * @param file       Stream to read from.
 * @param error_msg  Description of the current activity, used in the
 *                   end-of-file diagnostic.
 * @return 0 if a non-whitespace character is available, -1 if the end
 *         of the stream was reached first.
 */
int
ECM_Driver::skip_blanks (FILE* file,
                         const char* error_msg)
{
  // Read until something other than whitespace (or EOF) shows up.
  int ch = fgetc (file);
  while (isspace (ch))
    ch = fgetc (file);

  if (ch == EOF)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Unexpected EOF in config file while %s\n",
                         error_msg),
                        -1);
    }

  // Leave the character in the stream for the caller.
  ungetc (ch, file);
  return 0;
}
// ****************************************************************
/**
 * Build a federation description.
 *
 * The name and the two name arrays are stored as passed in; no copies
 * are made here.  Each supplier and consumer name is resolved through
 * ACE_INET_Addr and the resulting IP address is cached in
 * supplier_ipaddr_ / consumer_ipaddr_.
 *
 * @param name            Federation name.
 * @param mcast_port      Port handed to the address server member.
 * @param supplier_types  Number of entries in @a supplier_names.
 * @param supplier_names  Names resolved into supplier addresses.
 * @param consumer_types  Number of entries in @a consumer_names.
 * @param consumer_names  Names resolved into consumer addresses.
 */
ECM_Federation::ECM_Federation (char* name,
                                CORBA::UShort mcast_port,
                                int supplier_types,
                                char** supplier_names,
                                int consumer_types,
                                char** consumer_names)
  : name_ (name),
    mcast_port_ (mcast_port),
    supplier_types_ (supplier_types),
    supplier_names_ (supplier_names),
    consumer_types_ (consumer_types),
    consumer_names_ (consumer_names),
    addr_server_ (mcast_port)
{
  // ACE_NEW leaves the constructor when an allocation fails.  The
  // destructor applies delete[] to both arrays unconditionally, so they
  // must hold a well-defined value before the first allocation.
  this->supplier_ipaddr_ = 0;
  this->consumer_ipaddr_ = 0;

  sender_ = TAO_ECG_UDP_Sender::create (true);

  ACE_NEW (this->supplier_ipaddr_, CORBA::ULong[this->supplier_types_]);
  ACE_NEW (this->consumer_ipaddr_, CORBA::ULong[this->consumer_types_]);

  int i;
  for (i = 0; i < this->supplier_types_; ++i)
    {
      ACE_INET_Addr addr (u_short(0), this->supplier_names_[i]);
      this->supplier_ipaddr_[i] = addr.get_ip_address ();
    }
  for (i = 0; i < this->consumer_types_; ++i)
    {
      ACE_INET_Addr addr (u_short(0), this->consumer_names_[i]);
      this->consumer_ipaddr_[i] = addr.get_ip_address ();
    }
}
/// Release the two IP address arrays allocated by the constructor.
ECM_Federation::~ECM_Federation (void)
{
  delete[] supplier_ipaddr_;
  delete[] consumer_ipaddr_;
}
/**
 * Initialize the UDP sender and connect it to the event channel.
 *
 * The sender is initialized with the channel, this federation's address
 * server and the given endpoint, then connected with a subscription
 * holding one disjunction group that lists every consumer IP address as
 * an event type.
 *
 * @param endpoint  Outgoing UDP endpoint passed to the sender.
 * @param ec        Event channel the sender is connected to.
 */
void
ECM_Federation::open (TAO_ECG_UDP_Out_Endpoint *endpoint,
                      RtecEventChannelAdmin::EventChannel_ptr ec
                      ACE_ENV_ARG_DECL)
{
  RtecUDPAdmin::AddrServer_var addr_server =
    this->addr_server (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->sender_->init (ec,
                       addr_server.in (),
                       endpoint
                       ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // @@ TODO Make this a parameter....
  this->sender_->mtu (64);

  // Subscribe to every consumer address of this federation.
  ACE_ConsumerQOS_Factory qos;
  qos.start_disjunction_group ();
  for (int i = 0; i < this->consumer_types (); ++i)
    {
      qos.insert_type (this->consumer_ipaddr (i), 0);
    }

  RtecEventChannelAdmin::ConsumerQOS qos_copy = qos.get_ConsumerQOS ();
  this->sender_->connect (qos_copy ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
/// Shut down the UDP sender owned by this federation.
void
ECM_Federation::close (ACE_ENV_SINGLE_ARG_DECL)
{
  sender_->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}
/// Return the object reference obtained from the address server
/// member's _this().
RtecUDPAdmin::AddrServer_ptr
ECM_Federation::addr_server (ACE_ENV_SINGLE_ARG_DECL)
{
  return addr_server_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
}
// ****************************************************************
// Create a supplier bound to <federation>.  The pointer is stored as
// given, and the consumer_ member is constructed with a pointer back to
// this object.
ECM_Supplier::ECM_Supplier (ECM_Local_Federation* federation)
: federation_ (federation),
consumer_ (this)
{
}
void
ECM_Supplier::open (const char* name,
RtecEventChannelAdmin::EventChannel_ptr ec
ACE_ENV_ARG_DECL)
{
this->supplier_id_ = ACE::crc32 (name);
ACE_DEBUG ((LM_DEBUG, "ID for <%s> is %04.4x\n", name,
this->supplier_id_));
ACE_SupplierQOS_Factory qos;
for (int i = 0; i < this->federation_->supplier_types (); ++i)
{
qos.insert (this->supplier_id_,
this->federation_->supplier_ipaddr (i),
0, 1);
}
qos.insert (this->supplier_id_,
ACE_ES_EVENT_SHUTDOWN,
0, 1);
RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
ec->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->consumer_proxy_ =
supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
RtecEventComm::PushSupplier_var objref = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->consumer_proxy_->connect_push_supplier (objref.in (),
qos.get_SupplierQOS ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
/// Disconnect from the consumer proxy, if one is held, and drop the
/// reference.  Does nothing when the proxy is nil.
void
ECM_Supplier::close (ACE_ENV_SINGLE_ARG_DECL)
{
  if (!CORBA::is_nil (consumer_proxy_.in ()))
    {
      consumer_proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      consumer_proxy_ = 0;
    }
}
/**
 * Subscribe the consumer_ member to interval timeout events.
 *
 * The subscription holds a single disjunction group with one
 * ACE_ES_EVENT_INTERVAL_TIMEOUT entry for @a interval.
 *
 * @param ec        Event channel to connect to.
 * @param interval  Interval passed to the timeout subscription.
 */
void
ECM_Supplier::activate (RtecEventChannelAdmin::EventChannel_ptr ec,
                        RtecEventComm::Time interval
                        ACE_ENV_ARG_DECL)
{
  ACE_ConsumerQOS_Factory timeout_qos;
  timeout_qos.start_disjunction_group ();
  timeout_qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT, interval, 0);

  // Obtain a supplier proxy from the channel.
  RtecEventChannelAdmin::ConsumerAdmin_var admin =
    ec->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  supplier_proxy_ =
    admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Connect the consumer_ member as the receiver of the timeouts.
  RtecEventComm::PushConsumer_var timeout_sink =
    consumer_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  supplier_proxy_->connect_push_consumer (timeout_sink.in (),
                                          timeout_qos.get_ConsumerQOS ()
                                          ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
int
ECM_Supplier::supplier_id (void) const
{
return this->supplier_id_;
}
/**
 * Handle a set of events delivered to this supplier.
 *
 * Every ACE_ES_EVENT_INTERVAL_TIMEOUT event is forwarded to the
 * federation's supplier_timeout() together with the consumer proxy;
 * events of any other type are ignored.
 *
 * @param events  Events to inspect.
 */
void
ECM_Supplier::push (const RtecEventComm::EventSet& events
                    ACE_ENV_ARG_DECL)
{
  for (u_int idx = 0; idx < events.length (); ++idx)
    {
      if (events[idx].header.type == ACE_ES_EVENT_INTERVAL_TIMEOUT)
        {
          federation_->supplier_timeout (consumer_proxy_.in ()
                                         ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;
        }
    }
}
// Disconnect notification for the push supplier role.  The body is
// empty, so this is a no-op.
void
ECM_Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
// NOTE(review): the forwarding call below is disabled; the reason is
// not visible from this file.
// this->supplier_proxy_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
}
// Disconnect notification for the push consumer role.  The body is
// empty, so this is a no-op.
void
ECM_Supplier::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
}
// ****************************************************************
ECM_Consumer::ECM_Consumer (ECM_Local_Federation *federation)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -