📄 event_channel.cpp
字号:
-1);
consumer->_duplicate ();
return 0;
}
// Adds <consumer> to the subscriber set registered for <type> in
// <type_map>.  If no set is bound for <type> yet, a new one is
// allocated with a null dependency info (there is no supplier of this
// event) and bound into the map first.
//
// Returns 0 once the consumer has been handed to the set; in that case
// the consumer's reference count has been bumped via _duplicate().
// Returns -1 if binding a freshly allocated set fails or if the insert
// into the set fails; no reference is taken on the consumer in either
// failure case, so the caller does not end up with a count that no
// set entry accounts for.
int
ACE_ES_Subscription_Info::insert_or_allocate (Subscriber_Map &type_map,
                                              ACE_ES_Consumer_Rep *consumer,
                                              RtecEventComm::EventType type)
{
  Type_Subscribers *subscribers = 0;

  if (type_map.find (type, subscribers) == -1)
    {
      // If the correct type set does not exist, make one with a null
      // dependency info (since there is no supplier of this event).
      subscribers = new Type_Subscribers (0);

      if (type_map.bind (type, subscribers) == -1)
        {
          ACE_ERROR ((LM_ERROR, "%p bind failed.\n",
                      "ACE_ES_Subscription_Info::insert_or_allocate"));
          // The map never took the set, so it is still ours to free.
          delete subscribers;
          return -1;
        }
    }

  if (subscribers->consumers_.insert (consumer) == -1)
    {
      ACE_ERROR ((LM_ERROR, "%p insert failed.\n",
                  "ACE_ES_Subscription_Info::insert_or_allocate"));
      // Report the failure instead of falling through: the consumer is
      // not in the set, so it must not be _duplicate()d nor reported
      // as subscribed (same policy as insert_or_fail).
      return -1;
    }

  consumer->_duplicate ();
  return 0;
}
// Registers <consumer> for events of <type>, but only when a
// subscriber set for that type is already bound in <type_map>.
//
// <dependency> is overwritten with the set's dependency info as soon
// as the set has been located, i.e. also when the subsequent insert
// fails.  Returns 0 after the consumer was inserted and its reference
// count bumped via _duplicate(); returns -1 when no set exists for
// <type> or when the insert fails (the latter is also logged).
int
ACE_ES_Subscription_Info::insert_or_fail (Subscriber_Map &type_map,
                                          ACE_ES_Consumer_Rep *consumer,
                                          RtecEventComm::EventType type,
                                          RtecScheduler::Dependency_Info *&dependency)
{
  Type_Subscribers *type_set;

  // Without an existing set for <type> there is nothing to join.
  if (type_map.find (type, type_set) == -1)
    return -1;

  // Hand the description of the method generating <type> back to the
  // caller.
  dependency = type_set->dependency_info_;

  // Try to add the consumer; on success take a reference and finish.
  if (type_set->consumers_.insert (consumer) != -1)
    {
      consumer->_duplicate ();
      return 0;
    }

  ACE_ERROR_RETURN ((LM_ERROR, "%p insert failed.\n",
                     "ACE_ES_Subscription_Info::insert_or_fail"),
                    -1);
}
// Builds the consumer module for <channel>.  The lock and the set of
// connected consumers are default-initialized, the channel pointer is
// stored as given, and the downstream dispatching module pointer
// starts out null until open() assigns it.
ACE_ES_Consumer_Module::ACE_ES_Consumer_Module (ACE_EventChannel* channel)
: lock_ (),
all_consumers_ (),
channel_ (channel),
down_ (0)
{
}
// Wires this module to the dispatching module <down> that sits below
// it; the pointer is simply stored, no ownership transfer is visible
// here.
void
ACE_ES_Consumer_Module::open (ACE_ES_Dispatching_Module *down)
{
  this->down_ = down;
}
// Notification that <consumer> has connected.  The connection is
// reported to the channel, forwarded to the downstream module, and --
// unless the consumer is itself a gateway -- the channel is asked to
// update its consumer gateways.
void
ACE_ES_Consumer_Module::connected (ACE_Push_Consumer_Proxy *consumer
                                   ACE_ENV_ARG_DECL)
{
  // (Debug trace intentionally disabled:)
  // ACE_DEBUG ((LM_DEBUG,
  //             "EC (%t) Consumer_Module - connecting consumer %x\n",
  //             consumer));

  channel_->report_connect (ACE_EventChannel::CONSUMER);

  down_->connected (consumer ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Gateways do not trigger a gateway update.
  if (consumer->qos ().is_gateway)
    return;

  channel_->update_consumer_gwys (ACE_ENV_SINGLE_ARG_PARAMETER);
}
// Completes the disconnection of the consumer carried by <request>,
// which is treated as a Shutdown_Consumer.  Presumably this runs when
// the shutdown message queued by disconnecting() is dispatched --
// confirm against the dispatching code.
//
// Steps: notify the downstream module, deactivate the consumer proxy
// in its default POA, update the channel's consumer gateways unless
// the consumer was itself a gateway, and finally -- under the module
// lock -- report the disconnect to the channel if no consumers are
// left.  Any exception raised inside the try block is printed and not
// propagated.
void
ACE_ES_Consumer_Module::shutdown_request (ACE_ES_Dispatch_Request *request)
{
ACE_TRY_NEW_ENV
{
// NOTE(review): unchecked C-style downcast -- assumes every request
// routed here really is a Shutdown_Consumer; confirm with the callers.
Shutdown_Consumer *sc = (Shutdown_Consumer *) request;
// Tell everyone else that the consumer is disconnected. This means
// that *nothing* is left in the system for the consumer, so
// everyone can free up any resources.
this->down_->disconnected (sc->consumer ());
// ACE_DEBUG ((LM_DEBUG,
// "EC (%t) Consumer_Module - remove consumer %x\n",
// sc->consumer ()));
// Read the gateway flag before the proxy is deactivated below;
// presumably the proxy must not be touched afterwards, since the POA
// owns it.
CORBA::Boolean dont_update = sc->consumer ()->qos ().is_gateway;
// Deactivate the consumer proxy
PortableServer::POA_var poa =
sc->consumer ()->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
PortableServer::ObjectId_var id =
poa->servant_to_id (sc->consumer () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// Do not delete the consumer proxy here: it is owned by the POA.
// delete sc->consumer ();
// Only non-gateway consumers cause the gateways to be updated.
if (!dont_update)
this->channel_->update_consumer_gwys (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
// Take the module lock before looking at the consumer set; failure to
// acquire it raises SYNCHRONIZATION_ERROR (caught and printed below).
ACE_GUARD_THROW_EX (
ACE_ES_MUTEX, ace_mon, this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
ACE_TRY_CHECK;
// Tell the channel that we may need to shut down.
if (all_consumers_.size () <= 0)
{
// ACE_DEBUG ((LM_DEBUG,
// "EC (%t) No more consumers connected.\n"));
channel_->report_disconnect_i (ACE_EventChannel::CONSUMER);
}
}
ACE_CATCHANY
{
// Best effort: log the failure and carry on; nothing is rethrown.
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"Consumer_Module::shutdown_request");
}
ACE_ENDTRY;
}
// Shuts down every connected consumer proxy and then the channel.
//
// A snapshot of the consumer set is taken under the lock; each proxy
// in the snapshot is then shut down, deactivated in its default POA
// and removed from the live set (re-acquiring the lock per proxy).
// If the lock cannot be taken for the snapshot, or there are no
// consumers, the per-proxy work is skipped.  channel_->shutdown () is
// called in every case.  An exception raised while processing a proxy
// ends the loop (remaining proxies are not processed) and is ignored.
void
ACE_ES_Consumer_Module::shutdown (void)
{
Consumers copy;
{
ACE_Guard<ACE_ES_MUTEX> ace_mon (lock_);
// Could not acquire the lock: skip straight to the channel shutdown.
if (ace_mon.locked () == 0)
goto DONE;
// Nothing connected: skip straight to the channel shutdown.
if (all_consumers_.size () == 0)
goto DONE;
// Make a copy so that the consumers can disconnect without the
// lock being held.
copy = all_consumers_;
}
// This scope is just to thwart the compiler. It was complaining
// about the above goto's bypassing variable initializations. Yadda
// yadda.
{
Consumer_Iterator iter (copy);
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
// Walk the snapshot, not the live set, which is modified below.
for (ACE_Push_Consumer_Proxy **proxy = 0;
iter.next (proxy) != 0;
iter.advance ())
{
(*proxy)->shutdown ();
// @@ Cannot use CORBA::release (*proxy), since it is a
// servant.
// Deactivate the proxy...
PortableServer::POA_var poa =
(*proxy)->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
PortableServer::ObjectId_var id =
poa->servant_to_id (*proxy ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// Remove the consumer from our list.
{
ACE_Guard<ACE_ES_MUTEX> ace_mon (lock_);
// NOTE(review): a failed acquisition is only logged; the remove
// below still runs, i.e. without the lock held -- confirm this is
// intended.
if (ace_mon.locked () == 0)
ACE_ERROR ((LM_ERROR, "%p Failed to acquire lock.\n", "ACE_ES_Consumer_Module::shutdown"));
if (all_consumers_.remove (*proxy) == -1)
ACE_ERROR ((LM_ERROR, "%p Failed to remove consumer.\n", "ACE_ES_Consumer_Module::shutdown"));
}
// No need to delete it, owned by the POA
// delete *proxy;
}
}
ACE_CATCHANY
{
// Ignore the exceptions...
}
ACE_ENDTRY;
}
// Reached on every path, including the early exits above.
DONE:
channel_->shutdown ();
}
// Starts the disconnection of <consumer>.
//
// The consumer is removed from the set of connected consumers (under
// the module lock), the downstream module is told that the consumer
// is disconnecting, and a Shutdown_Consumer request wrapped in a
// Flush_Queue_ACT is scheduled on the channel's timer so that the
// rest of the cleanup happens when that request is dispatched.
//
// Raises SYNCHRONIZATION_ERROR if the lock cannot be acquired,
// SUBSCRIPTION_ERROR if <consumer> is not in the set, and
// CORBA::NO_MEMORY if either helper object cannot be allocated.  A
// failure to schedule the timer is only logged; both helper objects
// are freed in that case.
void
ACE_ES_Consumer_Module::disconnecting (ACE_Push_Consumer_Proxy *consumer
                                       ACE_ENV_ARG_DECL)
{
  {
    ACE_GUARD_THROW_EX (
        ACE_ES_MUTEX, ace_mon, this->lock_,
        RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
    ACE_CHECK;

    if (all_consumers_.remove (consumer) == -1)
      ACE_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR());
  }

  // Tell everyone else that the consumer is disconnecting.  This
  // allows them to remove the consumer from any subscription lists
  // etc.  However, messages may still be queued in the ReactorEx or
  // in the Dispatching Module for this consumer, so no queues or
  // proxies can be deleted just yet.
  down_->disconnecting (consumer ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Send a shutdown message through the system.  When this is
  // dispatched, the consumer proxy will be deleted.  The request is
  // queued in the Priority_Timer and scheduled for dispatching almost
  // immediately (see the 100ns timer below).  This gives components a
  // hook into the first queueing point in the channel.

  // Create a shutdown message.  When this is dispatched, it will
  // delete the proxy.
  RtecScheduler::Scheduler_var scheduler =
    this->channel_->scheduler ();

  Shutdown_Consumer *sc =
    new Shutdown_Consumer (this, consumer, scheduler.in ());
  if (sc == 0)
    ACE_THROW (CORBA::NO_MEMORY ());

  // Create a wrapper around the dispatch request.
  Flush_Queue_ACT *act =
    new Flush_Queue_ACT (sc, channel_->dispatching_module_);
  if (act == 0)
    {
      // Nothing else refers to the shutdown request yet, so release it
      // here; otherwise it would leak when we bail out.
      delete sc;
      ACE_THROW (CORBA::NO_MEMORY ());
    }

  // ACE_DEBUG ((LM_DEBUG, "EC (%t) initiating consumer disconnect.\n"));

  // Set a 100ns timer.
  TimeBase::TimeT ns100;
  ORBSVCS_Time::hrtime_to_TimeT (ns100, 100);

  if (this->channel_->schedule_timer (0, // no rt_info
                                      act,
                                      ACE_Scheduler_MIN_PREEMPTION_PRIORITY,
                                      ns100,
                                      ORBSVCS_Time::zero ()) == -1)
    {
      // The timer never took the request: log it and free both
      // helper objects ourselves.
      ACE_ERROR ((LM_ERROR, "%p queue_request failed.\n", "ACE_ES_Consumer_Module"));
      delete sc;
      delete act;
    }
}
// This method executes in the same thread of control that will hand
// the event set to the consumer (or it's proxy). A network proxy may
// copy the event set to the network buffer. An active client may
// copy the event set to be queued. Or a same address-space consumer
// can read the set we allocated off the stack.
void
ACE_ES_Consumer_Module::push (const ACE_ES_Dispatch_Request *request
ACE_ENV_ARG_DECL)
{
// ACE_DEBUG ((LM_DEBUG, "EC (%t) Consumer_Module::push\n"));
ACE_FUNCTION_TIMEPROBE (TAO_EVENT_CHANNEL_ENTER_ES_CONSUMER_MODULE_PUSH);
// We'll create a temporary event set with the size of the incoming
// request.
RtecEventComm::EventSet event_set;
request->make_copy (event_set);
// Forward the event set.
#if !defined(TAO_LACKS_EVENT_CHANNEL_TIMESTAMPS)
ACE_hrtime_t ec_send = ACE_OS::gethrtime ();
for (CORBA::ULong i = 0; i < event_set.length (); ++i)
{
RtecEventComm::Event& ev = event_set[i];
ORBSVCS_Time::hrtime_to_TimeT (ev.header.ec_send_time, ec_send);
}
#endif /* TAO_LACKS_EVENT_CHANNEL_TIMESTAMPS */
request->consumer ()->push (event_set ACE_ENV_ARG_PARAMETER);
}
// Creates a new ACE_Push_Consumer_Proxy bound to this module, records
// it in the set of connected consumers, activates it through _this ()
// and returns the resulting ProxyPushSupplier reference.
//
// Returns a nil reference (the initial value of <proxy>) on the error
// paths taken through ACE_THROW_RETURN / ACE_CHECK_RETURN.  Raises
// CORBA::NO_MEMORY if the proxy could not be allocated and
// CORBA::INTERNAL if the module lock cannot be acquired.
RtecEventChannelAdmin::ProxyPushSupplier_ptr
ACE_ES_Consumer_Module::obtain_push_supplier (
ACE_ENV_SINGLE_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
RtecEventChannelAdmin::ProxyPushSupplier_ptr proxy =
RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
// The auto_ptr keeps ownership of the new proxy until it is released
// further down.
auto_ptr<ACE_Push_Consumer_Proxy> new_consumer (new ACE_Push_Consumer_Proxy (this));
// Get a new supplier proxy object.
if (new_consumer.get () == 0)
{
ACE_ERROR ((LM_ERROR, "ACE_EventChannel"
"::obtain_push_supplier failed.\n"));
ACE_THROW_RETURN (CORBA::NO_MEMORY (), proxy);
}
{
// The consumer set is only modified while the module lock is held.
ACE_GUARD_THROW_EX (
ACE_ES_MUTEX, ace_mon, this->lock_,
CORBA::INTERNAL ());
// @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
ACE_CHECK_RETURN (proxy);
// NOTE(review): an insert failure is only logged; the proxy is still
// activated and returned below -- confirm this is intended.
if (all_consumers_.insert (new_consumer.get ()) == -1)
ACE_ERROR ((LM_ERROR, "ACE_ES_Consumer_Module insert failed.\n"));
}
// NOTE(review): if _this () fails, the auto_ptr appears to destroy the
// proxy while its pointer is still stored in all_consumers_ -- verify.
proxy = new_consumer->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (proxy);
// Give away ownership to the POA: release the auto_ptr and drop our
// own reference on the servant.
new_consumer.release ()->_remove_ref (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (proxy);
return proxy;
}
void
ACE_ES_Consumer_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos)
{
ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_);
c_qos.is_gateway = 1;
int count = 0;
{
for (Consumer_Iterator i = this->all_consumers_.begin ();
i != this->all_consumers_.end ();
++i)
{
ACE_Push_Consumer_Proxy *c = *i;
if (c->qos ().is_gateway)
continue;
count += c->qos ().dependencies.length ();
}
}
RtecEventChannelAdmin::DependencySet& dep = c_qos.dependencies;
dep.length (count + 1);
CORBA::ULong cc = 0;
dep[cc].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR;
dep[cc].event.header.source = 0;
dep[cc].event.header.creation_time = ORBSVCS_Time::zero ();
dep[cc].rt_info = 0;
cc++;
for (Consumer_Iterator i = this->all_consumers_.begin ();
i != this->all_consumers_.end ();
++i)
{
ACE_Push_Consumer_Proxy *c = *i;
// ACE_DEBUG ((LM_DEBUG, "EC (%t) fill_qos "));
// ACE_ConsumerQOS_Factory::debug (c->qos ());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -