📄 ftec_event_channel_impl.cpp
字号:
// FTEC_Event_Channel_Impl.cpp,v 1.11 2004/01/08 20:05:24 jwillemsen Exp
#include "FTEC_Event_Channel_Impl.h"
#include "FTEC_Factory.h"
#include "FTEC_SupplierAdmin.h"
#include "FTEC_ConsumerAdmin.h"
#include "FTEC_ProxyConsumer.h"
#include "FTEC_ProxySupplier.h"
//#include "../Utils/ScopeGuard.h"
#include "FtEventServiceInterceptor.h"
#include "FT_ProxyAdmin_Base.h"
#include "IOGR_Maker.h"
#include "Replication_Service.h"
#include "orbsvcs/FtRtecEventCommC.h"
ACE_RCSID (EventChannel,
FTEC_Event_Channel_Impl,
"FTEC_Event_Channel_Impl.cpp,v 1.11 2004/01/08 20:05:24 jwillemsen Exp")
/// Update handler: forwards @a op to obtain_proxy() on the consumer admin
/// of @a ec.
///
/// @param ec  Event channel whose consumer admin handles the operation.
/// @param op  Operation record, passed through unchanged.
void obtain_push_supplier (TAO_FTEC_Event_Channel_Impl* ec,
                           FtRtecEventChannelAdmin::Operation& op
                           ACE_ENV_ARG_DECL)
{
  // The whole operation record is handed to the consumer-side admin.
  (ec->consumer_admin ())->obtain_proxy (op
                                         ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
/// Update handler: forwards @a op to obtain_proxy() on the supplier admin
/// of @a ec.
///
/// @param ec  Event channel whose supplier admin handles the operation.
/// @param op  Operation record, passed through unchanged.
void obtain_push_consumer (TAO_FTEC_Event_Channel_Impl* ec,
                           FtRtecEventChannelAdmin::Operation& op
                           ACE_ENV_ARG_DECL)
{
  // The whole operation record is handed to the supplier-side admin.
  (ec->supplier_admin ())->obtain_proxy (op
                                         ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
/// Obtains a new proxy push consumer from the supplier admin of @a ec and
/// connects @a push_supplier to it.
///
/// The object id @a oid is first stored through Request_Context_Repository.
/// If connecting fails, the freshly obtained proxy is passed back to the
/// supplier admin's disconnect() and the exception is re-thrown.
///
/// @param ec             Event channel whose supplier admin provides the proxy.
/// @param oid            Object id recorded in the request context.
/// @param push_supplier  Supplier to connect to the new proxy.
/// @param qos            QoS forwarded to connect_push_supplier().
void obtain_push_consumer_and_connect (TAO_FTEC_Event_Channel_Impl* ec,
                                       const FtRtecEventChannelAdmin::ObjectId& oid,
                                       RtecEventComm::PushSupplier_ptr push_supplier,
                                       const RtecEventChannelAdmin::SupplierQOS & qos
                                       ACE_ENV_ARG_DECL)
{
  Request_Context_Repository ().set_object_id (oid ACE_ENV_ARG_PARAMETER);
  // Every call that receives the environment must be checked: without
  // native exceptions a failure here is only reported through it, and
  // continuing would obtain a proxy for an object id that was never set.
  ACE_CHECK;

  RtecEventChannelAdmin::ProxyPushConsumer_var consumer =
    ec->supplier_admin ()->obtain (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  ACE_TRY {
    consumer->connect_push_supplier (push_supplier, qos
                                     ACE_ENV_ARG_PARAMETER);
    ACE_TRY_CHECK;
  }
  ACE_CATCHALL {
    // Connecting failed: hand the proxy back to the admin before
    // propagating the error to the caller.
    ec->supplier_admin ()->disconnect (consumer.in ());
    ACE_RE_THROW;
  }
  ACE_ENDTRY;
  ACE_CHECK;
}
/// Update handler: connects the push supplier described by @a op.
///
/// If a proxy push consumer with the id @a op.object_id already exists in
/// @a ec, the supplier is connected to it; otherwise a new proxy is obtained
/// and connected through obtain_push_consumer_and_connect().
///
/// @param ec  Event channel the operation is applied to.
/// @param op  Operation record; its connect_supplier_param() supplies the
///            supplier reference and QoS.
void connect_push_supplier (TAO_FTEC_Event_Channel_Impl* ec,
                            FtRtecEventChannelAdmin::Operation& op
                            ACE_ENV_ARG_DECL)
{
  // The reference is held for the duration of the call; it is not used
  // further in this function.
  PortableServer::POA_var supplier_poa_ref = ec->supplier_poa ();
  ACE_CHECK;

  FtRtecEventChannelAdmin::Connect_push_supplier_param& args =
    op.param.connect_supplier_param ();

  TAO_FTEC_ProxyPushConsumer* existing =
    ec->find_proxy_push_consumer (op.object_id);

  if (existing != NULL) {
    // A proxy with this id is already known: connect to it directly.
    existing->connect_push_supplier (args.push_supplier.in (),
                                     args.qos
                                     ACE_ENV_ARG_PARAMETER);
  }
  else {
    // No such proxy yet: create one and connect in a single step.
    obtain_push_consumer_and_connect (ec,
                                      op.object_id,
                                      args.push_supplier.in (),
                                      args.qos
                                      ACE_ENV_ARG_PARAMETER);
  }
  ACE_CHECK;
}
/// Obtains a new proxy push supplier from the consumer admin of @a ec and
/// connects @a push_consumer to it.
///
/// The object id @a oid is first stored through Request_Context_Repository.
/// If connecting fails, the freshly obtained proxy is passed back to the
/// consumer admin's disconnect() and the exception is re-thrown.
///
/// @param ec             Event channel whose consumer admin provides the proxy.
/// @param oid            Object id recorded in the request context.
/// @param push_consumer  Consumer to connect to the new proxy.
/// @param qos            QoS forwarded to connect_push_consumer().
void obtain_push_supplier_and_connect (TAO_FTEC_Event_Channel_Impl* ec,
                                       const FtRtecEventChannelAdmin::ObjectId& oid,
                                       RtecEventComm::PushConsumer_ptr push_consumer,
                                       const RtecEventChannelAdmin::ConsumerQOS & qos
                                       ACE_ENV_ARG_DECL)
{
  Request_Context_Repository ().set_object_id (oid ACE_ENV_ARG_PARAMETER);
  // Every call that receives the environment must be checked: without
  // native exceptions a failure here is only reported through it, and
  // continuing would obtain a proxy for an object id that was never set.
  ACE_CHECK;

  RtecEventChannelAdmin::ProxyPushSupplier_var supplier =
    ec->consumer_admin ()->obtain (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  ACE_TRY {
    supplier->connect_push_consumer (push_consumer, qos
                                     ACE_ENV_ARG_PARAMETER);
    ACE_TRY_CHECK;
  }
  ACE_CATCHALL {
    // Connecting failed: hand the proxy back to the admin before
    // propagating the error to the caller.
    ec->consumer_admin ()->disconnect (supplier.in ());
    ACE_RE_THROW;
  }
  ACE_ENDTRY;
  ACE_CHECK;
}
/// Update handler: connects the push consumer described by @a op.
///
/// If a proxy push supplier with the id @a op.object_id already exists in
/// @a ec, the consumer is connected to it; otherwise a new proxy is obtained
/// and connected through obtain_push_supplier_and_connect().
///
/// @param ec  Event channel the operation is applied to.
/// @param op  Operation record; its connect_consumer_param() supplies the
///            consumer reference and QoS.
void connect_push_consumer (TAO_FTEC_Event_Channel_Impl* ec,
                            FtRtecEventChannelAdmin::Operation& op
                            ACE_ENV_ARG_DECL)
{
  // The reference is held for the duration of the call; it is not used
  // further in this function.
  PortableServer::POA_var consumer_poa_ref = ec->consumer_poa ();
  ACE_CHECK;

  FtRtecEventChannelAdmin::Connect_push_consumer_param& args =
    op.param.connect_consumer_param ();

  TAO_FTEC_ProxyPushSupplier* existing =
    ec->find_proxy_push_supplier (op.object_id);

  if (existing != NULL) {
    // A proxy with this id is already known: connect to it directly.
    existing->connect_push_consumer (args.push_consumer.in (),
                                     args.qos
                                     ACE_ENV_ARG_PARAMETER);
  }
  else {
    // No such proxy yet: create one and connect in a single step.
    obtain_push_supplier_and_connect (ec,
                                      op.object_id,
                                      args.push_consumer.in (),
                                      args.qos
                                      ACE_ENV_ARG_PARAMETER);
  }
  ACE_CHECK;
}
/// Update handler: disconnects the proxy push supplier identified by
/// @a op.object_id.
///
/// @param ec  Event channel that is searched for the proxy.
/// @param op  Operation record carrying the proxy's object id.
/// @throws FTRT::InvalidUpdate if @a ec has no proxy push supplier with
///         that id.
void disconnect_push_supplier (TAO_FTEC_Event_Channel_Impl* ec,
                               FtRtecEventChannelAdmin::Operation& op
                               ACE_ENV_ARG_DECL)
{
  // The reference is held for the duration of the call; it is not used
  // further in this function.
  PortableServer::POA_var consumer_poa_ref = ec->consumer_poa ();
  ACE_CHECK;

  TAO_FTEC_ProxyPushSupplier* const target =
    ec->find_proxy_push_supplier (op.object_id);

  if (!target) {
    // Unknown object id: the update cannot be applied.
    ACE_THROW (FTRT::InvalidUpdate ());
  }
  ACE_CHECK;

  target->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}
/// Update handler: disconnects the proxy push consumer identified by
/// @a op.object_id.
///
/// @param ec  Event channel that is searched for the proxy.
/// @param op  Operation record carrying the proxy's object id.
/// @throws FTRT::InvalidUpdate if @a ec has no proxy push consumer with
///         that id.
void disconnect_push_consumer (TAO_FTEC_Event_Channel_Impl* ec,
                               FtRtecEventChannelAdmin::Operation& op
                               ACE_ENV_ARG_DECL)
{
  // The reference is held for the duration of the call; it is not used
  // further in this function.
  PortableServer::POA_var supplier_poa_ref = ec->supplier_poa ();
  ACE_CHECK;

  TAO_FTEC_ProxyPushConsumer* const target =
    ec->find_proxy_push_consumer (op.object_id);

  if (!target) {
    // Unknown object id: the update cannot be applied.
    ACE_THROW (FTRT::InvalidUpdate ());
  }
  ACE_CHECK;

  target->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}
/// Update handler: suspends the connection of the proxy push supplier
/// identified by @a op.object_id.
///
/// @param ec  Event channel that is searched for the proxy.
/// @param op  Operation record carrying the proxy's object id.
/// @throws FTRT::InvalidUpdate if @a ec has no proxy push supplier with
///         that id.
void suspend_connection (TAO_FTEC_Event_Channel_Impl* ec,
                         FtRtecEventChannelAdmin::Operation& op
                         ACE_ENV_ARG_DECL)
{
  // The reference is held for the duration of the call; it is not used
  // further in this function.
  PortableServer::POA_var consumer_poa_ref = ec->consumer_poa ();
  ACE_CHECK;

  TAO_FTEC_ProxyPushSupplier* const target =
    ec->find_proxy_push_supplier (op.object_id);

  if (!target) {
    // Unknown object id: the update cannot be applied.
    ACE_THROW (FTRT::InvalidUpdate ());
  }
  ACE_CHECK;

  target->suspend_connection (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}
/// Update handler: resumes the connection of the proxy push supplier
/// identified by @a op.object_id.
///
/// @param ec  Event channel that is searched for the proxy.
/// @param op  Operation record carrying the proxy's object id.
/// @throws FTRT::InvalidUpdate if @a ec has no proxy push supplier with
///         that id.
void resume_connection (TAO_FTEC_Event_Channel_Impl* ec,
                        FtRtecEventChannelAdmin::Operation& op
                        ACE_ENV_ARG_DECL)
{
  TAO_FTEC_ProxyPushSupplier* const target =
    ec->find_proxy_push_supplier (op.object_id);

  if (!target) {
    // Unknown object id: the update cannot be applied.
    ACE_THROW (FTRT::InvalidUpdate ());
  }
  ACE_CHECK;

  target->resume_connection (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}
/// Pointer type for the update handlers defined in this file: each handler
/// receives the event channel to act on and the operation record to apply.
typedef void (*Set_update_fun)(TAO_FTEC_Event_Channel_Impl* ec,
FtRtecEventChannelAdmin::Operation& op
ACE_ENV_ARG_DECL);
/// Table of update handlers, one entry per kind of operation.
///
/// NOTE(review): the position of each entry presumably has to match the
/// numeric value used to index this table at the dispatch site (not visible
/// here) -- confirm before reordering or inserting entries.
Set_update_fun update_table[] = {
  obtain_push_supplier,       // [0]
  obtain_push_consumer,       // [1]
  disconnect_push_supplier,   // [2]
  disconnect_push_consumer,   // [3]
  suspend_connection,         // [4]
  resume_connection,          // [5]
  connect_push_supplier,      // [6]
  connect_push_consumer       // [7]
};
/// Constructs the event channel on top of TAO_EC_Event_Channel_Base, using a
/// newly allocated TAO_FTEC_Basic_Factory, then duplicates the scheduler
/// reference from @a attributes and creates the strategies.
///
/// @param attributes  Channel attributes, forwarded to the base class; the
///                    scheduler reference is read from it.
///
/// NOTE(review): the factory is allocated with `new` and the third base
/// constructor argument is `false`. If that argument is an ownership flag,
/// nothing visible in this file deletes the factory -- confirm against
/// TAO_EC_Event_Channel_Base.
TAO_FTEC_Event_Channel_Impl::TAO_FTEC_Event_Channel_Impl (
    const TAO_EC_Event_Channel_Attributes& attributes)
  : TAO_EC_Event_Channel_Base (attributes,
                               new TAO_FTEC_Basic_Factory,
                               false)
{
  // Keep our own reference to the scheduler named in the attributes.
  this->scheduler_ = CORBA::Object::_duplicate (attributes.scheduler);

  // Strategies are created only after the scheduler has been assigned.
  this->create_strategies ();
}
/// Destructor; performs no explicit work of its own.
TAO_FTEC_Event_Channel_Impl::~TAO_FTEC_Event_Channel_Impl ()
{
  // Intentionally empty.
}
/// Returns the factory held by the base class, statically cast to
/// TAO_FTEC_Basic_Factory.
TAO_FTEC_Basic_Factory*
TAO_FTEC_Event_Channel_Impl::factory ()
{
  // The constructor passes a TAO_FTEC_Basic_Factory to the base class, so
  // the base-class accessor's result is cast back to that type.
  return static_cast<TAO_FTEC_Basic_Factory*> (
           this->TAO_EC_Event_Channel_Base::factory ());
}
/// Activates the event channel and its two admin objects.
///
/// In order: initializes the IOGR maker with @a orb, activates the base
/// event channel, then activates the supplier admin and the consumer admin
/// under the given object ids. Each step is checked before the next runs.
/// The original note on this method states that the channel can be used
/// once this call has completed.
///
/// @param orb                 ORB handed to the IOGR maker.
/// @param supplier_admin_oid  Object id used to activate the supplier admin.
/// @param consumer_admin_oid  Object id used to activate the consumer admin.
void
TAO_FTEC_Event_Channel_Impl::activate_object (
    CORBA::ORB_var orb,
    const FtRtecEventComm::ObjectId& supplier_admin_oid,
    const FtRtecEventComm::ObjectId& consumer_admin_oid
    ACE_ENV_ARG_DECL)
{
  iogr_maker_.init (orb.in ()
                    ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Explicitly qualified call to the base-class activation.
  this->TAO_EC_Event_Channel_Base::activate (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->supplier_admin ()->activate (supplier_admin_oid
                                     ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->consumer_admin ()->activate (consumer_admin_oid
                                     ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
// = The RtecEventChannelAdmin::EventChannel methods...
/// Returns the ConsumerAdmin reference handed out to clients.
///
/// Instead of the default implementation
/// (this->consumer_admin ()->_this (ACE_ENV_SINGLE_ARG_PARAMETER)), the
/// consumer admin's reference is passed through IOGR_Maker::forge_iogr() and
/// the result is narrowed to RtecEventChannelAdmin::ConsumerAdmin.
///
/// @return The narrowed reference; a nil reference if obtaining or forging
///         the reference failed in a build without native exceptions.
RtecEventChannelAdmin::ConsumerAdmin_ptr
TAO_FTEC_Event_Channel_Impl::for_consumers (ACE_ENV_SINGLE_ARG_DECL)
      ACE_THROW_SPEC ((CORBA::SystemException))
{
  CORBA::Object_var obj =
    consumer_admin ()->reference (ACE_ENV_SINGLE_ARG_PARAMETER);
  // Do not forge an IOGR from a reference that could not be obtained.
  ACE_CHECK_RETURN (RtecEventChannelAdmin::ConsumerAdmin::_nil ());

  obj = IOGR_Maker::instance ()->forge_iogr (obj.in ()
                                             ACE_ENV_ARG_PARAMETER);
  // Likewise, do not narrow the result of a failed forge.
  ACE_CHECK_RETURN (RtecEventChannelAdmin::ConsumerAdmin::_nil ());

  return RtecEventChannelAdmin::ConsumerAdmin::_narrow (obj.in ()
                                                        ACE_ENV_ARG_PARAMETER);
}
/// The default implementation is:
/// this->supplier_admin ()->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
RtecEventChannelAdmin::SupplierAdmin_ptr
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -