⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_channel.cpp

📁 这是一个被广泛使用的开源通信项目,完全能够胜任大容量、高并发的通信要求,可广泛用于网络游戏、医学图像网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的材料。
💻 CPP
📖 第 1 页 / 共 5 页
字号:
// Event_Channel.cpp,v 1.74 2003/10/28 18:34:19 bala Exp

#include "ace/Service_Config.h"
#include "ace/Auto_Ptr.h"
#include "orbsvcs/Scheduler_Factory.h"
#include "orbsvcs/Event_Utilities.h"

#include "orbsvcs/Event/Dispatching_Modules.h"
#include "orbsvcs/Event/Memory_Pools.h"
#include "orbsvcs/Event/EC_Gateway.h"
#include "orbsvcs/Event/Module_Factory.h"
#include "orbsvcs/Event/Event_Manip.h"
#include "orbsvcs/Event/Event_Channel.h"

#if !defined (__ACE_INLINE__)
#include "Event_Channel.i"
#endif /* __ACE_INLINE__ */

ACE_RCSID(Event, Event_Channel, "Event_Channel.cpp,v 1.74 2003/10/28 18:34:19 bala Exp")

#include "tao/Timeprobe.h"

// Template instantiations needed by this file.  Which form is used is
// selected by the ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION and
// ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA configuration macros: the
// first branch uses "template class" declarations, the second uses
// "#pragma instantiate".  Both branches list the same templates in
// the same order; keep them in sync when adding or removing entries.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Atomic_Op<ACE_ES_MUTEX, int>;
template class ACE_Atomic_Op_Ex<ACE_ES_MUTEX, int>;
template class ACE_Map_Entry<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT>;
template class ACE_Map_Entry<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT>;
template class ACE_Map_Entry<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry>;
template class ACE_Map_Manager<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Manager<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Manager<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>;
template class ACE_Map_Iterator_Base<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Iterator_Base<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Iterator_Base<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>;
template class ACE_Map_Iterator<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Iterator<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>;
template class ACE_Map_Reverse_Iterator<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Iterator<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Reverse_Iterator<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Reverse_Iterator<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>;
template class ACE_Node<ACE_ES_Consumer_Rep *>;
template class ACE_Node<ACE_Push_Consumer_Proxy *>;
template class ACE_Node<ACE_Push_Supplier_Proxy *>;
template class ACE_Unbounded_Set_Ex<ACE_ES_Consumer_Rep *>;
template class ACE_Unbounded_Set_Ex<ACE_Push_Consumer_Proxy *>;
template class ACE_Unbounded_Set_Ex<ACE_Push_Supplier_Proxy *>;
template class ACE_Unbounded_Set_Ex_Iterator<ACE_ES_Consumer_Rep *>;
template class ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Consumer_Proxy *>;
template class ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Supplier_Proxy *>;

template class ACE_Auto_Basic_Ptr<ACE_Push_Supplier_Proxy>;
template class ACE_Auto_Basic_Ptr<ACE_Push_Consumer_Proxy>;
template class auto_ptr<ACE_Push_Supplier_Proxy>;
template class auto_ptr<ACE_Push_Consumer_Proxy>;

template class ACE_Array<TAO_EC_Event>;
template class ACE_Array_Base<TAO_EC_Event>;
template class ACE_Array_Iterator<TAO_EC_Event>;

#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Atomic_Op<ACE_ES_MUTEX, int>
#pragma instantiate ACE_Atomic_Op_Ex<ACE_ES_MUTEX, int>
#pragma instantiate ACE_Map_Entry<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT>
#pragma instantiate ACE_Map_Entry<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT>
#pragma instantiate ACE_Map_Entry<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry>
#pragma instantiate ACE_Map_Manager<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Manager<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Manager<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>
#pragma instantiate ACE_Map_Iterator_Base<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Iterator_Base<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Iterator_Base<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>
#pragma instantiate ACE_Map_Iterator<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Iterator<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>
#pragma instantiate ACE_Map_Reverse_Iterator<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Iterator<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Reverse_Iterator<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Reverse_Iterator<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>
#pragma instantiate ACE_Node<ACE_ES_Consumer_Rep *>
#pragma instantiate ACE_Node<ACE_Push_Consumer_Proxy *>
#pragma instantiate ACE_Node<ACE_Push_Supplier_Proxy *>
#pragma instantiate ACE_Unbounded_Set_Ex<ACE_ES_Consumer_Rep *>
#pragma instantiate ACE_Unbounded_Set_Ex<ACE_Push_Consumer_Proxy *>
#pragma instantiate ACE_Unbounded_Set_Ex<ACE_Push_Supplier_Proxy *>
#pragma instantiate ACE_Unbounded_Set_Ex_Iterator<ACE_ES_Consumer_Rep *>
#pragma instantiate ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Consumer_Proxy *>
#pragma instantiate ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Supplier_Proxy *>

