consumer_client.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 270 行
CPP
270 行
// Consumer_Client.cpp,v 1.6 2003/11/02 23:27:22 dhinton Exp
#include "Consumer_Client.h"
#include "Consumer.h"
#include "ORB_Run_Task.h"
#include "ace/Arg_Shifter.h"
#include "orbsvcs/orbsvcs/NotifyExtC.h"
#include "orbsvcs/orbsvcs/CosNamingC.h"
#include "tao/ORB_Core.h"
#include "ace/Sched_Params.h"
#include "ace/OS_NS_errno.h"
ACE_RCSID (Notify, TAO_Notify_Lanes_Consumer_Client, "Consumer_Client.cpp,v 1.6 2003/11/02 23:27:22 dhinton Exp")
TAO_Notify_Lanes_Consumer_Client::TAO_Notify_Lanes_Consumer_Client (TAO_Notify_ORB_Objects& orb_objects)
: orb_objects_ (orb_objects)
, lane_priority_ (0)
, consumer_ (0)
{
}
TAO_Notify_Lanes_Consumer_Client::~TAO_Notify_Lanes_Consumer_Client ()
{
}
int
TAO_Notify_Lanes_Consumer_Client::parse_args (int argc, char *argv[])
{
ACE_Arg_Shifter arg_shifter (argc, argv);
const ACE_TCHAR *current_arg = 0;
while (arg_shifter.is_anything_left ())
{
if ((current_arg = arg_shifter.get_the_parameter (ACE_LIB_TEXT("-LanePriority")))) // LanePriority
{
if (current_arg != 0)
{
this->lane_priority_ = ACE_OS::atoi (current_arg);
char type[BUFSIZ];
ACE_OS::sprintf (type, "TEST_TYPE_%d", this->lane_priority_);
this->event_type_ = type;
}
arg_shifter.consume_arg ();
}
else
{
arg_shifter.ignore_arg ();
}
}
return 0;
}
void
TAO_Notify_Lanes_Consumer_Client::initialize (ACE_ENV_SINGLE_ARG_DECL)
{
ACE_DEBUG ((LM_DEBUG, "(%P, %t)Initializing Consumer Client with lane priority = %d, event type = (%s)\n"
, this->lane_priority_, this->event_type_.c_str ()));
PortableServer::POAManager_var poa_manager =
this->orb_objects_.root_poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
// Resolve the Notification Factory.
CosNotifyChannelAdmin::EventChannelFactory_var ecf = this->orb_objects_.notify_factory (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
// Find the EventChannel created by the supplier.
CosNotifyChannelAdmin::ChannelIDSeq_var channel_seq = ecf->get_all_channels (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
CosNotifyChannelAdmin::EventChannel_var ec;
if (channel_seq->length() > 0)
{
ec = ecf->get_event_channel (channel_seq[0] ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
else
{
ACE_DEBUG ((LM_DEBUG, "No Event Channel active!\n"));
return;
}
// Create a Consumer Admin
CosNotifyChannelAdmin::AdminID adminid = 0;
CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin =
ec->new_for_consumers (CosNotifyChannelAdmin::AND_OP, adminid ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
ACE_ASSERT (!CORBA::is_nil (consumer_admin.in ()));
PortableServer::POA_var rt_poa = this->create_rt_poa (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
// Create a Consumer
this->consumer_ = new TAO_Notify_Lanes_Consumer (this->orb_objects_);
// Initialize it.
this->consumer_->init (rt_poa, consumer_admin, this->event_type_ ACE_ENV_ARG_PARAMETER);
}
PortableServer::POA_ptr
TAO_Notify_Lanes_Consumer_Client::create_rt_poa (ACE_ENV_SINGLE_ARG_DECL)
{
PortableServer::POA_var rt_poa;
// Create an RT POA with a lane at the given priority.
CORBA::Policy_var priority_model_policy;
CORBA::Policy_var lanes_policy;
CORBA::Policy_var activation_policy =
this->orb_objects_.root_poa_->create_implicit_activation_policy (PortableServer::IMPLICIT_ACTIVATION ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (rt_poa._retn ());
// Create a priority model policy.
priority_model_policy =
this->orb_objects_.rt_orb_->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED
, 0
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (rt_poa._retn ());
RTCORBA::ThreadpoolLanes lanes (1);
lanes.length (1);
lanes[0].lane_priority = this->lane_priority_;
lanes[0].static_threads = 1;
lanes[0].dynamic_threads = 0;
// Create a thread-pool.
CORBA::ULong stacksize = 0;
CORBA::Boolean allow_request_buffering = 0;
CORBA::ULong max_buffered_requests = 0;
CORBA::ULong max_request_buffer_size = 0;
CORBA::Boolean allow_borrowing = 0;
// Create the thread-pool.
RTCORBA::ThreadpoolId threadpool_id =
this->orb_objects_.rt_orb_->create_threadpool_with_lanes (stacksize,
lanes,
allow_borrowing,
allow_request_buffering,
max_buffered_requests,
max_request_buffer_size
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (rt_poa._retn ());
// Create a thread-pool policy.
lanes_policy =
this->orb_objects_.rt_orb_->create_threadpool_policy (threadpool_id
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (rt_poa._retn ());
CORBA::PolicyList poa_policy_list;
poa_policy_list.length (3);
poa_policy_list[0] = priority_model_policy;
poa_policy_list[1] = activation_policy;
poa_policy_list[2] = lanes_policy;
PortableServer::POAManager_var poa_manager =
this->orb_objects_.root_poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (rt_poa._retn ());
rt_poa = this->orb_objects_.root_poa_->create_POA ("RT POA!",
poa_manager.in (),
poa_policy_list
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (rt_poa._retn ());
return rt_poa._retn ();
}
void
TAO_Notify_Lanes_Consumer_Client::run (ACE_ENV_SINGLE_ARG_DECL)
{
this->consumer_->run (ACE_ENV_SINGLE_ARG_PARAMETER);
}
int
TAO_Notify_Lanes_Consumer_Client::svc (void)
{
ACE_TRY_NEW_ENV
{
// Initialize this threads priority.
this->orb_objects_.current_->the_priority (0 ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
this->initialize (ACE_ENV_SINGLE_ARG_PARAMETER); //Init the Client
ACE_TRY_CHECK;
this->run (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION,
ACE_TEXT ("Supplier error "));
}
ACE_ENDTRY;
return 0;
}
int
main (int argc, char *argv [])
{
ACE_TRY_NEW_ENV
{
// Initialize an ORB
CORBA::ORB_var orb = CORBA::ORB_init (argc,
argv,
""
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
TAO_Notify_ORB_Objects orb_objects;
orb_objects.init (orb ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
TAO_Notify_ORB_Run_Task orb_run_task (orb_objects);
TAO_Notify_Lanes_Consumer_Client client (orb_objects);
if (client.parse_args (argc, argv) != 0)
{
ACE_DEBUG ((LM_DEBUG, "Consumer_Client::Error parsing options\n"));
return -1;
}
long flags = THR_NEW_LWP | THR_JOINABLE;
flags |=
orb->orb_core ()->orb_params ()->thread_creation_flags ();
if (orb_run_task.activate (flags) == -1 || client.activate (flags) == -1)
{
if (ACE_OS::last_error () == EPERM)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")),
-1);
else
ACE_DEBUG ((LM_ERROR,
ACE_TEXT ("(%t) Task activation at priority %d failed. \n")));
}
orb_run_task.thr_mgr ()->wait ();
client.thr_mgr ()->wait ();
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION,
ACE_TEXT ("Consumer Client error "));
}
ACE_ENDTRY;
return 0;
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?