client.cpp
来自「这是广泛使用的通信开源项目,对于大容量、高并发的通讯要求完全能够胜任,它广泛可用」· C++ 代码 · 共 429 行
CPP
429 行
// client.cpp,v 1.2 2003/10/08 13:26:32 venkita Exp
#include "ace/Get_Opt.h"
#include "ace/Task.h"
#include "ace/High_Res_Timer.h"
#include "tao/RTScheduling/RTScheduler_Manager.h"
#include "testC.h"
#include "MUF_Scheduler.h"
#include "orbsvcs/orbsvcs/Time_Utilities.h"
ACE_RCSID(MT_Server, client, "client.cpp,v 1.2 2003/10/08 13:26:32 venkita Exp")
const char *ior = "file://test.ior";
int niterations = 5;
int do_shutdown = 0;
int enable_dynamic_scheduling = 0;
int enable_yield = 1;
class Worker : public ACE_Task_Base
{
// = TITLE
// Run a client worker thread
//
// = DESCRIPTION
// Uses the ACE_Task_Base class to run a thread. The thread body,
// svc(), makes one two-way test_method() call on the server and,
// when dynamic scheduling is enabled, wraps that call in a
// scheduling segment built from the criticality and deadline
// given to the constructor.
//
public:
Worker (CORBA::ORB_ptr orb,
Simple_Server_ptr server_ptr,
RTScheduling::Current_ptr current,
MUF_Scheduler* scheduler,
TimeBase::TimeT deadline,
TimeBase::TimeT estimated_initial_execution_time,
long criticality,
CORBA::Long server_load);
// Constructor. The ORB, server and scheduling-current references are
// duplicated by the constructor; <scheduler> is stored as a plain
// pointer. <server_load> is the argument later passed to
// test_method(). (A former trailing "int sleep_time" parameter is
// no longer part of the signature.)
virtual int svc (void);
// The thread entry point. Returns 0 when the call sequence completes.
private:
CORBA::ORB_var orb_;
// The orb
Simple_Server_var server_;
// The server object on which test_method() is invoked
RTScheduling::Current_var scheduler_current_;
// Used to begin and end the scheduling segment around the call
MUF_Scheduler* scheduler_;
// Creates the scheduling parameter policy; not owned by the worker
TimeBase::TimeT deadline_;
// Deadline placed in the scheduling parameter
TimeBase::TimeT estimated_initial_execution_time_;
// Stored by the constructor
long criticality_;
// Criticality placed in the scheduling parameter
CORBA::Long server_load_;
// Argument passed to test_method()
int sleep_time_;
// Leftover sleep time; not read by any active code in this file
};
// Parse the command line into the file-level configuration variables.
//
//   -x        set do_shutdown (main then asks the server to shut down)
//   -k <ior>  server object reference, stored in ior
//   -i <n>    iteration count, stored in niterations
//   -d        set enable_dynamic_scheduling
//   -s        clear enable_yield
//
// Returns 0 when every option was recognised.  On an unrecognised option
// a usage message is logged and -1 is returned.
int
parse_args (int argc, char *argv[])
{
  ACE_Get_Opt get_opts (argc, argv, "xk:i:ds");
  int c;

  while ((c = get_opts ()) != -1)
    switch (c)
      {
      case 'x':
        do_shutdown = 1;
        break;

      case 'k':
        ior = get_opts.opt_arg ();
        break;

      case 'i':
        niterations = ACE_OS::atoi (get_opts.opt_arg ());
        break;

      case 'd':
        enable_dynamic_scheduling = 1;
        break;

      case 's':
        enable_yield = 0;
        break;

      case '?':
      default:
        // Every option accepted above is listed, each separated by a
        // space so the concatenated literals read as one clean line.
        ACE_ERROR_RETURN ((LM_ERROR,
                           "usage: %s "
                           "-x (shut down the server when done) "
                           "-k <ior> "
                           "-i <niterations> "
                           "-d (enable dynamic scheduling) "
                           "-s (disable yield)"
                           "\n",
                           argv [0]),
                          -1);
      }
  // Indicates successful parsing of the command line
  return 0;
}
// Client driver.
//
// Requests the maximum priority of the round-robin policy, initialises
// the ORB, resolves the server reference named by <ior> and, when
// dynamic scheduling is enabled, installs a MUF_Scheduler through the
// "RTSchedulerManager" initial reference.  It then runs three Worker
// threads with different deadlines, criticalities and server loads,
// waits for them, and optionally asks the server to shut down.
//
// Returns 0 on success, 1 when argument parsing fails, the server
// reference is nil or a CORBA exception is caught, and -1 when the
// scheduler cannot be allocated.
int
main (int argc, char *argv[])
{
  MUF_Scheduler* scheduler = 0;
  RTScheduling::Current_var current;

  // Stays 0 when the priority cannot be queried, so the debug message
  // below never prints an indeterminate value.
  int prio = 0;
  int max_prio;
  ACE_Sched_Params::Policy sched_policy = ACE_SCHED_RR;
  int sched_scope = ACE_SCOPE_THREAD;
  long flags;

  if (sched_policy == ACE_SCHED_RR)
    flags = THR_NEW_LWP | THR_BOUND | THR_JOINABLE | THR_SCHED_RR;
  else
    flags = THR_NEW_LWP | THR_BOUND | THR_JOINABLE | THR_SCHED_FIFO;

  ACE_hthread_t main_thr_handle;
  ACE_Thread::self (main_thr_handle);

  max_prio = ACE_Sched_Params::priority_max (sched_policy,
                                             sched_scope);

  ACE_Sched_Params sched_params (sched_policy, max_prio);

  // Report a failure instead of ignoring it: the worker threads are
  // activated with the same policy and priority further down.
  if (ACE_OS::sched_params (sched_params) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("%p\n"),
                  ACE_TEXT ("sched_params failed")));
    }

  if (ACE_Thread::getprio (main_thr_handle, prio) == -1)
    {
      if (errno == ENOTSUP)
        {
          ACE_ERROR((LM_ERROR,
                     ACE_TEXT ("getprio not supported\n")
                     ));
        }
      else
        {
          // "%p" consumes one string argument, so the description must
          // be passed separately rather than glued onto the format.
          ACE_ERROR ((LM_ERROR,
                      ACE_TEXT ("%p\n"),
                      ACE_TEXT ("thr_getprio failed")));
        }
    }

  ACE_DEBUG ((LM_DEBUG, "(%t): main thread prio is %d\n", prio));

  ACE_TRY_NEW_ENV
    {
      CORBA::ORB_var orb =
        CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (parse_args (argc, argv) != 0)
        return 1;

      CORBA::Object_var object =
        orb->string_to_object (ior ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      Simple_Server_var server =
        Simple_Server::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (CORBA::is_nil (server.in ()))
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "Object reference <%s> is nil\n",
                             ior),
                            1);
        }

      if (enable_dynamic_scheduling)
        {
          ACE_DEBUG ((LM_DEBUG, "Dyn Sched enabled\n"));

          // Held in a _var so the reference is released on every exit
          // path from this block.
          CORBA::Object_var manager_obj =
            orb->resolve_initial_references ("RTSchedulerManager"
                                             ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;

          TAO_RTScheduler_Manager_var manager =
            TAO_RTScheduler_Manager::_narrow (manager_obj.in ()
                                              ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;

          Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type;
          if (enable_yield)
            {
              disp_impl_type = Kokyu::DSRT_CV_BASED;
            }
          else
            {
              disp_impl_type = Kokyu::DSRT_OS_BASED;
            }

          ACE_NEW_RETURN (scheduler,
                          MUF_Scheduler (orb.in (),
                                         disp_impl_type,
                                         sched_policy,
                                         sched_scope), -1);

          manager->rtscheduler (scheduler);

          CORBA::Object_var current_obj =
            orb->resolve_initial_references ("RTScheduler_Current"
                                             ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;

          current =
            RTScheduling::Current::_narrow (current_obj.in ()
                                            ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }

      TimeBase::TimeT deadline;
      TimeBase::TimeT exec_time;
      int criticality=0;

      // Worker 1: deadline 50 s from now, criticality 0, server load 30.
      ORBSVCS_Time::Time_Value_to_TimeT (deadline,
                                         ACE_OS::gettimeofday () +
                                         ACE_Time_Value (50,0) );

      ORBSVCS_Time::Time_Value_to_TimeT (exec_time,
                                         ACE_OS::gettimeofday () +
                                         ACE_Time_Value (10,0) );

      Worker worker1 (orb.in (),
                      server.in (),
                      current.in (),
                      scheduler,
                      deadline,
                      exec_time,
                      criticality,
                      30);

      if (worker1.activate (flags, 1, 0, max_prio) != 0)
        {
          ACE_ERROR ((LM_ERROR,
                      "(%t|%T) cannot activate worker thread.\n"));
        }

      // Give the first worker a head start before the others are started.
      ACE_OS::sleep(2);

      // Worker 2: deadline 30 s from now, criticality 0, server load 10.
      ORBSVCS_Time::Time_Value_to_TimeT (deadline,
                                         ACE_OS::gettimeofday () +
                                         ACE_Time_Value (30,0) );

      ORBSVCS_Time::Time_Value_to_TimeT (exec_time,
                                         ACE_OS::gettimeofday () +
                                         ACE_Time_Value (10,0) );

      criticality = 0;
      Worker worker2 (orb.in (),
                      server.in (),
                      current.in (),
                      scheduler,
                      deadline,
                      exec_time,
                      criticality,
                      10);

      if (worker2.activate (flags, 1, 0, max_prio) != 0)
        {
          ACE_ERROR ((LM_ERROR,
                      "(%t|%T) cannot activate scheduler thread in RT mode.\n"));
        }

      // Worker 3: deadline 100 s from now, criticality 1, server load 5.
      ORBSVCS_Time::Time_Value_to_TimeT (deadline,
                                         ACE_OS::gettimeofday () +
                                         ACE_Time_Value (100,0) );

      ORBSVCS_Time::Time_Value_to_TimeT (exec_time,
                                         ACE_OS::gettimeofday () +
                                         ACE_Time_Value (10,0) );

      criticality = 1;
      Worker worker3 (orb.in (),
                      server.in (),
                      current.in (),
                      scheduler,
                      deadline,
                      exec_time,
                      criticality,
                      5);

      if (worker3.activate (flags, 1, 0, max_prio) != 0)
        {
          ACE_ERROR ((LM_ERROR,
                      "(%t|%T) cannot activate scheduler thread in RT mode.\n"));
        }

      worker1.wait ();
      worker2.wait ();
      worker3.wait ();

      ACE_DEBUG ((LM_DEBUG,
                  "(%t): wait for worker threads done in main thread\n"));

      if (do_shutdown)
        {
          if (enable_dynamic_scheduling)
            {
              // The shutdown request is made inside a scheduling segment
              // whose parameters are all zero.
              MUF_Scheduling::SchedulingParameter sched_param;
              sched_param.criticality = 0;
              sched_param.deadline = 0;
              sched_param.estimated_initial_execution_time = 0;
              CORBA::Policy_var sched_param_policy =
                scheduler->create_scheduling_parameter (sched_param);
              CORBA::Policy_ptr implicit_sched_param = 0;
              current->begin_scheduling_segment (0,
                                                 sched_param_policy.in (),
                                                 implicit_sched_param
                                                 ACE_ENV_ARG_PARAMETER);
              ACE_TRY_CHECK;
            }

          ACE_DEBUG ((LM_DEBUG, "(%t): about to call server shutdown\n"));
          server->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_TRY_CHECK;

          ACE_DEBUG ((LM_DEBUG, "after shutdown call in main thread\n"));

          if (enable_dynamic_scheduling)
            {
              current->end_scheduling_segment (0 ACE_ENV_ARG_PARAMETER);
              ACE_TRY_CHECK;
            }
        }

      // The scheduler only exists when dynamic scheduling was enabled;
      // without this guard a run without -d dereferences a null pointer.
      if (scheduler != 0)
        {
          scheduler->shutdown ();
          ACE_DEBUG ((LM_DEBUG, "scheduler shutdown done\n"));
        }
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "Exception caught:");
      return 1;
    }
  ACE_ENDTRY;

  return 0;
}
// ****************************************************************
// Store everything the worker thread needs.  The ORB, server and
// scheduling-current references are duplicated so the Worker's _var
// members hold their own reference; <scheduler> is kept as a plain,
// non-owned pointer.
Worker::Worker (CORBA::ORB_ptr orb,
                Simple_Server_ptr server_ptr,
                RTScheduling::Current_ptr current,
                MUF_Scheduler* scheduler,
                TimeBase::TimeT deadline,
                TimeBase::TimeT estimated_initial_execution_time,
                long criticality,
                CORBA::Long server_load)
  :  orb_ (CORBA::ORB::_duplicate (orb)),
     server_ (Simple_Server::_duplicate (server_ptr)),
     scheduler_current_ (RTScheduling::Current::_duplicate (current)),
     scheduler_ (scheduler),
     deadline_ (deadline),
     estimated_initial_execution_time_ ( estimated_initial_execution_time),
     criticality_ (criticality),
     server_load_ (server_load),
     // No constructor parameter feeds sleep_time_, so give it a defined
     // value rather than leaving it indeterminate.
     sleep_time_ (0)
{
}
int
Worker::svc (void)
{
ACE_DECLARE_NEW_CORBA_ENV;
const char * name = 0;
/*
ACE_DEBUG ((LM_DEBUG, "(%t|%T):about to sleep for %d sec\n", sleep_time_));
ACE_OS::sleep (sleep_time_);
ACE_DEBUG ((LM_DEBUG, "(%t|%T):woke up from sleep for %d sec\n", sleep_time_));
*/
ACE_hthread_t thr_handle;
ACE_Thread::self (thr_handle);
int prio;
if (ACE_Thread::getprio (thr_handle, prio) == -1)
{
if (errno == ENOTSUP)
{
ACE_ERROR((LM_ERROR,
ACE_TEXT ("getprio not supported\n")
));
}
else
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("%p\n")
ACE_TEXT ("thr_getprio failed")));
}
}
ACE_DEBUG ((LM_DEBUG, "(%t|%T) worker activated with prio %d\n", prio));
if (enable_dynamic_scheduling)
{
MUF_Scheduling::SchedulingParameter sched_param;
CORBA::Policy_var sched_param_policy;
sched_param.criticality = criticality_;
sched_param.deadline = deadline_;
sched_param.estimated_initial_execution_time = 0;
sched_param_policy = scheduler_->create_scheduling_parameter (sched_param);
CORBA::Policy_ptr implicit_sched_param = 0;
ACE_DEBUG ((LM_DEBUG, "(%t|%T):before begin_sched_segment\n"));
scheduler_current_->begin_scheduling_segment (name,
sched_param_policy.in (),
implicit_sched_param
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
ACE_DEBUG ((LM_DEBUG, "(%t|%T):after begin_sched_segment\n"));
}
ACE_DEBUG ((LM_DEBUG, "(%t|%T):about to make two way call\n"));
server_->test_method (server_load_ ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
ACE_DEBUG ((LM_DEBUG, "(%t|%T):two way call done\n"));
if (enable_dynamic_scheduling)
{
scheduler_current_->end_scheduling_segment (name);
ACE_CHECK_RETURN (-1);
}
ACE_DEBUG ((LM_DEBUG, "client worker thread (%t) done\n"));
return 0;
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?