ft_notifier_i.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 775 行 · 第 1/2 页
CPP
775 行
// find the channel administrator for consumers
this->consumer_admin_ = this->notify_channel_->default_consumer_admin (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN(-1);
if (CORBA::is_nil (this->consumer_admin_.in ()))
{
ACE_ERROR ((LM_ERROR,
"%T %n (%P|%t) NIL consumer admin\n"
));
result = -1;
}
// everything else happens when subscriber shows up
///////////////////////////////
// Register with ReplicationManager
if (this->rm_register_)
{
ACE_TRY_NEW_ENV
{
CORBA::Object_var rm_obj = orb->resolve_initial_references("ReplicationManager" ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
this->replication_manager_ = ::FT::ReplicationManager::_narrow(rm_obj.in() ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
if (!CORBA::is_nil (replication_manager_.in ()))
{
// @@: should we check to see if there's already one registered?
FT::FaultNotifier_var notifier = FT::FaultNotifier::_narrow (this_obj.in ());
if (! CORBA::is_nil (notifier.in ()))
{
this->replication_manager_->register_fault_notifier(notifier.in ());
ACE_DEBUG ((LM_DEBUG,
"FaultNotifier registered with ReplicationManager.\n"
));
this->registered_ = 1;
}
else
{
ACE_ERROR ((LM_ERROR,
"Error: Registration failed. This is not a FaultNotifier (should not occur.)\n"
));
}
}
else
{
ACE_ERROR ((LM_ERROR,"FaultNotifier: Can't resolve ReplicationManager, It will not be registered.\n" ));
}
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"FaultNotifier: Exception resolving ReplicationManager. Notifier will not be registered.\n" );
}
ACE_ENDTRY;
}
else
{
ACE_DEBUG ((LM_DEBUG,
"FaultNotifier: ReplicationManager registration disabled.\n"
));
}
///////////////////////////////
// Set up and ready for action
// publish our IOR
if(result == 0)
{
if (this->ior_output_file_ != 0)
{
this->identity_ = "file:";
this->identity_ += this->ior_output_file_;
result = write_ior();
}
}
if (result == 0)
{
if (this->ns_name_ != 0)
{
this->identity_ = "name:";
this->identity_ += this->ns_name_;
CORBA::Object_var naming_obj =
this->orb_->resolve_initial_references ("NameService" ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (CORBA::is_nil(naming_obj.in ())){
ACE_ERROR_RETURN ((LM_ERROR,
"%T %n (%P|%t) Unable to find the Naming Service\n"),
1);
}
this->naming_context_ =
CosNaming::NamingContext::_narrow (naming_obj.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (CORBA::is_nil(this->naming_context_.in ()))
{
ACE_ERROR_RETURN ((LM_ERROR,
"%T %n (%P|%t) Should not occur: Can't narrow initial reference to naming context.\n"),
1);
}
this->this_name_.length (1);
this->this_name_[0].id = CORBA::string_dup (this->ns_name_);
this->naming_context_->rebind (this->this_name_, this_obj.in() //CORBA::Object::_duplicate(this_obj)
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
}
}
return result;
}
///////////////////
// CORBA METHODS
/**
 * Forward one structured fault event to the structured proxy push consumer
 * held by this notifier.
 *
 * @param event  the structured event to hand on unchanged.
 */
void TAO::FT_FaultNotifier_i::push_structured_fault (
    const CosNotification::StructuredEvent & event
    ACE_ENV_ARG_DECL
  )
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  METHOD_ENTRY(TAO::FT_FaultNotifier_i::push_structured_fault);

  // Pure delegation: no filtering or copying happens at this level.
  structured_proxy_push_consumer_->push_structured_event (event ACE_ENV_ARG_PARAMETER);

  METHOD_RETURN(TAO::FT_FaultNotifier_i::push_structured_fault);
}
/**
 * Forward a batch of fault events to the sequence proxy push consumer
 * held by this notifier.
 *
 * @param events  the event batch to hand on unchanged.
 */
void TAO::FT_FaultNotifier_i::push_sequence_fault (
    const CosNotification::EventBatch & events
    ACE_ENV_ARG_DECL
  )
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  METHOD_ENTRY(TAO::FT_FaultNotifier_i::push_sequence_fault);

  // Pure delegation: the whole batch is pushed in a single call.
  sequence_proxy_push_consumer_->push_structured_events (events ACE_ENV_ARG_PARAMETER);

  METHOD_RETURN(TAO::FT_FaultNotifier_i::push_sequence_fault);
}
/**
 * Create a new filter through this notifier's filter factory.
 *
 * @param constraint_grammar  currently ignored; the filter is always created
 *                            with the "ETCL" grammar (see the todo below).
 * @return the newly created filter; ownership passes to the caller
 *         (the local _var is released with _retn ()).
 */
::CosNotifyFilter::Filter_ptr TAO::FT_FaultNotifier_i::create_subscription_filter (
    const char * constraint_grammar
    ACE_ENV_ARG_DECL_NOT_USED
  )
  ACE_THROW_SPEC ((CORBA::SystemException, CosNotifyFilter::InvalidGrammar))
{
  METHOD_ENTRY(TAO::FT_FaultNotifier_i::create_subscription_filter);

  // @@todo: honor the grammar requested by the caller instead of discarding it.
  ACE_UNUSED_ARG (constraint_grammar);

  CosNotifyFilter::Filter_var etcl_filter = filter_factory_->create_filter ("ETCL");

  // METHOD_RETURN supplies the "return"; the value expression follows it.
  METHOD_RETURN(TAO::FT_FaultNotifier_i::create_subscription_filter)
    etcl_filter._retn ();
}
/**
 * Connect a structured push consumer to this notifier.
 *
 * A ProxyInfo slot is chosen (the first slot whose proxy reference is nil,
 * or a freshly appended one), a STRUCTURED_EVENT proxy push supplier is
 * obtained from the consumer admin and stored in that slot, and the consumer
 * is connected to it.  A non-nil filter is added to the proxy supplier.
 *
 * If the obtained proxy cannot be narrowed to a StructuredProxyPushSupplier
 * an error is logged; the slot index is still returned in that case.
 *
 * Every CORBA call is followed by ACE_CHECK_RETURN so that builds using
 * emulated exceptions stop at the first failure, exactly like builds using
 * native exceptions do (the macros expand to nothing in native builds).
 *
 * @param push_consumer  the consumer to connect.
 * @param filter         optional filter; ignored when nil.
 * @return the index of the ProxyInfo slot used, as the consumer id.
 */
FT::FaultNotifier::ConsumerId TAO::FT_FaultNotifier_i::connect_structured_fault_consumer (
    CosNotifyComm::StructuredPushConsumer_ptr push_consumer,
    CosNotifyFilter::Filter_ptr filter
    ACE_ENV_ARG_DECL
  )
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  METHOD_ENTRY(TAO::FT_FaultNotifier_i::connect_structured_fault_consumer);

  /////////////////////////
  // find a ProxyInfo entry
  // use the first nil entry or a new entry if no nils found
  size_t infoPos = 0;
  int looking = 1;
  for ( size_t pos = 0; looking && pos < this->proxy_infos_.size (); ++pos)
  {
    ProxyInfo & pi = this->proxy_infos_[pos];
    if (CORBA::is_nil(pi.proxyVar_.in ()))
    {
      infoPos = pos;
      looking = 0;
    }
  }
  if (looking)
  {
    infoPos = this->proxy_infos_.size();
    this->proxy_infos_.push_back(ProxyInfo());
  }

  ///////////////////////////////////////
  // Assign an ID, populate the ProxyInfo
  FT::FaultNotifier::ConsumerId result = infoPos;
  ProxyInfo & info = this->proxy_infos_[infoPos];
  info.proxyVar_
    = this->consumer_admin_->obtain_notification_push_supplier (
        ::CosNotifyChannelAdmin::STRUCTURED_EVENT,
        info.proxyId_
        ACE_ENV_ARG_PARAMETER);
  // Do not count a connection whose proxy could not be obtained.
  ACE_CHECK_RETURN (0);

  this->consumer_connects_ += 1;

  ::CosNotifyChannelAdmin::StructuredProxyPushSupplier_var proxySupplier
    = ::CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow(info.proxyVar_.in ()
        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (0);

  if ( CORBA::is_nil (proxySupplier.in ()))
  {
    // this is a should-not-occur situation.  The consumer admin returned
    // the wrong kind of object.
    ACE_ERROR(( LM_ERROR,
       "%T %n (%P|%t) Unexpected result: Wrong type of object returned from obtain_notification_push_supplier\n"
       ));
  }
  else
  {
    proxySupplier->connect_structured_push_consumer ( push_consumer
      ACE_ENV_ARG_PARAMETER);
    ACE_CHECK_RETURN (0);

    if (! CORBA::is_nil (filter))
    {
      // Pass the environment so a failure is reported to our caller.
      proxySupplier->add_filter(filter ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (0);
    }
  }

  METHOD_RETURN(TAO::FT_FaultNotifier_i::connect_structured_fault_consumer) result;
}
/**
 * Connect a sequence push consumer to this notifier.
 *
 * A ProxyInfo slot is chosen (the first slot whose proxy reference is nil,
 * or a freshly appended one), a SEQUENCE_EVENT proxy push supplier is
 * obtained from the consumer admin and stored in that slot, and the consumer
 * is connected to it.  A non-nil filter is added to the proxy supplier.
 *
 * If the obtained proxy cannot be narrowed to a SequenceProxyPushSupplier
 * an error is logged; the slot index is still returned in that case.
 *
 * Every CORBA call is followed by ACE_CHECK_RETURN so that builds using
 * emulated exceptions stop at the first failure, exactly like builds using
 * native exceptions do (the macros expand to nothing in native builds).
 *
 * @param push_consumer  the consumer to connect.
 * @param filter         optional filter; ignored when nil.
 * @return the index of the ProxyInfo slot used, as the consumer id.
 */
FT::FaultNotifier::ConsumerId TAO::FT_FaultNotifier_i::connect_sequence_fault_consumer (
    CosNotifyComm::SequencePushConsumer_ptr push_consumer,
    CosNotifyFilter::Filter_ptr filter
    ACE_ENV_ARG_DECL
  )
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  METHOD_ENTRY(TAO::FT_FaultNotifier_i::connect_sequence_fault_consumer);

  /////////////////////////
  // find a ProxyInfo entry
  // use the first nil entry or a new entry if no nils found
  size_t infoPos = 0;
  int looking = 1;
  for ( size_t pos = 0; looking && pos < this->proxy_infos_.size (); ++pos)
  {
    ProxyInfo & pi = this->proxy_infos_[pos];
    if (CORBA::is_nil(pi.proxyVar_.in ()))
    {
      infoPos = pos;
      looking = 0;
    }
  }
  if (looking)
  {
    infoPos = this->proxy_infos_.size();
    this->proxy_infos_.push_back(ProxyInfo());
  }

  ///////////////////////////////////////
  // Assign an ID, populate the ProxyInfo
  FT::FaultNotifier::ConsumerId result = infoPos;
  ProxyInfo & info = this->proxy_infos_[infoPos];
  info.proxyVar_
    = this->consumer_admin_->obtain_notification_push_supplier (
        ::CosNotifyChannelAdmin::SEQUENCE_EVENT,
        info.proxyId_
        ACE_ENV_ARG_PARAMETER);
  // Do not count a connection whose proxy could not be obtained.
  ACE_CHECK_RETURN (0);

  this->consumer_connects_ += 1;

  ::CosNotifyChannelAdmin::SequenceProxyPushSupplier_var proxySupplier
    = ::CosNotifyChannelAdmin::SequenceProxyPushSupplier::_narrow(info.proxyVar_.in ()
        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (0);

  if ( CORBA::is_nil (proxySupplier.in ()))
  {
    // this is a should-not-occur situation.  The consumer admin returned
    // the wrong kind of object.
    ACE_ERROR(( LM_ERROR,
       "%T %n (%P|%t) Unexpected result: Wrong type of object returned from obtain_notification_push_supplier\n"
       ));
  }
  else
  {
    proxySupplier->connect_sequence_push_consumer ( push_consumer
      ACE_ENV_ARG_PARAMETER);
    ACE_CHECK_RETURN (0);

    if (! CORBA::is_nil (filter))
    {
      // Pass the environment so a failure is reported to our caller.
      proxySupplier->add_filter(filter ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (0);
    }
  }

  METHOD_RETURN(TAO::FT_FaultNotifier_i::connect_sequence_fault_consumer) result;
}
/**
 * Disconnect a previously connected consumer.
 *
 * The consumer id is used as an index into proxy_infos_.  The stored proxy
 * is narrowed first to a structured, then to a sequence proxy push supplier;
 * whichever matches is disconnected and the slot is reset to nil so it can
 * be reused.  CosEventComm::Disconnected is raised when the id is out of
 * range, the slot is already nil, or the proxy is of neither kind.
 *
 * After a successful disconnect the disconnect counter is incremented and,
 * when quit_on_idle_ is set and the connect and disconnect counters match,
 * the object identified by object_id_ is deactivated in the POA and
 * quitting_ is set.
 *
 * Every CORBA call is followed by ACE_CHECK so that builds using emulated
 * exceptions stop at the first failure, exactly like builds using native
 * exceptions do (the macros expand to nothing in native builds).
 *
 * @param connection  the consumer id returned by one of the connect calls.
 */
void TAO::FT_FaultNotifier_i::disconnect_consumer (
    FT::FaultNotifier::ConsumerId connection
    ACE_ENV_ARG_DECL
  )
  ACE_THROW_SPEC ((CORBA::SystemException, CosEventComm::Disconnected))
{
  METHOD_ENTRY(TAO::FT_FaultNotifier_i::disconnect_consumer);

  size_t index = ACE_static_cast ( size_t, connection);
  if (index < this->proxy_infos_.size())
  {
    ProxyInfo & info = this->proxy_infos_[index];
    if (CORBA::is_nil(info.proxyVar_.in ()) )
    {
      // Slot exists but nothing is connected there.
      ACE_THROW(CosEventComm::Disconnected());
    }
    else
    {
      ::CosNotifyChannelAdmin::StructuredProxyPushSupplier_var structuredSupplier
        = ::CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow(info.proxyVar_.in ()
            ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      if (! CORBA::is_nil (structuredSupplier.in ()))
      {
        structuredSupplier->disconnect_structured_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
        // Only free the slot once the disconnect really succeeded.
        ACE_CHECK;
        info.proxyVar_ = ::CosNotifyChannelAdmin::ProxySupplier::_nil();
      }
      else
      {
        ::CosNotifyChannelAdmin::SequenceProxyPushSupplier_var sequenceSupplier
          = ::CosNotifyChannelAdmin::SequenceProxyPushSupplier::_narrow(info.proxyVar_.in ()
              ACE_ENV_ARG_PARAMETER);
        ACE_CHECK;

        if (! CORBA::is_nil (sequenceSupplier.in ()))
        {
          sequenceSupplier->disconnect_sequence_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
          // Only free the slot once the disconnect really succeeded.
          ACE_CHECK;
          info.proxyVar_ = ::CosNotifyChannelAdmin::ProxySupplier::_nil();
        }
        else
        {
          ACE_ERROR((LM_ERROR,
            "%T %n (%P|%t) Unexpected proxy supplier type\n"
            ));
          ACE_THROW(CosEventComm::Disconnected());
        }
      }
    }
  }
  else
  {
    // The id does not refer to any slot.
    ACE_THROW(CosEventComm::Disconnected());
  }

  // Reached only after a successful disconnect.
  this->consumer_disconnects_ += 1;
  if (this->quit_on_idle_)
  {
    if (! this->quitting_
      && this->consumer_connects_ == this->consumer_disconnects_)
    {
      ACE_ERROR((LM_ERROR,
        "FaultNotifier (%P|%t) quit on idle: connects %d, disconnects %d\n",
        ACE_static_cast (unsigned int, this->consumer_connects_),
        ACE_static_cast (unsigned int, this->consumer_disconnects_)
        ));
      this->poa_->deactivate_object (this->object_id_.in ()
        ACE_ENV_ARG_PARAMETER);
      // Mark as quitting only if the deactivation went through.
      ACE_CHECK;
      this->quitting_ = 1;
    }
  }

  METHOD_RETURN(TAO::FT_FaultNotifier_i::disconnect_consumer);
}
/**
 * Liveness probe.
 *
 * @return always 1.
 */
CORBA::Boolean TAO::FT_FaultNotifier_i::is_alive (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // No state is inspected here; the answer is unconditional.
  const CORBA::Boolean alive = 1;
  METHOD_RETURN(TAO::FT_FaultNotifier_i::is_alive) alive;
}
//////////////
// ProxyInfo
/// Default constructor: proxy id 0 and a nil proxy reference.
/// A nil proxyVar_ is what the connect methods treat as a reusable slot.
TAO::FT_FaultNotifier_i::ProxyInfo::ProxyInfo ()
  : proxyId_ (0),
    proxyVar_ (::CosNotifyChannelAdmin::ProxySupplier::_nil ())
{
  // nothing else to set up
}
/// Copy constructor: takes over the proxy id and copies the proxy reference
/// holder of @a rhs member by member.
TAO::FT_FaultNotifier_i::ProxyInfo::ProxyInfo (const ProxyInfo & rhs)
  : proxyId_ (rhs.proxyId_),
    proxyVar_ (rhs.proxyVar_)
{
  // nothing else to set up
}
// Explicit instantiation of ACE_Guard<ACE_SYNCH_MUTEX> for toolchains that
// do not instantiate templates automatically: either through the standard
// "template class" syntax or through an instantiation pragma, depending on
// which ACE configuration macro is defined.  With neither macro defined this
// section compiles to nothing.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Guard<ACE_SYNCH_MUTEX>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
# pragma instantiate ACE_Guard<ACE_SYNCH_MUTEX>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?