📄 mcast.cpp
字号:
sender->connect (sub ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// To receive events we need to setup an event handler:
TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver;
receiver = TAO_ECG_UDP_Receiver::create();
TAO_ECG_Mcast_EH mcast_eh (&*receiver);
// The event handler uses the ORB reactor to wait for multicast
// traffic:
mcast_eh.reactor (orb->orb_core ()->reactor ());
// The multicast Event Handler needs to know to what multicast
// groups it should listen to. To do so it becomes an observer
// with the event channel, to determine the list of events
// required by all the local consumer.
// Then it register for the multicast groups that carry those
// events:
mcast_eh.open (event_channel.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// Again the receiver connects to the event channel as a
// supplier of events, using the Observer features to detect
// local consumers and their interests:
receiver->init (event_channel.in (),
endpoint,
address_server.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// The Receiver is also a supplier of events. The exact type of
// events is only known to the application, because it depends
// on the traffic carried by all the multicast groups that the
// different event handlers subscribe to.
// In this example we choose to simply describe our publications
// using wilcards, any event from any source. More advanced
// application could use the Observer features in the event
// channel to update this information (and reduce the number of
// multicast groups that each receive subscribes to).
// In a future version the event channel could perform some of
// those tasks automatically
RtecEventChannelAdmin::SupplierQOS pub;
pub.publications.length (1);
pub.publications[0].event.header.type = ACE_ES_EVENT_ANY;
pub.publications[0].event.header.source = ACE_ES_EVENT_SOURCE_ANY;
pub.is_gateway = 1;
receiver->connect (pub ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// **************** THAT COMPLETES THE FEDERATION SETUP
// **************** HERE IS THE CLIENT SETUP
// First let us create consumers and connect them to the event
// channel
Consumer consumer1;
Consumer consumer2;
RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
event_channel->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
consumer1.connect (consumer_admin.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
consumer2.connect (consumer_admin.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// And now create a supplier
Supplier supplier;
RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
event_channel->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
supplier.connect (supplier_admin.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// **************** THAT COMPLETES THE CLIENT SETUP
// **************** HERE IS THE EVENT LOOP
// creating thread pool
ACE_Thread_Manager the_ace_manager;
the_ace_manager.open ();
int thread_pool_id = the_ace_manager.spawn_n (
pool_size, ACE_THR_FUNC (run_orb_within_thread), 0, THR_DETACHED | THR_NEW_LWP);
if (thread_pool_id == -1) {
ACE_ERROR_RETURN ((LM_ERROR, "Cannot spawn thread pool\n"), 1);
}
ACE_OS::sleep (1); // simple solution ensures ready thread pool
for (int i = 0; i < data_items; i++)
{
supplier.perform_push (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_OS::sleep (2); // simple solution ensures ready receivers
terminate_threads = true; // terminate thread pool
the_ace_manager.wait(); // wait until all threads in the pool are stopped
the_ace_manager.close ();
// **************** THAT COMPLETES THE EVENT LOOP
// **************** HERE IS THE CLEANUP CODE
// First the easy ones
supplier.disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
consumer1.disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
consumer2.disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
// Now let us disconnect the Receiver
receiver->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
int r = mcast_eh.shutdown ();
if (r == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
"Closing MCast event handler\n"), 1);
}
// And also disconnect the sender of events
sender->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
// The event channel must be destroyed, so it can release its
// resources, and inform all the clients that are still
// connected that it is going away.
event_channel->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
// Deactivating the event channel implementation is not strictly
// required, the POA will do it for us, but it is good manners:
{
// Using _this() activates with the default POA, we must gain
// access to that POA to deactivate the object.
// Notice that we 'know' that the default POA for this servant
// is the root POA, but the code is more robust if we don't
// rely on that.
PortableServer::POA_var poa =
ec_impl._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
// Get the Object Id used for the servant..
PortableServer::ObjectId_var oid =
poa->servant_to_id (&ec_impl ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// Deactivate the object
poa->deactivate_object (oid.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
// Now we can destroy the POA, the flags mean that we want to
// wait until the POA is really destroyed
poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// Finally destroy the ORB
orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
// **************** THAT COMPLETES THE CLEANUP CODE
ACE_DEBUG ((LM_DEBUG,
"MCast example finished\n"));
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Service");
return 1;
}
ACE_ENDTRY;
return 0;
}
// ****************************************************************
int parse_args (int argc, char *argv[])
{
ACE_Get_Opt get_opts (argc, argv, "m:");
int c;
while ((c = get_opts ()) != -1)
switch (c)
{
case 'm':
udp_mcast_address = get_opts.opt_arg ();
break;
case '?':
default:
ACE_ERROR_RETURN ((LM_ERROR,
"usage: %s "
"[-m udp_mcast_address]"
"\n",
argv [0]),
-1);
}
// Indicates sucessful parsing of the command line
return 0;
}
// ****************************************************************
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -