thread_pool.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 938 行 · 第 1/2 页
CPP
938 行
#include "Thread_Pool.h"
#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
ACE_RCSID (RTCORBA,
Thread_Pool,
"Thread_Pool.cpp,v 1.13 2003/10/28 18:29:32 bala Exp")
#include "tao/Exception.h"
#include "tao/ORB_Core.h"
#include "tao/ORB.h"
#include "tao/Acceptor_Registry.h"
#include "tao/Transport_Cache_Manager.h"
#include "tao/debug.h"
#include "tao/RTCORBA/Priority_Mapping_Manager.h"
#include "tao/LF_Follower.h"
#include "tao/Leader_Follower.h"
#include "ace/Auto_Ptr.h"
#if !defined (__ACE_INLINE__)
# include "Thread_Pool.i"
#endif /* ! __ACE_INLINE__ */
TAO_RT_New_Leader_Generator::TAO_RT_New_Leader_Generator (
TAO_Thread_Lane &lane)
: lane_ (lane)
{
}
void
TAO_RT_New_Leader_Generator::no_leaders_available (void)
{
// Note that we are checking this condition below without the lock
// held. The value of <static_threads> and <dynamic_threads> does
// not change, but <current_threads> increases when new dynamic
// threads are created. Even if we catch <current_threads> in an
// inconsistent state, we will double check later with the lock
// held. Therefore, this check should not cause any big problems.
if (this->lane_.current_threads () ==
this->lane_.static_threads () +
this->lane_.dynamic_threads ())
return;
TAO_Thread_Pool_Manager &manager =
this->lane_.pool ().manager ();
ACE_GUARD (ACE_SYNCH_MUTEX,
mon,
manager.lock ());
if (this->lane_.current_threads () <
(this->lane_.static_threads () +
this->lane_.dynamic_threads ()) &&
!manager.orb_core ().has_shutdown ())
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO Process %P Pool %d Lane %d Thread %t\n")
ACE_TEXT ("Current number of threads = %d; ")
ACE_TEXT ("static threads = %d; dynamic threads = %d\n")
ACE_TEXT ("No leaders available; creating new leader!\n"),
this->lane_.pool ().id (),
this->lane_.id (),
this->lane_.current_threads (),
this->lane_.static_threads (),
this->lane_.dynamic_threads ()));
int result =
this->lane_.create_dynamic_threads (1);
if (result != 0)
ACE_ERROR ((LM_ERROR,
"Pool %d Lane %d Thread %t: cannot create dynamic thread\n",
this->lane_.pool ().id (),
this->lane_.id ()));
}
}
TAO_Thread_Pool_Threads::TAO_Thread_Pool_Threads (TAO_Thread_Lane &lane)
: ACE_Task_Base (lane.pool ().manager ().orb_core ().thr_mgr ()),
lane_ (lane)
{
}
TAO_Thread_Lane &
TAO_Thread_Pool_Threads::lane (void) const
{
return this->lane_;
}
int
TAO_Thread_Pool_Threads::svc (void)
{
TAO_ORB_Core &orb_core =
this->lane ().pool ().manager ().orb_core ();
if (orb_core.has_shutdown ())
return 0;
// Set TSS resources for this thread.
TAO_Thread_Pool_Threads::set_tss_resources (orb_core,
this->lane_);
CORBA::ORB_ptr orb =
orb_core.orb ();
ACE_TRY_NEW_ENV
{
// Run the ORB.
orb->run (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
// No point propagating this exception. Print it out.
ACE_ERROR ((LM_ERROR,
"orb->run() raised exception for thread %t\n"));
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"");
}
ACE_ENDTRY;
return 0;
}
void
TAO_Thread_Pool_Threads::set_tss_resources (TAO_ORB_Core &orb_core,
TAO_Thread_Lane &thread_lane)
{
/// Get the ORB_Core's TSS resources.
TAO_ORB_Core_TSS_Resources &tss =
*orb_core.get_tss_resources ();
/// Set the lane attribute in TSS.
tss.lane_ = &thread_lane;
}
TAO_Thread_Lane::TAO_Thread_Lane (TAO_Thread_Pool &pool,
CORBA::ULong id,
CORBA::Short lane_priority,
CORBA::ULong static_threads,
CORBA::ULong dynamic_threads
ACE_ENV_ARG_DECL_NOT_USED)
: pool_ (pool),
id_ (id),
lane_priority_ (lane_priority),
static_threads_ (static_threads),
dynamic_threads_ (dynamic_threads),
current_threads_ (0),
threads_ (*this),
new_thread_generator_ (*this),
resources_ (pool.manager ().orb_core (),
&new_thread_generator_),
native_priority_ (TAO_INVALID_PRIORITY)
{
}
void
TAO_Thread_Lane::validate_and_map_priority (ACE_ENV_SINGLE_ARG_DECL)
{
// Make sure that <static_threads_> is not zero.
if (this->static_threads_ == 0)
ACE_THROW (CORBA::BAD_PARAM ());
// Check that the priority is in bounds.
if (this->lane_priority_ < RTCORBA::minPriority
// The line below will always be false unless the value of
// RTCORBA::maxPriority, which is now assigned the value of
// 32767, is changed in RTCORBA.pidl.
// || this->lane_priority_ > RTCORBA::maxPriority
)
{
ACE_THROW (CORBA::BAD_PARAM ());
}
CORBA::ORB_ptr orb =
this->pool_.manager ().orb_core ().orb ();
// Get the priority mapping manager.
CORBA::Object_var obj =
orb->resolve_initial_references (TAO_OBJID_PRIORITYMAPPINGMANAGER
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
TAO_Priority_Mapping_Manager_var mapping_manager =
TAO_Priority_Mapping_Manager::_narrow (obj.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
RTCORBA::PriorityMapping *pm =
mapping_manager.in ()->mapping ();
// Map CORBA priority to native priority.
CORBA::Boolean result =
pm->to_native (this->lane_priority_,
this->native_priority_);
if (!result)
ACE_THROW (CORBA::DATA_CONVERSION ());
if (TAO_debug_level > 3)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) - creating thread at ")
ACE_TEXT ("(corba:native) priority %d:%d\n"),
this->lane_priority_,
this->native_priority_));
}
void
TAO_Thread_Lane::open (ACE_ENV_SINGLE_ARG_DECL)
{
// Validate and map priority.
this->validate_and_map_priority (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
// Open the acceptor registry.
int result = 0;
result =
this->resources_.open_acceptor_registry (1
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
if (result == -1)
ACE_THROW (CORBA::INTERNAL (
CORBA::SystemException::_tao_minor_code (
TAO_ACCEPTOR_REGISTRY_OPEN_LOCATION_CODE,
0),
CORBA::COMPLETED_NO));
}
TAO_Thread_Lane::~TAO_Thread_Lane (void)
{
}
void
TAO_Thread_Lane::finalize (void)
{
// Finalize resources.
this->resources_.finalize ();
}
void
TAO_Thread_Lane::shutdown_reactor (void)
{
this->resources_.shutdown_reactor ();
}
void
TAO_Thread_Lane::wait (void)
{
this->threads_.wait ();
}
int
TAO_Thread_Lane::is_collocated (const TAO_MProfile &mprofile)
{
return this->resources_.is_collocated (mprofile);
}
int
TAO_Thread_Lane::create_static_threads (void)
{
// Create static threads.
return this->create_dynamic_threads (this->static_threads_);
}
int
TAO_Thread_Lane::create_dynamic_threads (CORBA::ULong number_of_threads)
{
// Overwritten parameters.
int force_active = 1;
// Default parameters.
long default_flags = THR_NEW_LWP | THR_JOINABLE;
int default_grp_id = -1;
ACE_Task_Base *default_task = 0;
ACE_hthread_t *default_thread_handles = 0;
void **default_stack = 0;
// Setting stack size.
size_t *stack_size_array = 0;
ACE_NEW_RETURN (stack_size_array,
size_t[number_of_threads],
-1);
size_t index;
for (index = 0;
index != number_of_threads;
++index)
stack_size_array[index] =
this->pool ().stack_size_;
// Make sure the dynamically created stack size array is properly
// deleted.
ACE_Auto_Basic_Array_Ptr<size_t> auto_stack_size_array (stack_size_array);
TAO_ORB_Core &orb_core =
this->pool ().manager ().orb_core ();
long flags =
default_flags |
orb_core.orb_params ()->thread_creation_flags ();
// Activate the threads.
int result =
this->threads_.activate (flags,
number_of_threads,
force_active,
this->native_priority_,
default_grp_id,
default_task,
default_thread_handles,
default_stack,
stack_size_array);
if (result != 0)
return result;
this->current_threads_ +=
number_of_threads;
return result;
}
CORBA::ULong
TAO_Thread_Lane::id (void) const
{
return this->id_;
}
TAO_Thread_Pool &
TAO_Thread_Lane::pool (void) const
{
return this->pool_;
}
CORBA::Short
TAO_Thread_Lane::lane_priority (void) const
{
return this->lane_priority_;
}
CORBA::Short
TAO_Thread_Lane::native_priority (void) const
{
return this->native_priority_;
}
CORBA::ULong
TAO_Thread_Lane::static_threads (void) const
{
return this->static_threads_;
}
CORBA::ULong
TAO_Thread_Lane::dynamic_threads (void) const
{
return this->dynamic_threads_;
}
CORBA::ULong
TAO_Thread_Lane::current_threads (void) const
{
return this->current_threads_;
}
void
TAO_Thread_Lane::current_threads (CORBA::ULong current_threads)
{
this->current_threads_ = current_threads;
}
TAO_Thread_Pool_Threads &
TAO_Thread_Lane::threads (void)
{
return this->threads_;
}
TAO_Thread_Lane_Resources &
TAO_Thread_Lane::resources (void)
{
return this->resources_;
}
TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager,
CORBA::ULong id,
CORBA::ULong stack_size,
CORBA::ULong static_threads,
CORBA::ULong dynamic_threads,
CORBA::Short default_priority,
CORBA::Boolean allow_request_buffering,
CORBA::ULong max_buffered_requests,
CORBA::ULong max_request_buffer_size
ACE_ENV_ARG_DECL)
: manager_ (manager),
id_ (id),
stack_size_ (stack_size),
allow_borrowing_ (0),
allow_request_buffering_ (allow_request_buffering),
max_buffered_requests_ (max_buffered_requests),
max_request_buffer_size_ (max_request_buffer_size),
lanes_ (0),
number_of_lanes_ (1),
with_lanes_ (0)
{
// No support for buffering.
if (allow_request_buffering)
ACE_THROW (CORBA::NO_IMPLEMENT ());
// Create one lane.
this->lanes_ = new TAO_Thread_Lane *[this->number_of_lanes_];
this->lanes_[0] =
new TAO_Thread_Lane (*this,
0,
default_priority,
static_threads,
dynamic_threads
ACE_ENV_ARG_PARAMETER);
}
TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager,
CORBA::ULong id,
CORBA::ULong stack_size,
const RTCORBA::ThreadpoolLanes &lanes,
CORBA::Boolean allow_borrowing,
CORBA::Boolean allow_request_buffering,
CORBA::ULong max_buffered_requests,
CORBA::ULong max_request_buffer_size
ACE_ENV_ARG_DECL)
: manager_ (manager),
id_ (id),
stack_size_ (stack_size),
allow_borrowing_ (allow_borrowing),
allow_request_buffering_ (allow_request_buffering),
max_buffered_requests_ (max_buffered_requests),
max_request_buffer_size_ (max_request_buffer_size),
lanes_ (0),
number_of_lanes_ (lanes.length ()),
with_lanes_ (1)
{
// No support for buffering or borrowing.
if (allow_borrowing ||
allow_request_buffering)
ACE_THROW (CORBA::NO_IMPLEMENT ());
// Create multiple lane.
this->lanes_ = new TAO_Thread_Lane *[this->number_of_lanes_];
for (CORBA::ULong i = 0;
i != this->number_of_lanes_;
++i)
this->lanes_[i] =
new TAO_Thread_Lane (*this,
i,
lanes[i].lane_priority,
lanes[i].static_threads,
lanes[i].dynamic_threads
ACE_ENV_ARG_PARAMETER);
}
void
TAO_Thread_Pool::open (ACE_ENV_SINGLE_ARG_DECL)
{
// Open all the lanes.
for (CORBA::ULong i = 0;
i != this->number_of_lanes_;
++i)
{
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?