#pragma instantiate ACE_Auto_Basic_Ptr<ACE_Push_Supplier_Proxy>
#pragma instantiate ACE_Auto_Basic_Ptr<ACE_Push_Consumer_Proxy>
#pragma instantiate auto_ptr<ACE_Push_Supplier_Proxy>
#pragma instantiate auto_ptr<ACE_Push_Consumer_Proxy>

#pragma instantiate ACE_Array<TAO_EC_Event>
#pragma instantiate ACE_Array_Base<TAO_EC_Event>
#pragma instantiate ACE_Array_Iterator<TAO_EC_Event>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

#if defined (ACE_ENABLE_TIMEPROBES)

// Human-readable descriptions for the time probes used in this file.
// Entry i describes the i-th enumerator of the enum below, so the two
// lists must keep the same order and the same number of entries.
static const char *TAO_Event_Channel_Timeprobe_Description[] =
{
  "Preemption_Priority - priority requested",
  "connected - priority obtained",
  "enter Push_Supplier_Proxy::push",
  "enter ES_Consumer_Module::push",
  "leave ES_Consumer_Module::push",
  "enter ACE_ES_Correlation_Module::push",
  "pushed to Correlation_Module",
  "push_source_type: Dispatch Module enqueuing",
  "ACE_ES_Consumer_Correlation::push, enter",
  "Consumer_Correlation::push, determine NO CORR.",
  "Consumer_Correlation::push, NO_CORR: alloc",
  "Consumer_Rep_Timeout::execute",
  "deliver to Subscription Module",
  "begin push_source_type",
  "end push_source_type",
  "deliver to Supplier Module (thru Supplier Proxy)",
  "connected - priority requested",
  "Consumer_Name - priority requested",
  "Consumer_Name - priority obtained",
  "deliver event to consumer proxy",
  "enter ACE_ES_Subscription_Module::push",
  "push_source_type"
};

// Time probe identifiers.  The first enumerator is pinned to 5100 and
// the remaining ones follow consecutively, one per description in the
// table above.
enum
{
  // Timeprobe description table start key
  TAO_EVENT_CHANNEL_PREEMPTION_PRIORITY_PRIORITY_REQUESTED = 5100,
  TAO_EVENT_CHANNEL_CONNECTED_PRIORITY_OBTAINED,
  TAO_EVENT_CHANNEL_ENTER_PUSH_SUPPLIER_PROXY_PUSH,
  TAO_EVENT_CHANNEL_ENTER_ES_CONSUMER_MODULE_PUSH,
  TAO_EVENT_CHANNEL_LEAVE_ES_CONSUMER_MODULE_PUSH,
  TAO_EVENT_CHANNEL_ENTER_ACE_ES_CORRELATION_MODULE_PUSH,
  TAO_EVENT_CHANNEL_PUSHED_TO_CORRELATION_MODULE,
  TAO_EVENT_CHANNEL_PUSH_SOURCE_TYPE_DISPATCH_MODULE_ENQUEUING,
  TAO_EVENT_CHANNEL_ACE_ES_CONSUMER_CORRELATION_PUSH_ENTER,
  TAO_EVENT_CHANNEL_CONSUMER_CORRELATION_PUSH_DETERMINE_NO_CORR,
  TAO_EVENT_CHANNEL_CONSUMER_CORRELATION_PUSH_NO_CORR_ALLOC,
  TAO_EVENT_CHANNEL_CONSUMER_REP_TIMEOUT_EXECUTE,
  TAO_EVENT_CHANNEL_DELIVER_TO_SUBSCRIPTION_MODULE,
  TAO_EVENT_CHANNEL_BEGIN_PUSH_SOURCE_TYPE,
  TAO_EVENT_CHANNEL_END_PUSH_SOURCE_TYPE,
  TAO_EVENT_CHANNEL_DELIVER_TO_SUPPLIER_MODULE_THRU_SUPPLIER_PROXY,
  TAO_EVENT_CHANNEL_CONNECTED_PRIORITY_REQUESTED,
  TAO_EVENT_CHANNEL_CONSUMER_NAME_PRIORITY_REQUESTED,
  TAO_EVENT_CHANNEL_CONSUMER_NAME_PRIORITY_OBTAINED,
  TAO_EVENT_CHANNEL_DELIVER_EVENT_TO_CONSUMER_PROXY,
  TAO_EVENT_CHANNEL_ENTER_ACE_ES_SUBSCRIPTION_MODULE_PUSH,
  TAO_EVENT_CHANNEL_PUSH_SOURCE_TYPE
};

// Setup Timeprobes: hand the description table and its start key (the
// first enumerator above) to ACE_TIMEPROBE_EVENT_DESCRIPTIONS.
ACE_TIMEPROBE_EVENT_DESCRIPTIONS (TAO_Event_Channel_Timeprobe_Description,
                                  TAO_EVENT_CHANNEL_PREEMPTION_PRIORITY_PRIORITY_REQUESTED);

#endif /* ACE_ENABLE_TIMEPROBES */

// Ask <scheduler> for the priority information of <rtinfo> and return
// its preemption priority.
//
// <scheduler> is the scheduler to query; <rtinfo> is the handle whose
// priority is requested.  The OS thread priority and the subpriority
// reported by the same call are discarded.
//
// ACE_CHECK_RETURN (0) supplies 0 as the return value on its error
// path.
static RtecScheduler::Preemption_Priority_t
Preemption_Priority (RtecScheduler::Scheduler_ptr scheduler,
                     RtecScheduler::handle_t rtinfo
                     ACE_ENV_ARG_DECL)
{
  // Out-parameters of Scheduler::priority ().  They are initialised so
  // that a call which returns without filling them in can never make
  // this function return an indeterminate value.
  RtecScheduler::OS_Priority thread_priority = 0;
  RtecScheduler::Preemption_Subpriority_t subpriority = 0;
  RtecScheduler::Preemption_Priority_t preemption_priority = 0;

  ACE_TIMEPROBE (TAO_EVENT_CHANNEL_PREEMPTION_PRIORITY_PRIORITY_REQUESTED);

  scheduler->priority
    (rtinfo,
     thread_priority,
     subpriority,
     preemption_priority
      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (0);
  return preemption_priority;
}

// Map <interval> to a priority level.
//
// Scans ACE_Scheduler_Rates from index 0 and returns the index of the
// first entry that is greater than or equal to <interval>.  When none
// of the ACE_Scheduler_MAX_PRIORITIES entries qualifies,
// ACE_Scheduler_MIN_PREEMPTION_PRIORITY is returned instead.
static RtecScheduler::OS_Priority
IntervalToPriority (RtecScheduler::Time interval)
{
  int level = 0;

  while (level < ACE_Scheduler_MAX_PRIORITIES)
    {
      if (interval <= ACE_Scheduler_Rates[level])
        {
          return level;
        }

      level++;
    }

  return ACE_Scheduler_MIN_PREEMPTION_PRIORITY;
}

class TAO_RTOLDEvent_Export Shutdown_Consumer : public ACE_ES_Dispatch_Request
{
  // = TITLE
  //    Shutdown Consumer command
  //
  // = DESCRIPTION
  //    A command object that travels through the system when a
  //    consumer disconnects.  Once the Dispatching Module dequeues
  //    the request it invokes execute(), which calls back into the
  //    Consumer Module.  The Consumer Module can then announce the
  //    disconnection to the rest of the system and delete the
  //    consumer proxy.  Sending the shutdown as a request lets every
  //    event still queued for the consumer (in a ReactorEx dispatch
  //    set or in the Dispatching Module) be flushed to the consumer
  //    proxy first, which will drop them.
public:
  // Build a request that, when executed, tells <consumer_module> that
  // <consumer> has shut down.  <scheduler> is queried for the
  // preemption priority of each of the consumer's dependencies.
  Shutdown_Consumer (ACE_ES_Consumer_Module *consumer_module,
                     ACE_Push_Consumer_Proxy *consumer,
                     RtecScheduler::Scheduler_ptr scheduler)
    : consumer_module_ (consumer_module)
  {
    this->consumer_ = consumer;

    // Choose the rt_info_ this request reports, so that the
    // dispatching module can query it like any other dispatch request
    // to obtain a preemption priority.  A dependency's rt_info is
    // adopted when none has been chosen yet, or when its preemption
    // priority value is smaller than the one chosen so far.
    ACE_ES_Dependency_Iterator dep_iter (consumer->qos ().dependencies);

    RtecScheduler::Preemption_Priority_t selected_priority =
      ACE_Scheduler_MIN_PREEMPTION_PRIORITY;

    while (dep_iter.advance_dependency () == 0)
      {
        ACE_DECLARE_NEW_CORBA_ENV;
        ACE_TRY
          {
            RtecEventComm::EventType &type =
              (*dep_iter).event.header.type;

            // Designator entries only structure the subscription;
            // they are not looked up in the scheduler.
            const bool is_designator =
              type == ACE_ES_GLOBAL_DESIGNATOR
              || type == ACE_ES_CONJUNCTION_DESIGNATOR
              || type == ACE_ES_DISJUNCTION_DESIGNATOR;

            if (!is_designator)
              {
                RtecScheduler::Preemption_Priority_t candidate =
                  ::Preemption_Priority (scheduler, (*dep_iter).rt_info
                                          ACE_ENV_ARG_PARAMETER);
                ACE_TRY_CHECK;

                if (this->rt_info_ == 0 || candidate < selected_priority)
                  {
                    this->rt_info_ = (*dep_iter).rt_info;
                    selected_priority = candidate;
                  }
              }
          }
        ACE_CATCHANY
          {
            // Deliberately ignored: a dependency whose priority
            // cannot be obtained is simply skipped.
          }
        ACE_ENDTRY;
      }
  }

  // Notify the consumer module that consumer_ has shut down, then set
  // <command_action> to ACE_RT_Task_Command::RELEASE.  Always
  // returns 0.
  virtual int execute (u_long &command_action)
  {
    this->consumer_module_->shutdown_request (this);
    command_action = ACE_RT_Task_Command::RELEASE;
    return 0;
  }

#if 0
  // @@ Memory allocators
  void *operator new (size_t /* nbytes */)
  { return ::new char[sizeof (Shutdown_Consumer)]; }

  void operator delete (void *buf)
  { ::delete [] ACE_static_cast(char*,buf); }
#endif /* 0 */

  // The consumer module that receives the shutdown notification.
  ACE_ES_Consumer_Module *consumer_module_;
};

class TAO_RTOLDEvent_Export Shutdown_Channel : public ACE_ES_Dispatch_Request
{
  // = TITLE
  //    Shutdown Channel command
  //
  // = DESCRIPTION
  //    Dispatch request that remembers the event channel it was
  //    created for.  The call that would destroy the channel is
  //    currently compiled out, so executing the request only sets the
  //    command action.
public:
  // Remember <channel> as the channel this request refers to.
  Shutdown_Channel (ACE_EventChannel *channel)
    : channel_ (channel)
  {
  }

  // Set <command_action> to ACE_RT_Task_Command::RELEASE and return
  // 0.  The channel_->destroy_i () call is disabled.
  virtual int execute (u_long &command_action)
  {
#if 0
    channel_->destroy_i ();
#endif
    command_action = ACE_RT_Task_Command::RELEASE;
    return 0;
  }

#if 0
  // @@ Memory allocators
  void *operator new (size_t /* nbytes */)
    { return ::new char[sizeof (Shutdown_Channel)]; }

  void operator delete (void *buf)
    { ::delete [] ACE_static_cast(char*,buf); }
#endif

  // The channel this request was created for.
  ACE_EventChannel *channel_;
};

class TAO_RTOLDEvent_Export Flush_Queue_ACT : public ACE_Command_Base
{
  // = TITLE
  //    Flush Queue Asynchronous Completion Token
  //
  // = DESCRIPTION
  //    Carries a single dispatch request through the ReactorEx.
  //    Deletes itself when execute is called, whether or not pushing
  //    the request succeeds.
public:
  // Remember <request> and the <dispatching_module> it has to be
  // pushed to.  Neither pointer is deleted by this class.
  Flush_Queue_ACT (ACE_ES_Dispatch_Request *request,
                   ACE_ES_Dispatching_Module *dispatching_module) :
    request_ (request),
    dispatching_module_ (dispatching_module) { }

  // Push the carried request to the dispatching module, then delete
  // this object.  An exception raised by the push is printed and
  // swallowed.  Always returns 0.
  virtual int execute (void* /* arg = 0 */)
  {
    ACE_DECLARE_NEW_CORBA_ENV;
    ACE_TRY
      {
        ACE_ES_Dispatch_Request *request = request_;
        dispatching_module_->push (request ACE_ENV_ARG_PARAMETER);
        ACE_TRY_CHECK;
      }
    ACE_CATCHANY
      {
        ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                             "Flush_Queue_ACT::execute, "
                             "unexpected exception");
      }
    ACE_ENDTRY;

    // Self-destruct on every path.  Doing this inside the try body
    // skipped the delete whenever push () raised an exception, which
    // leaked the token.  No member may be touched after this line.
    delete this;
    return 0;
  }

  // The request to hand to the dispatching module.
  ACE_ES_Dispatch_Request *request_;

  // The module the request is pushed to.
  ACE_ES_Dispatching_Module *dispatching_module_;
};

// Log the source, type and creation time found in the header of
// <event> through ACE_DEBUG at LM_DEBUG level.
//
// NOTE(review): the conversions used are %ld, %d and %u.  Whether they
// match the actual widths of header.source, header.type and the value
// returned by ORBSVCS_Time::to_hrtime () is not visible from this
// file -- confirm against the IDL types and the ACE_Log_Msg format
// rules before relying on the printed values.
void
dump_event (const RtecEventComm::Event &event)
{
  ACE_DEBUG ((LM_DEBUG, "source_ = %ld "
              "type_ = %d "
              "time_ = %u.\n",
              event.header.source,
              event.header.type,
              // The divide-by-1 is for ACE_U_LongLong support.
              ORBSVCS_Time::to_hrtime (event.header.creation_time) / 1));
}

// Create a proxy that stores <sm> as its supplier module.  No push
// supplier is attached yet: push_supplier_ is initialised with 0.
ACE_Push_Supplier_Proxy::ACE_Push_Supplier_Proxy (ACE_ES_Supplier_Module *sm)
  : supplier_module_ (sm),
    push_supplier_ (0)
{
}

void
ACE_Push_Supplier_Proxy::connect_push_supplier (
    RtecEventComm::PushSupplier_ptr push_supplier,
    const RtecEventChannelAdmin::SupplierQOS &qos
    ACE_ENV_ARG_DECL)
        ACE_THROW_SPEC ((CORBA::SystemException,
                         RtecEventChannelAdmin::AlreadyConnected))
{
  if (this->connected ())
    ACE_THROW (RtecEventChannelAdmin::AlreadyConnected());

  this->push_supplier_ =
    RtecEventComm::PushSupplier::_duplicate(push_supplier);

  //ACE_DEBUG ((LM_DEBUG, "EC (%t) connect_push_supplier QOS is "));
  //ACE_SupplierQOS_Factory::debug (qos);

  // Copy by value.
  this->qos_ = qos;

  // ACE_SupplierQOS_Factory::debug (qos_);

  // @@ TODO: The SupplierQOS should have a more reasonable interface to
  // obtain the supplier_id(), BTW, a callback to push_supplier will
  // not work: it usually results in some form of dead-lock.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -