// dispatching_modules.cpp -- TAO Event Service dispatching module implementations.
}
// Enqueue the request. If we're multi-threaded, this request is a
// command object that will be called by the threads in the queue,
// or will be dequeued by this->handle_signal if we're
// single-threaded.
if (queues_[preemption_priority]->try_put (request) == -1)
{
if (ACE_ES_Dispatch_Request::release (request) != 0)
ACE_ERROR ((LM_ERROR, "ACE_ES_Priority_Dispatching::push"
" release failed.\n"));
if (errno != EPIPE)
{
ACE_THROW (CORBA::NO_MEMORY ());
}
else
{
ACE_DEBUG ((LM_DEBUG,
"EC (%t) Request rejected from closed queue %d.\n",
preemption_priority));
}
}
}
// Scan the dispatch queues from highest priority (index 0) down to
// lowest.  Whenever a non-empty queue is found, service exactly one
// request from it and restart the scan at the top, so higher-priority
// work is always preferred.  Return once a complete scan finds every
// queue empty.
int
ACE_ES_Priority_Dispatching::handle_signal (int, siginfo_t *, ucontext_t *)
{
  for (int dispatched = 1; dispatched != 0; )
    {
      dispatched = 0;
      for (int q = 0; q <= highest_priority_; q++)
        {
          // Skip missing queues and queues with nothing pending.
          if (queues_[q] == 0)
            continue;
          if (queues_[q]->msg_queue ()->is_empty ())
            continue;

          // Dequeue and service a single request, then rescan from
          // the highest-priority queue again.
          queues_[q]->svc_one ();
          dispatched = 1;
          break;
        }
      // Falling out of the inner loop without dispatching means all
      // queues are drained and we are done.
    }
  return 0;
}
// Reactor input callback, used only by the non-win32 single-threaded
// implementation: delegate to handle_signal, which drains the queues.
int
ACE_ES_Priority_Dispatching::handle_input (ACE_HANDLE)
{
  const int status = this->handle_signal (0, 0, 0);
  return status;
}
void
ACE_ES_Priority_Dispatching::activate (int threads_per_queue)
{
this->threads_per_queue_ = threads_per_queue;
this->initialize_queues ();
}
// Shut down every dispatch queue.  As each queue exits it calls back
// this->dispatch_queue_closed; once all queue threads have been
// reaped via the thread manager, the queue objects themselves are
// destroyed.
void
ACE_ES_Priority_Dispatching::shutdown (void)
{
  // Idempotent: only the first call does any work.
  if (shutdown_)
    return;

  ACE_DEBUG ((LM_DEBUG, "EC (%t) ACE_ES_Priority_Dispatching "
              "module shutting down.\n"));
  shutdown_ = 1;

  // In the single-threaded configuration the notification strategy is
  // hooked into the reactor; let it deregister itself.
  if (threads_per_queue_ == 0)
    notification_strategy_.shutdown ();

  // Every queue -- active or not -- must be told to shut down.
  for (int q = 0; q <= highest_priority_; ++q)
    {
      if (queues_[q] == 0)
        continue;
      ACE_DEBUG ((LM_DEBUG,
                  "EC (%t) shutting down dispatch queue %d.\n", q));
      queues_[q]->shutdown_task ();
    }

  // Block until every queue thread has exited.
  if (this->thr_mgr_.wait () == -1)
    ACE_ERROR ((LM_ERROR, "%p\n",
                "Priority_Dispatching::shutdown - waiting"));

  // Now it is safe to destroy the queue objects (delete of a null
  // pointer is a no-op, so no per-slot check is needed).
  for (int q = 0; q <= this->highest_priority_; ++q)
    {
      delete this->queues_[q];
      this->queues_[q] = 0;
    }
}
// Callback invoked each time a dispatch queue closes down.  Queue
// cleanup is handled centrally in shutdown(), so nothing is done
// here; the parameter is kept only to satisfy the interface.
void
ACE_ES_Priority_Dispatching::dispatch_queue_closed (ACE_ES_Dispatch_Queue *queue)
{
  ACE_UNUSED_ARG (queue);
}
/*
ACE_HANDLE
ACE_ES_Priority_Dispatching::get_handle (void) const
{
ACE_ES_Priority_Dispatching *fake_this = (ACE_ES_Priority_Dispatching *) this;
return fake_this->notification_strategy_.get_handle ();
}
*/
// ************************************************************

