📄 supplier.cpp
字号:
// Supplier.cpp,v 1.17 2003/11/04 05:21:33 dhinton Exp
#include "Supplier.h"
#include "orbsvcs/Time_Utilities.h"
#include "orbsvcs/Event_Utilities.h"
#include "tao/debug.h"
#include "ace/OS_NS_unistd.h"
ACE_RCSID(EC_Tests, EC_Supplier, "Supplier.cpp,v 1.17 2003/11/04 05:21:33 dhinton Exp")
// Construct a supplier that is not yet connected to an event channel.
//
// <driver> is kept in driver_ and is the object notified from
// disconnect_push_supplier(); <cookie> is kept in cookie_ and is handed
// back to the driver in that notification.  Every counter and test
// parameter starts at zero and the servant is flagged as inactive.
EC_Supplier::EC_Supplier (EC_Driver *driver,
                          void* cookie)
  : driver_ (driver),
    cookie_ (cookie),
    push_count_ (0),
    burst_count_ (0),
    burst_size_ (0),
    payload_size_ (0),
    burst_pause_ (0),
    shutdown_event_type_ (0),
    is_active_ (0)
{
}
void
EC_Supplier::send_event (int event_number
ACE_ENV_ARG_DECL)
{
if (CORBA::is_nil (this->consumer_proxy_.in ()))
return;
// Create the event...
RtecEventComm::EventSet event (1);
event.length (1);
event[0].header.ttl = 1;
ACE_hrtime_t t = ACE_OS::gethrtime ();
ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);
// We use replace to minimize the copies, this should result
// in just one memory allocation:
event[0].data.payload.length (this->payload_size_);
this->event_type (event_number, event[0]);
this->send_event (event ACE_ENV_ARG_PARAMETER);
}
// Push <event> to the consumer proxy and record throughput statistics.
//
// The whole operation runs while holding lock_.  The first push records
// throughput_start_; each push adds the number of events in the set to
// push_count_ and feeds throughput_ with the elapsed time since the
// first push and the duration of this push call.
//
// Returns without pushing or counting when there is no consumer proxy,
// matching the single-event overload; the proxy is reset to nil by
// disconnect() and disconnect_push_supplier().
void
EC_Supplier::send_event (const RtecEventComm::EventSet& event
                         ACE_ENV_ARG_DECL)
{
  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);

  // Nothing to push to: avoid invoking an operation through a nil
  // object reference.
  if (CORBA::is_nil (this->consumer_proxy_.in ()))
    return;

  if (this->push_count_ == 0)
    this->throughput_start_ = ACE_OS::gethrtime ();

  this->push_count_ += event.length ();

  // Progress report whenever the running total is a multiple of 100.
  if (TAO_debug_level > 0
      && this->push_count_ % 100 == 0)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "EC_Supplier (%P|%t): %d events sent\n",
                  this->push_count_));
    }

  ACE_hrtime_t start = ACE_OS::gethrtime ();

  this->consumer_proxy_->push (event ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Only sampled when the push did not raise an exception.
  ACE_hrtime_t end = ACE_OS::gethrtime ();
  this->throughput_.sample (end - this->throughput_start_,
                            end - start);
}
void
EC_Supplier::event_type (int event_number,
RtecEventComm::Event &event)
{
CORBA::ULong l = this->qos_.publications.length ();
if (l == 0)
{
event.header.source = 0;
event.header.type = this->shutdown_event_type_;
}
else
{
int i = event_number % l;
int type = this->qos_.publications[i].event.header.type;
if (type == this->shutdown_event_type_)
i = 0;
RtecEventComm::EventHeader& header =
this->qos_.publications[i].event.header;
event.header.source = header.source;
event.header.type = header.type;
}
}
// Obtain a push-consumer proxy from <supplier_admin>, keep it in
// consumer_proxy_, and then finish the connection through the
// two-argument connect() using <qos> and <shutdown_event_type>.
void
EC_Supplier::connect (RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin,
                      const RtecEventChannelAdmin::SupplierQOS& qos,
                      int shutdown_event_type
                      ACE_ENV_ARG_DECL)
{
  // Step 1: get the proxy we will push events to.
  this->consumer_proxy_ =
    supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Step 2: connect ourselves to that proxy.
  this->connect (qos, shutdown_event_type ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
// Connect this supplier to the consumer proxy already stored in
// consumer_proxy_.
//
// Remembers <qos> and <shutdown_event_type>, obtains an object
// reference for this servant via _this() if myself_ is still nil, marks
// the servant active, and registers with the proxy.  Silently returns
// when there is no proxy.
void
EC_Supplier::connect (const RtecEventChannelAdmin::SupplierQOS& qos,
                      int shutdown_event_type
                      ACE_ENV_ARG_DECL)
{
  if (CORBA::is_nil (this->consumer_proxy_.in ()))
    {
      return; // @@ Throw?
    }

  this->qos_ = qos;
  this->shutdown_event_type_ = shutdown_event_type;

  // Only create the object reference once; later connects reuse it.
  const bool needs_reference = CORBA::is_nil (this->myself_.in ());
  if (needs_reference)
    {
      this->myself_ = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;
    }

  // Set before the remote call, so shutdown() will deactivate the
  // servant even if the connect below raises an exception.
  this->is_active_ = 1;

  this->consumer_proxy_->connect_push_supplier (this->myself_.in (),
                                                qos
                                                ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
// Tell the consumer proxy to disconnect and then drop our reference to
// it.  Does nothing when there is no proxy.  The reference is only
// cleared after the disconnect call passes its exception check.
void
EC_Supplier::disconnect (ACE_ENV_SINGLE_ARG_DECL)
{
  if (CORBA::is_nil (this->consumer_proxy_.in ()))
    {
      return;
    }

  this->consumer_proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->consumer_proxy_ =
    RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
}
// Deactivate this servant in its default POA and release the object
// reference held in myself_.  Does nothing unless the servant was
// marked active by connect().
void
EC_Supplier::shutdown (ACE_ENV_SINGLE_ARG_DECL)
{
  if (!this->is_active_)
    {
      return;
    }

  // Look up the POA and the object id this servant is registered under.
  PortableServer::POA_var poa =
    this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  PortableServer::ObjectId_var id =
    poa->servant_to_id (this ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Deactivate the servant.
  poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Only reached when every step above passed its exception check.
  this->is_active_ = 0;
  this->myself_ = RtecEventComm::PushSupplier::_nil ();
}
// Report the disconnection to the driver, identifying this supplier by
// its cookie, and then drop the reference to the consumer proxy.
void
EC_Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  this->driver_->supplier_disconnect (this->cookie_ ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->consumer_proxy_ =
    RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
}
// Forward <name> and <gsf> unchanged to the dump_results() method of
// this supplier's throughput statistics.
void
EC_Supplier::dump_results (const char* name,
                           ACE_UINT32 gsf)
{
  this->throughput_.dump_results (name, gsf);
}
// Add this supplier's throughput statistics into <stats>.
void
EC_Supplier::accumulate (ACE_Throughput_Stats& stats) const
{
  stats.accumulate (this->throughput_);
}
// ****************************************************************
// Construct a task that drives <supplier> from svc().
//
// svc() sends <burst_count> bursts of <burst_size> events each, sleeps
// for a pause built from <burst_pause> after every burst, sizes each
// event payload to <payload_size>, and finally sends one event of type
// <shutdown_event_type>.  <driver> and <cookie> are stored as given;
// <thr_mgr> is passed on to the ACE_Task_Base constructor.
EC_Supplier_Task::EC_Supplier_Task (EC_Supplier* supplier,
                                    EC_Driver* driver,
                                    void* cookie,
                                    int burst_count,
                                    int burst_size,
                                    int burst_pause,
                                    int payload_size,
                                    int shutdown_event_type,
                                    ACE_Thread_Manager* thr_mgr)
  : ACE_Task_Base (thr_mgr),
    supplier_ (supplier),
    driver_ (driver),
    cookie_ (cookie),
    burst_count_ (burst_count),
    burst_size_ (burst_size),
    burst_pause_ (burst_pause),
    payload_size_ (payload_size),
    shutdown_event_type_ (shutdown_event_type)
{
}
// Task body: send burst_count_ bursts of burst_size_ events through the
// supplier, sleeping after each burst, then send a single shutdown
// event.  Exceptions raised while sending are printed and swallowed so
// that the remaining events are still attempted.  Always returns 0.
int
EC_Supplier_Task::svc (void)
{
ACE_DECLARE_NEW_CORBA_ENV;
// Initialize a time value to pace the test
// (burst_pause_ is passed as the second constructor argument, which is
// the microseconds field of ACE_Time_Value).
ACE_Time_Value tv (0, this->burst_pause_);
// One event set, holding one event, is reused for every push below.
RtecEventComm::EventSet event (1);
event.length (1);
event[0].header.ttl = 1;
// Initial timestamp; it is overwritten before every send below.
ACE_hrtime_t t = ACE_OS::gethrtime ();
ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);
// Size the payload once, up front; the same event (and payload) is
// reused for every push in the loops below.
event[0].data.payload.length (this->payload_size_);
for (int i = 0; i < this->burst_count_; ++i)
{
for (int j = 0; j < this->burst_size_; ++j)
{
ACE_TRY
{
// Select source/type from the position within the burst, then
// refresh the creation time right before sending.
this->supplier_->event_type (j, event[0]);
ACE_hrtime_t now = ACE_OS::gethrtime ();
ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
now);
// ACE_DEBUG ((LM_DEBUG, "(%t) supplier push event\n"));
this->supplier_->send_event (event ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCH (CORBA::SystemException, sys_ex)
{
ACE_PRINT_EXCEPTION (sys_ex, "SYS_EX");
}
ACE_CATCHANY
{
// NOTE(review): this handler catches any exception, yet it prints
// the same "SYS_EX" label as the system-exception handler above.
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "SYS_EX");
}
ACE_ENDTRY;
}
// Pause between bursts.
ACE_OS::sleep (tv);
}
// NOTE(review): presumably the labelled ACE_TRY_EX form is required
// because this function already contains an ACE_TRY block above --
// confirm against the ACE exception macro documentation.
ACE_TRY_EX(SHUTDOWN)
{
// Send one event shutdown from each supplier
event[0].header.type = this->shutdown_event_type_;
ACE_hrtime_t now = ACE_OS::gethrtime ();
ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
now);
this->supplier_->send_event (event ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK_EX (SHUTDOWN);
}
ACE_CATCH (CORBA::SystemException, sys_ex)
{
ACE_PRINT_EXCEPTION (sys_ex, "SYS_EX");
}
ACE_CATCHANY
{
// NOTE(review): same "SYS_EX" label is used for non-system exceptions.
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "SYS_EX");
}
ACE_ENDTRY;
ACE_DEBUG ((LM_DEBUG,
"Supplier task finished\n"));
return 0;
}
// ****************************************************************
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -