⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sequencepushconsumer.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
字号:
// SequencePushConsumer.cpp,v 1.14 2003/09/04 03:27:24 ossama Exp

#include "SequencePushConsumer.h"

#if ! defined (__ACE_INLINE__)
#include "SequencePushConsumer.inl"
#endif /* __ACE_INLINE__ */

ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "SequencePushConsumer.cpp,v 1.14 2003/09/04 03:27:24 ossama Exp")

#include "ace/Reactor.h"
#include "tao/debug.h"
#include "../QoSProperties.h"
#include "../ProxySupplier.h"
#include "../Worker_Task.h"
#include "../Consumer.h"
#include "../Method_Request_Event.h"
#include "../Timer.h"
#include "../Proxy.h"

TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy)
  : TAO_Notify_Consumer (proxy), pacing_interval_ (ACE_Time_Value::zero), timer_id_ (-1), buffering_strategy_ (0),
    max_batch_size_ (CosNotification::MaximumBatchSize, 0), timer_ (0)
{
}

TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer ()
{
  delete this->buffering_strategy_;
}

void
TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL)
{
  this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);

  this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);

  ACE_NEW_THROW_EX (this->buffering_strategy_,
                    TAO_Notify_Batch_Buffering_Strategy (this->msg_queue_, admin_properties,
                                                     this->max_batch_size_.value ()),
                    CORBA::NO_MEMORY ());

  this->timer_ = this->proxy ()->timer ();
}

void
TAO_Notify_SequencePushConsumer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
  this->cancel_timer ();
  this->timer_->_decr_refcnt ();
}

void
TAO_Notify_SequencePushConsumer::release (void)
{
  delete this;
  //@@ inform factory
}

void
TAO_Notify_SequencePushConsumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties)
{
  this->max_batch_size_ = qos_properties.maximum_batch_size ();

  if (this->max_batch_size_.is_valid ())
    {// set the max batch size.
      this->buffering_strategy_->batch_size (this->max_batch_size_.value ());
    }

  const TAO_Notify_Property_Time &pacing_interval = qos_properties.pacing_interval ();

  if (pacing_interval.is_valid ())
    {
      this->pacing_interval_ =
# if defined (ACE_CONFIG_WIN32_H)
        ACE_Time_Value (ACE_static_cast (long, pacing_interval.value ()));
# else
      ACE_Time_Value (pacing_interval.value () / 1);
# endif /* ACE_CONFIG_WIN32_H */
    }

  // Inform the buffering strategy of qos change.
  this->buffering_strategy_->update_qos_properties (qos_properties);
}

void
TAO_Notify_SequencePushConsumer::schedule_timer (void)
{
  // Schedule the timer.
  if (this->pacing_interval_ != ACE_Time_Value::zero)
    {
      this->timer_id_ = this->timer_->schedule_timer (this, this->pacing_interval_, 0);

      if (this->timer_id_ == -1)
        this->pacing_interval_ = ACE_Time_Value::zero; // some error, revert to no pacing.
    }
}

void
TAO_Notify_SequencePushConsumer::cancel_timer (void)
{
  timer_->cancel_timer (this->timer_id_);
}

void
TAO_Notify_SequencePushConsumer::push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL)
{
  TAO_Notify_Event* copy = event->copy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  TAO_Notify_Event_Copy_var copy_var (copy);

  this->push_i (copy_var ACE_ENV_ARG_PARAMETER);
}

void
TAO_Notify_SequencePushConsumer::push_i (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL)
{
  TAO_Notify_Method_Request_Event* method_request;

  ACE_NEW_THROW_EX (method_request,
                    TAO_Notify_Method_Request_Event (event),
                    CORBA::NO_MEMORY ());

  int msg_count = this->buffering_strategy_->enqueue (*method_request);

  if (msg_count == -1)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "NS_Seq_Reactive_Task (%P|%t) - "
                    "failed to enqueue\n"));
      return;
    }

  if (this->pacing_interval_ == ACE_Time_Value::zero)
    {
      // If pacing is zero, there is no timer, hence dispatch immediately
      this->handle_timeout (ACE_Time_Value::zero, 0);
    }
  else if (msg_count == 1)
    this->schedule_timer ();
}

void
TAO_Notify_SequencePushConsumer::push (const CORBA::Any& /*event*/ ACE_ENV_ARG_DECL_NOT_USED)
{
  //NOP
}

void
TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent& /*notification*/ ACE_ENV_ARG_DECL_NOT_USED)
{
  //NOP
}

int
TAO_Notify_SequencePushConsumer::handle_timeout (const ACE_Time_Value& /*current_time*/,
                                             const void* /*act*/)
{
  CosNotification::EventBatch event_batch;

  int pending = 0;

  int deq_count = this->buffering_strategy_->dequeue_available (event_batch, pending);

  if (deq_count > 0)
    {
      TAO_Notify_Proxy_Guard ref_guard(this->proxy ()); // Protect this object from being destroyed in this scope.

      this->push (event_batch);

      if (pending)
        this->schedule_timer ();
    }

  return 0;
}

void
TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch)
{
  ACE_TRY_NEW_ENV
    {
      this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // we're scheduled to be destroyed. don't set the timer.
      this->pacing_interval_ = ACE_Time_Value::zero;
    }
  ACE_ENDTRY;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -