// [code-viewer residue] File: echoeventsuppliermain.cpp
// [code-viewer residue] Font size:
// EchoEventSupplierMain.cpp
// Main program for a PushSupplier of Echo events.
#include "EchoEventSupplier_i.h"
#include "SimpleAddressServer.h"
#include <orbsvcs/RtecEventCommC.h>
#include <orbsvcs/RtecEventChannelAdminC.h>
#include <orbsvcs/Time_Utilities.h>
#include <orbsvcs/Event_Utilities.h>
#include <orbsvcs/CosNamingC.h>
#include <orbsvcs/Event/EC_Event_Channel.h>
#include <orbsvcs/Event/EC_Default_Factory.h>
#include <orbsvcs/Event/ECG_Mcast_EH.h>
#include <orbsvcs/Event/ECG_UDP_Sender.h>
#include <orbsvcs/Event/ECG_UDP_Receiver.h>
#include <orbsvcs/Event/ECG_UDP_Out_Endpoint.h>
#include <orbsvcs/Event/ECG_UDP_EH.h>
#include <tao/ORB_Core.h>
#include <ace/Auto_Ptr.h>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
// Pacing delay between consecutive event pushes, in milliseconds.
const int EVENT_DELAY_MS = 10;

// Event type placed in the header of every pushed event and advertised in
// the supplier QoS; the first value after ACE_ES_EVENT_UNDEFINED.
const RtecEventComm::EventType MY_EVENT_TYPE =
  ACE_ES_EVENT_UNDEFINED + 1;

// Source ID placed in the header of every pushed event and advertised in
// the supplier QoS; the first value after ACE_ES_EVENT_SOURCE_ANY.
const RtecEventComm::EventSourceID MY_SOURCE_ID =
  ACE_ES_EVENT_SOURCE_ANY + 1;
int main (int argc, char* argv[])
{
try
{
// Initialize the EC Factory so we can customize the EC
TAO_EC_Default_Factory::init_svcs ();
// Initialize the ORB.
CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);
const char* ecname = "EventService";
const char* address = "localhost";
const char* iorfile = 0;
u_short port = 12345;
u_short listenport = 12345;
int mcast = 1;
for (int i = 0; argv[i] != 0; i++) {
if (strcmp(argv[i], "-ecname") == 0) {
if (argv[i+1] != 0) {
i++;
ecname = argv[i];
} else {
std::cerr << "Missing Event channel name" << std::endl;
}
} else if (strcmp(argv[i], "-address") == 0) {
if (argv[i+1] != 0) {
i++;
address = argv[i];
} else {
std::cerr << "Missing address" << std::endl;
}
} else if (strcmp(argv[i], "-port") == 0) {
if (argv[i+1] != 0) {
i++;
port = ACE_OS::atoi(argv[i]);
} else {
std::cerr << "Missing port" << std::endl;
}
} else if (strcmp(argv[i], "-listenport") == 0) {
if (argv[i+1] != 0) {
i++;
listenport = ACE_OS::atoi(argv[i]);
} else {
std::cerr << "Missing port" << std::endl;
}
} else if (strcmp(argv[i], "-iorfile") == 0) {
if (argv[i+1] != 0) {
i++;
iorfile = argv[i];
}
} else if (strcmp(argv[i], "-udp") == 0) {
mcast = 0;
}
}
// Get the POA
CORBA::Object_var object = orb->resolve_initial_references ("RootPOA");
PortableServer::POA_var poa = PortableServer::POA::_narrow (object.in ());
PortableServer::POAManager_var poa_manager = poa->the_POAManager ();
poa_manager->activate ();
// Create a local event channel and register it
TAO_EC_Event_Channel_Attributes attributes (poa.in (), poa.in ());
TAO_EC_Event_Channel ec_impl (attributes);
ec_impl.activate ();
PortableServer::ObjectId_var oid = poa->activate_object(&ec_impl);
CORBA::Object_var ec_obj = poa->id_to_reference(oid.in());
RtecEventChannelAdmin::EventChannel_var ec =
RtecEventChannelAdmin::EventChannel::_narrow(ec_obj.in());
// Find the Naming Service.
CORBA::Object_var obj = orb->resolve_initial_references("NameService");
CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in());
// Bind the Event Channel using Naming Services
CosNaming::Name_var name = root_context->to_name(ecname);
root_context->rebind(name.in(), ec.in());
// Get a proxy push consumer from the EventChannel.
RtecEventChannelAdmin::SupplierAdmin_var admin = ec->for_suppliers();
RtecEventChannelAdmin::ProxyPushConsumer_var consumer =
admin->obtain_push_consumer();
// Instantiate an EchoEventSupplier_i servant.
EchoEventSupplier_i servant(orb.in());
// Register it with the RootPOA.
oid = poa->activate_object(&servant);
CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in());
RtecEventComm::PushSupplier_var supplier =
RtecEventComm::PushSupplier::_narrow(supplier_obj.in());
// Connect to the EC.
ACE_SupplierQOS_Factory qos;
qos.insert (MY_SOURCE_ID, MY_EVENT_TYPE, 0, 1);
consumer->connect_push_supplier (supplier.in (), qos.get_SupplierQOS ());
// Initialize the address server with the desired address.
// This will be used by the sender object and the multicast
// receiver.
ACE_INET_Addr send_addr (port, address);
SimpleAddressServer addr_srv_impl (send_addr);
PortableServer::ObjectId_var addr_srv_oid =
poa->activate_object(&addr_srv_impl);
CORBA::Object_var addr_srv_obj = poa->id_to_reference(addr_srv_oid.in());
RtecUDPAdmin::AddrServer_var addr_srv =
RtecUDPAdmin::AddrServer::_narrow(addr_srv_obj.in());
// Create and initialize the sender object
TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender =
TAO_ECG_UDP_Sender::create();
TAO_ECG_UDP_Out_Endpoint endpoint;
if (endpoint.dgram ().open (ACE_Addr::sap_any) == -1) {
std::cerr << "Cannot open send endpoint" << std::endl;
return 1;
}
// TAO_ECG_UDP_Sender::init() takes a TAO_ECG_Refcounted_Endpoint.
// If we don't clone our endpoint and pass &endpoint, the sender will
// attempt to delete endpoint during shutdown.
TAO_ECG_UDP_Out_Endpoint* clone;
ACE_NEW_RETURN (clone,
TAO_ECG_UDP_Out_Endpoint (endpoint),
-1);
sender->init (ec.in (), addr_srv.in (), clone);
// Setup the subscription and connect to the EC
ACE_ConsumerQOS_Factory cons_qos_fact;
cons_qos_fact.start_disjunction_group ();
cons_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0);
RtecEventChannelAdmin::ConsumerQOS sub = cons_qos_fact.get_ConsumerQOS ();
sender->connect (sub);
// Create and initialize the receiver
TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver =
TAO_ECG_UDP_Receiver::create();
// TAO_ECG_UDP_Receiver::init() takes a TAO_ECG_Refcounted_Endpoint.
// If we don't clone our endpoint and pass &endpoint, the receiver will
// attempt to delete endpoint during shutdown.
ACE_NEW_RETURN (clone,
TAO_ECG_UDP_Out_Endpoint (endpoint),
-1);
receiver->init (ec.in (), clone, addr_srv.in ());
// Setup the registration and connect to the event channel
ACE_SupplierQOS_Factory supp_qos_fact;
supp_qos_fact.insert (MY_SOURCE_ID, MY_EVENT_TYPE, 0, 1);
RtecEventChannelAdmin::SupplierQOS pub = supp_qos_fact.get_SupplierQOS ();
receiver->connect (pub);
// Create the appropriate event handler and register it with the reactor
auto_ptr<ACE_Event_Handler> eh;
if (mcast) {
auto_ptr<TAO_ECG_Mcast_EH> mcast_eh(new TAO_ECG_Mcast_EH (receiver.in()));
mcast_eh->reactor (orb->orb_core ()->reactor ());
mcast_eh->open (ec.in());
ACE_AUTO_PTR_RESET(eh,mcast_eh.release(),ACE_Event_Handler);
//eh.reset(mcast_eh.release());
} else {
auto_ptr<TAO_ECG_UDP_EH> udp_eh (new TAO_ECG_UDP_EH (receiver.in()));
udp_eh->reactor (orb->orb_core ()->reactor ());
ACE_INET_Addr local_addr (listenport);
if (udp_eh->open (local_addr) == -1) {
std::cerr << "Cannot open EH" << std::endl;
}
ACE_AUTO_PTR_RESET(eh,udp_eh.release(),ACE_Event_Handler);
//eh.reset(udp_eh.release());
}
// Create an event (just a string in this case).
const CORBA::String_var eventData = CORBA::string_dup(ecname);
// Create an event set for one event
RtecEventComm::EventSet event (1);
event.length (1);
// Initialize event header.
event[0].header.source = MY_SOURCE_ID;
event[0].header.ttl = 1;
event[0].header.type = MY_EVENT_TYPE;
// Initialize data fields in event.
event[0].data.any_value <<= eventData;
if (iorfile != 0) {
CORBA::String_var str = orb->object_to_string( ec.in() );
std::ofstream iorFile( iorfile );
iorFile << str.in() << std::endl;
iorFile.close();
}
std::cout << "Starting main loop" << std::endl;
const int EVENT_DELAY_MS = 10;
while (1) {
consumer->push (event);
ACE_Time_Value tv(0, 1000 * EVENT_DELAY_MS);
orb->run(tv);
}
orb->destroy();
return 0;
}
catch(const CORBA::Exception& exc)
{
std::cerr << "Caught CORBA::Exception" << std::endl << exc << std::endl;
}
return 1;
}
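// [code-viewer residue] Keyboard shortcuts (not part of the program):
//   Copy code:           Ctrl + C
//   Search code:         Ctrl + F
//   Full-screen mode:    F11
//   Toggle theme:        Ctrl + Shift + D
//   Show shortcuts:      ?
//   Increase font size:  Ctrl + =
//   Decrease font size:  Ctrl + -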