// Construct a dispatch queue.  The scheduler is forwarded to the
// ACE_RT_Task base class; the dispatching module and notification
// strategy are cached for later use.
ACE_ES_Dispatch_Queue::ACE_ES_Dispatch_Queue
    (ACE_ES_Dispatching_Base *dispatching_module,
     ACE_ES_Notification_Strategy *notification_strategy,
     RtecScheduler::Scheduler_ptr scheduler)
  : ACE_RT_Task (scheduler),
    dispatching_module_ (dispatching_module),
    notification_strategy_ (notification_strategy)
{
}
// Set up this queue's underlying message queue (notifying for the
// reactive, zero-thread case; plain or scheduler-strategy-driven for
// the threaded case), register/look up the queue's RT_Info with the
// scheduler using <period>, and finally spawn <threads> service
// threads.  Returns 0 on success, -1 on any failure.
// NOTE(review): <period> is taken by non-const reference but appears
// to be only read here -- confirm against callers.
int
ACE_ES_Dispatch_Queue::open_queue (RtecScheduler::Period_t &period,
                                   int threads)
{
  // First set up the correct message queue according to whether the
  // dispatch queue will be active or not.

  // If there are no threads specified, we'll register with the
  // reactor to be called back when someone queues a message.
  if (threads == 0)
    {
      // Allocate a message queue that notifies a reactor when events
      // arrive via the msg_queue call.  If that succeeds, set the
      // notification strategy in our message queue via the open call.
      if (this->msg_queue () == 0 ||
          this->msg_queue ()->open (ACE_ES_QUEUE::DEFAULT_HWM,
                                    ACE_ES_QUEUE::DEFAULT_LWM,
                                    notification_strategy_) == -1)
        ACE_ERROR_RETURN ((LM_ERROR, "%p msg_queue.open failed.\n",
                           "ACE_ES_Dispatch_Queue::open_queue"), -1);
    }
  else
    {
      // quick hack to test dynamic queue performance (to be replaced soon)
      ACE_ES_QUEUE *mq = 0;
#if defined (TAO_USES_STRATEGY_SCHEDULER)
#if defined (TAO_USES_EDF_SCHEDULING)
      // EDF: order queued messages by deadline.
      ACE_Deadline_Message_Strategy *adms = new ACE_Deadline_Message_Strategy;
      if (adms)
        {
          mq = new ACE_Dynamic_Message_Queue<ACE_SYNCH> (*adms);
        }
#elif defined (TAO_USES_MLF_SCHEDULING) || defined (TAO_USES_MUF_SCHEDULING)
      // MLF/MUF: order queued messages by laxity.
      ACE_Laxity_Message_Strategy *alms = new ACE_Laxity_Message_Strategy;
      if (alms)
        {
          mq = new ACE_Dynamic_Message_Queue<ACE_SYNCH> (*alms);
        }
#else
      // Static scheduling: a plain queue suffices.
      mq = new ACE_ES_QUEUE;
#endif
#else
      // Allocate a message queue that does not notify.
      mq = new ACE_ES_MQ;
#endif
      if (mq == 0)
        ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                           "ACE_ES_Dispatch_Queue::open_queue"), -1);
      else
        {
          // This deletes previous message queue.
          this->msg_queue (mq);
          // Set this so that the destructor of ACE_Task deletes our
          // message queue.  Note, this must be after the call to
          // msg_queue.
          delete_msg_queue_ = 1;
        }
    }

  // Create a name for ourselves using the period.  The period is
  // in 100 ns units; first convert to usec by dividing by 10.
  char temp[64];
  ACE_OS::sprintf (temp, "ACE_ES_Dispatch_Queue-%u.us", period / 10);

  // Open the task.  This will query the scheduler for our qos
  // structure.  It will also synch_threads if it succeeds.
  int result = this->open_task (temp);
  switch (result)
    {
    case -1:
      // Error.
      ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                         "ACE_ES_Dispatch_Queue::open_queue"), -1);
    case 0:
      // Not previously registered with the scheduler: describe this
      // queue's execution profile in a new RT_Info entry.
      {
        ACE_DECLARE_NEW_CORBA_ENV;
        ACE_TRY
          {// @@ TODO: Handle exceptions...
#if 1
            this->scheduler_->set
              (rt_info_,
               RtecScheduler::VERY_HIGH_CRITICALITY,
               ORBSVCS_Time::zero (),
               ORBSVCS_Time::zero (),
               ORBSVCS_Time::zero (),
               period,
               RtecScheduler::VERY_LOW_IMPORTANCE,
               ORBSVCS_Time::zero (),
               1,
               RtecScheduler::OPERATION
               ACE_ENV_ARG_PARAMETER);
#else
            // Alternative path via the scheduler factory singleton.
            ACE_Scheduler_Factory::server()->set (rt_info_,
                                                  RtecScheduler::VERY_HIGH_CRITICALITY,
                                                  ORBSVCS_Time::zero (),
                                                  ORBSVCS_Time::zero (),
                                                  ORBSVCS_Time::zero (),
                                                  period,
                                                  RtecScheduler::VERY_LOW_IMPORTANCE,
                                                  ORBSVCS_Time::zero (),
                                                  1,
                                                  RtecScheduler::OPERATION
                                                  ACE_ENV_ARG_PARAMETER);
#endif
            ACE_TRY_CHECK;
          }
        ACE_CATCHANY
          {
            ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                                 "ACE_ES_Dispatch_Queue::exception");
            return -1;
          }
        ACE_ENDTRY;
      }
      // FALLTHROUGH
    case 1:
      // Found.
      break;
    }

  // Spawn threads.
  return this->synch_threads (threads);
}
// ACE_RT_Task invokes this once all of the queue's threads have
// exited; relay the event to the dispatching module so it can clean
// up any resources.
void
ACE_ES_Dispatch_Queue::threads_closed (void)
{
  this->dispatching_module_->dispatch_queue_closed (this);
}
// ************************************************************

// EFD dispatching simply delegates construction to the base module.
ACE_ES_EFD_Dispatching::ACE_ES_EFD_Dispatching (ACE_EventChannel *channel)
  : ACE_ES_Dispatching_Base (channel)
{
}
// Dispatch the request inline -- EFD does no queueing.  The request
// may also be a command (e.g., deleting a push consumer proxy), so it
// is executed rather than merely forwarded.
void
ACE_ES_EFD_Dispatching::push (ACE_ES_Dispatch_Request *request
                              ACE_ENV_ARG_DECL_NOT_USED)
{
  // Event-forwarding requests need a reference back to us so they can
  // call dispatch_event.
  request->set (this, 0, ACE_Scheduler_MIN_SUB_PRIORITY);

  u_long command_action = ACE_RT_Task_Command::RELEASE;
  request->execute (command_action);

  if (command_action == ACE_RT_Task_Command::RELEASE)
    {
      // Execution is complete; drop our reference to the request.
      if (ACE_ES_Dispatch_Request::release (request) != 0)
        ACE_ERROR ((LM_ERROR, "ACE_ES_EFD_Dispatching::push"
                    " release failed.\n"));
    }
  else
    ACE_ERROR ((LM_ERROR, "ACE_ES_EFD_Dispatching::push: unknown command action.\n"));
}
// ************************************************************

// RTU dispatching builds on the priority dispatcher; construction
// just forwards the channel.
ACE_ES_RTU_Dispatching::ACE_ES_RTU_Dispatching (ACE_EventChannel *channel)
  : ACE_ES_Priority_Dispatching (channel)
{
}
// Invoked from a dispatch queue, so ownership of <request> stays with
// the caller -- we must not release it here.  <command_action> tells
// the queue what to do with the request afterwards.
int
ACE_ES_RTU_Dispatching::dispatch_event (ACE_ES_Dispatch_Request *request,
                                        u_long &command_action)
{
  // Publish the priority of the request about to run so the RTU
  // manager can detect preemption.
  channel_->rtu_manager ()->priority (request->priority ());

  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
    {
      // Hand the request to the module above us.
      up_->push (request ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      // push() is a oneway so nothing should propagate, but log
      // anything that does.
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "ACE_ES_Dispatching_Base::dispatch_event");
    }
  ACE_ENDTRY;

  // Drop the advertised priority back down to the floor.
  channel_->rtu_manager ()->priority (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO, ACE_SCOPE_PROCESS));

  // A preempted request goes back on the head of its dispatch queue;
  // otherwise the queue is finished with it and may release it.
  command_action = channel_->rtu_manager ()->not_done ()
    ? ACE_RT_Task_Command::UNGETQ
    : ACE_RT_Task_Command::RELEASE;
  return 0;
}
// Queue the request via the base class, then ask the RTU manager to
// preempt the currently running task if this request outranks it.
void
ACE_ES_RTU_Dispatching::push (ACE_ES_Dispatch_Request *request
                              ACE_ENV_ARG_DECL)
{
  // Enqueue on the appropriate priority queue first.
  ACE_ES_Priority_Dispatching::push (request ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Lower numeric value means higher priority: if the new request is
  // more urgent than whatever is running, request preemption.
  const int incoming = request->priority ();
  if (incoming < channel_->rtu_manager ()->priority ())
    channel_->rtu_manager ()->should_preempt (1);
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
// EOF