pong.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 374 行
CPP
374 行
// pong.cpp,v 1.19 2003/10/28 20:09:35 bala Exp
#include "pong.h"
#include "orbsvcs/AV/Protocol_Factory.h"
#include "tao/PortableServer/PortableServer.h"
#include "tao/Strategies/advanced_resource.h"
#include "tao/ORB.h"
#include "tao/debug.h"
#include "ace/Get_Opt.h"
#include "ace/High_Res_Timer.h"
#include "ace/Stats.h"
ACE_RCSID(Latency, ping, "pong.cpp,v 1.19 2003/10/28 20:09:35 bala Exp")
const char *ior_output_file = "pong.ior";
const char *protocol = "RTP/UDP";
int milliseconds = 100;
size_t message_size = 64;
int respond = 1;
AVStreams::protocolSpec pong_protocols;
AVStreams::protocolSpec ping_protocols;
ACE_hrtime_t recv_throughput_base = 0;
ACE_Throughput_Stats recv_latency;
ACE_hrtime_t send_throughput_base = 0;
ACE_Throughput_Stats send_latency;
int
parse_args (int argc, char *argv[])
{
ACE_Get_Opt get_opts (argc, argv, "xo:s:r:t:b:d");
int c;
while ((c = get_opts ()) != -1)
switch (c)
{
case 'o':
ior_output_file = get_opts.opt_arg ();
break;
case 'r':
{
CORBA::ULong l = ping_protocols.length ();
ping_protocols.length (l + 1);
ping_protocols[l] = CORBA::string_dup (get_opts.opt_arg ());
}
break;
case 's':
{
CORBA::ULong l = pong_protocols.length ();
pong_protocols.length (l + 1);
pong_protocols[l] = CORBA::string_dup (get_opts.opt_arg ());
}
break;
case 't':
milliseconds = ACE_OS::atoi (get_opts.opt_arg ());
break;
case 'b':
message_size = ACE_OS::atoi (get_opts.opt_arg ());
if (message_size < sizeof(ACE_hrtime_t))
{
ACE_DEBUG ((LM_DEBUG, "Invalid message size\n"));
message_size = 64;
}
break;
case 'x':
respond = 0;
break;
case 'd':
TAO_debug_level++;
break;
case '?':
default:
ACE_ERROR_RETURN ((LM_ERROR,
"usage: %s "
"-o <iorfile> "
"-r <protocol=addr> "
"-s <protocol=addr> "
"-t <milliseconds> "
"\n",
argv [0]),
-1);
}
// If no protocols are specified use the default...
if (pong_protocols.length () == 0)
{
pong_protocols.length (1);
pong_protocols[0] = CORBA::string_dup ("UDP=localhost:23456");
}
if (ping_protocols.length () == 0)
{
ping_protocols.length (1);
ping_protocols[0] = CORBA::string_dup ("UDP=localhost:12345");
}
// Indicates sucessful parsing of the command line
return 0;
}
int main (int argc, char *argv[])
{
ACE_TRY_NEW_ENV
{
CORBA::ORB_var orb = CORBA::ORB_init (argc,
argv);
parse_args (argc, argv);
CORBA::Object_var obj
= orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
PortableServer::POA_var poa
= PortableServer::POA::_narrow (obj.in ());
PortableServer::POAManager_var mgr
= poa->the_POAManager ();
mgr->activate ();
TAO_AV_CORE::instance ()->init (orb.in (),
poa.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
Reactive_Strategy *reactive_strategy;
ACE_NEW_RETURN (reactive_strategy,
Reactive_Strategy,
1);
reactive_strategy->init (orb.in (), poa.in ());
TAO_MMDevice *mmdevice_impl;
ACE_NEW_RETURN (mmdevice_impl,
TAO_MMDevice (reactive_strategy),
1);
AVStreams::MMDevice_var mmdevice =
mmdevice_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
CORBA::String_var ior =
orb->object_to_string (mmdevice.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG, "Activated as <%s>\n", ior.in ()));
// If the ior_output_file exists, output the ior to it
if (ior_output_file != 0)
{
FILE *output_file= ACE_OS::fopen (ior_output_file, "w");
if (output_file == 0)
ACE_ERROR_RETURN ((LM_ERROR,
"Cannot open output file for writing IOR: %s",
ior_output_file),
1);
ACE_OS::fprintf (output_file, "%s", ior.in ());
ACE_OS::fclose (output_file);
}
Pong_Recv_FDev* pong_fdev_impl;
ACE_NEW_RETURN (pong_fdev_impl,
Pong_Recv_FDev ("Pong"),
1);
Ping_Send_FDev* ping_fdev_impl;
ACE_NEW_RETURN (ping_fdev_impl,
Ping_Send_FDev ("Ping"),
1);
AVStreams::FDev_var ping_fdev =
ping_fdev_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
AVStreams::FDev_var pong_fdev =
pong_fdev_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
mmdevice->add_fdev (ping_fdev.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
if (respond == 1)
{
mmdevice->add_fdev (pong_fdev.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
orb->run ( ACE_ENV_SINGLE_ARG_PARAMETER );
ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
ACE_DEBUG ((LM_DEBUG, "Calibrating scale factory . . . "));
ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
ACE_DEBUG ((LM_DEBUG, "done\n"));
recv_latency.dump_results ("Receive", gsf);
send_latency.dump_results ("Send", gsf);
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"Caught exception:");
return 1;
}
ACE_ENDTRY;
return 0;
}
// ****************************************************************
Pong_Recv::Pong_Recv (void)
: TAO_FlowConsumer ("Pong",
pong_protocols,
"UNS:pong")
{
}
int
Pong_Recv::get_callback (const char *,
TAO_AV_Callback *&callback)
{
// ACE_DEBUG ((LM_DEBUG,"Pong_Recv::get_callback\n"));
callback = &this->callback_;
return 0;
}
int
Pong_Recv_Callback::handle_stop (void)
{
// ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::stop"));
TAO_AV_CORE::instance ()->orb ()->shutdown ();
return 0;
}
int
Pong_Recv_Callback::receive_frame (ACE_Message_Block *frame,
TAO_AV_frame_info *,
const ACE_Addr &)
{
// ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::receive_frame\n"));
ACE_hrtime_t now = ACE_OS::gethrtime ();
for (const ACE_Message_Block *i = frame;
i != 0;
i = frame->cont ())
{
ACE_hrtime_t buf[2];
if (frame->length () < sizeof(buf))
{
ACE_DEBUG ((LM_DEBUG, "Unexpected message size\n"));
return 0;
}
ACE_OS::memcpy (buf, i->rd_ptr (), sizeof(buf));
if (recv_throughput_base == 0)
{
recv_throughput_base = now;
}
recv_latency.sample (now - recv_throughput_base,
now - buf[0]);
}
return 0;
}
int
Pong_Recv_Callback::handle_destroy (void)
{
ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::destroy\n"));
return 0;
}
// ****************************************************************
Ping_Send::Ping_Send (void)
: TAO_FlowProducer ("Ping",
ping_protocols,
"UNS:ping")
{
}
int
Ping_Send::get_callback (const char *,
TAO_AV_Callback *&callback)
{
// ACE_DEBUG ((LM_DEBUG,"Ping_Send::get_callback\n"));
callback = &this->callback_;
return 0;
}
Ping_Send_Callback::Ping_Send_Callback (void)
:count_ (0)
{
this->timeout_ = ACE_Time_Value (2);
this->frame_.size (message_size);
this->frame_.wr_ptr (message_size);
}
void
Ping_Send_Callback::get_timeout (ACE_Time_Value *&tv,
void *&)
{
tv = &this->timeout_;
}
int
Ping_Send_Callback::handle_timeout (void *)
{
this->count_++;
ACE_DEBUG ((LM_DEBUG, "Ping timeout frame %d\n", this->count_));
if (this->count_ > 10)
{
TAO_AV_CORE::instance ()->orb ()->shutdown ();
return 0;
}
ACE_hrtime_t stamp = ACE_OS::gethrtime ();
ACE_OS::memcpy (this->frame_.rd_ptr (), &stamp, sizeof(stamp));
int result = this->protocol_object_->send_frame (&this->frame_);
if (result < 0)
ACE_ERROR_RETURN ((LM_ERROR,
"Ping_Send_Callback::handle_timeout - send_frame - %p\n",
""),
-1);
if (send_throughput_base == 0)
{
send_throughput_base = stamp;
}
ACE_hrtime_t now = ACE_OS::gethrtime ();
send_latency.sample (now - send_throughput_base,
now - stamp);
return 0;
}
int
Ping_Send_Callback::handle_end_stream (void)
{
return 0;
}
// ****************************************************************
// @@ TODO
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class TAO_AV_Endpoint_Reactive_Strategy_A<TAO_StreamEndPoint_A, TAO_VDev, AV_Null_MediaCtrl>;
template class TAO_AV_Endpoint_Reactive_Strategy<TAO_StreamEndPoint_A, TAO_VDev, AV_Null_MediaCtrl>;
template class TAO_FDev<TAO_FlowProducer, Pong_Recv>;
template class TAO_FDev<Ping_Send, TAO_FlowConsumer>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_A<TAO_StreamEndPoint_A, TAO_VDev, AV_Null_MediaCtrl>
#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy<TAO_StreamEndpoint_A, TAO_VDev, AV_Null_MediaCtrl>
#pragma instantiate TAO_FDev<TAO_FlowProducer, Ping_Recv>
#pragma instantiate TAO_FDev<Pong_Send, TAO_FlowConsumer>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?