📄 buffering_strategy.cpp
字号:
// Buffering_Strategy.cpp,v 1.9 2003/11/04 05:21:32 dhinton Exp
#include "Buffering_Strategy.h"
#if ! defined (__ACE_INLINE__)
#include "Buffering_Strategy.inl"
#endif /* __ACE_INLINE__ */
ACE_RCSID (Notify,
Buffering_Strategy,
"Buffering_Strategy.cpp,v 1.9 2003/11/04 05:21:32 dhinton Exp")
#include "ace/Message_Queue.h"
#include "orbsvcs/CosNotificationC.h"
#include "Method_Request.h"
#include "Notify_Extensions.h"
#include "QoSProperties.h"
#include "tao/debug.h"
#include "ace/Null_Condition.h"
TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy (
TAO_Notify_Message_Queue& msg_queue,
TAO_Notify_AdminProperties_var& admin_properties,
CORBA::Long batch_size
)
: msg_queue_ (msg_queue),
admin_properties_ (admin_properties),
global_queue_lock_ (admin_properties->global_queue_lock ()),
global_queue_not_full_condition_ (admin_properties->global_queue_not_full_condition ()),
global_queue_length_ (admin_properties->global_queue_length ()),
max_global_queue_length_ (admin_properties->max_global_queue_length ()),
max_local_queue_length_ (0),
order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder),
discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder),
use_discarding_ (1),
local_queue_not_full_condition_ (global_queue_lock_),
batch_size_ (batch_size),
batch_size_reached_condition_ (global_queue_lock_),
shutdown_ (0)
{
}
TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy ()
{
}
void
TAO_Notify_Buffering_Strategy::update_qos_properties (
const TAO_Notify_QoSProperties& qos_properties
)
{
this->order_policy_.set (qos_properties);
if (this->discard_policy_.set (qos_properties) != -1)
{
this->use_discarding_ = 1;
}
TAO_Notify_Property_Time blocking_timeout (TAO_Notify_Extensions::BlockingPolicy);
// if set to a valid time, init the blocking_time_
if (blocking_timeout.set (qos_properties) != -1)
{
this->use_discarding_ = 0;
this->blocking_time_ =
# if defined (ACE_CONFIG_WIN32_H)
ACE_Time_Value (ACE_static_cast (long, blocking_timeout.value ()));
# else
ACE_Time_Value (blocking_timeout.value () / 1);
# endif /* ACE_CONFIG_WIN32_H */
}
}
void
TAO_Notify_Buffering_Strategy::shutdown (void)
{
ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
this->shutdown_ = 1;
this->global_queue_not_full_condition_.broadcast ();
this->local_queue_not_full_condition_.broadcast ();
this->batch_size_reached_condition_.broadcast ();
}
int
TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request& method_request)
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
// while either local or global max reached
while ((this->max_local_queue_length_ != 0 &&
this->msg_queue_.message_count () == this->max_local_queue_length_)
||
(this->max_global_queue_length_.value () != 0 &&
this->global_queue_length_ == this->max_global_queue_length_.value ()))
{
if (this->shutdown_ == 1) // if we're shutdown, don't play this silly game.
return -1;
if (this->use_discarding_ == 1)
{
if (this->global_queue_length_ == this->max_global_queue_length_.value ()
&& this->msg_queue_.message_count () == 0) // global max. reached but can't discard
{
// block. this is a hack because the real solution is
// to locate the appropriate queue and dequeue from it.
this->global_queue_not_full_condition_.wait ();
}
else // local max reached or, at global max but non-zero local count.
{
if (this->discard () == -1)
return -1;
--this->global_queue_length_;
// ACE_DEBUG ((LM_DEBUG, "Discarded from %x, global_queue_length = %d\n", this, this->global_queue_length_));
this->global_queue_not_full_condition_.signal ();
this->local_queue_not_full_condition_.signal ();
}
}
else // block
{
if (this->msg_queue_.message_count () == this->max_local_queue_length_) // local maximum reached
{
if (this->blocking_time_ == ACE_Time_Value::zero) // wait forever if need be.
{
this->local_queue_not_full_condition_.wait ();
}
else // finite blocking time.
{
ACE_Time_Value absolute = ACE_OS::gettimeofday () + this->blocking_time_;
if (this->local_queue_not_full_condition_.wait (&absolute) == -1) // returns -1 on timeout
return -1; // Note message is discarded if it could not be enqueued in the given time.
}
}
else // global max reached
{
if (this->blocking_time_ == ACE_Time_Value::zero) // wait forever if need be.
{
this->global_queue_not_full_condition_.wait ();
}
else // finite blocking time.
{
ACE_Time_Value absolute = ACE_OS::gettimeofday () + blocking_time_;
if (this->global_queue_not_full_condition_.wait (&absolute) == -1) // returns -1 on timeout
return -1;
}
}
} // block
} // while
if (this->queue (method_request) == -1)
{
ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
"Panic! failed to enqueue event"));
return -1;
}
++this->global_queue_length_;
// ACE_DEBUG ((LM_DEBUG, "Inserted to %x, global_queue_length = %d\n", this, this->global_queue_length_));
if (this->msg_queue_.message_count () == this->batch_size_)
batch_size_reached_condition_.signal ();
return this->msg_queue_.message_count ();
}
int
TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request* &method_request, const ACE_Time_Value *abstime)
{
ACE_Message_Block *mb;
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
while (this->msg_queue_.message_count () < this->batch_size_) // block
{
this->batch_size_reached_condition_.wait (abstime);
if (this->shutdown_ == 1) // if we're shutdown, don't play this silly game.
return -1;
if (errno == ETIME)
return 0;
}
if (this->msg_queue_.dequeue (mb) == -1)
return -1;
method_request = ACE_dynamic_cast (TAO_Notify_Method_Request*, mb);
if (method_request == 0)
return -1;
--this->global_queue_length_;
// ACE_DEBUG ((LM_DEBUG, "Dequeued from %x, global_queue_length = %d\n", this, this->global_queue_length_));
this->global_queue_not_full_condition_.signal ();
this->local_queue_not_full_condition_.signal ();
return 1;
}
int
TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request& method_request)
{
int result;
// Queue according to order policy
if (this->order_policy_ == CosNotification::AnyOrder ||
this->order_policy_ == CosNotification::FifoOrder)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
"enqueue in fifo order\n"));
// Insert at the end of the queue.
result = this->msg_queue_.enqueue_tail (&method_request);
}
else if (this->order_policy_ == CosNotification::PriorityOrder)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
"enqueue in priority order\n"));
result = this->msg_queue_.enqueue_prio (&method_request);
}
else if (this->order_policy_ == CosNotification::DeadlineOrder)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
"enqueue in deadline order\n"));
result = this->msg_queue_.enqueue_deadline (&method_request);
}
else
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
result = -1;
}
return result;
}
int
TAO_Notify_Buffering_Strategy::discard (void)
{
ACE_Message_Block *mb;
int result;
if (this->discard_policy_ == CosNotification::AnyOrder ||
this->discard_policy_ == CosNotification::FifoOrder)
{
result = this->msg_queue_.dequeue_head (mb);
}
else if (this->discard_policy_ == CosNotification::LifoOrder)
{
result = this->msg_queue_.dequeue_tail (mb);
}
else if (this->discard_policy_ == CosNotification::DeadlineOrder)
{
result = this->msg_queue_.dequeue_deadline (mb);
}
else if (this->discard_policy_ == CosNotification::PriorityOrder)
{
result = this->msg_queue_.dequeue_prio (mb);
}
else
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
"Invalid discard policy\n"));
result = -1;
}
if (result != -1)
ACE_Message_Block::release (mb);
return result;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -