thread_pool.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 938 行 · 第 1/2 页
CPP
938 行
this->lanes_[i]->open (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
}
TAO_Thread_Pool::~TAO_Thread_Pool (void)
{
// Delete all the lanes.
for (CORBA::ULong i = 0;
i != this->number_of_lanes_;
++i)
delete this->lanes_[i];
delete[] this->lanes_;
}
void
TAO_Thread_Pool::finalize (void)
{
// Finalize all the lanes.
for (CORBA::ULong i = 0;
i != this->number_of_lanes_;
++i)
this->lanes_[i]->finalize ();
}
void
TAO_Thread_Pool::shutdown_reactor (void)
{
// Finalize all the lanes.
for (CORBA::ULong i = 0;
i != this->number_of_lanes_;
++i)
this->lanes_[i]->shutdown_reactor ();
}
void
TAO_Thread_Pool::wait (void)
{
// Finalize all the lanes.
for (CORBA::ULong i = 0;
i != this->number_of_lanes_;
++i)
this->lanes_[i]->wait ();
}
int
TAO_Thread_Pool::is_collocated (const TAO_MProfile &mprofile)
{
// Finalize all the lanes.
for (CORBA::ULong i = 0;
i != this->number_of_lanes_;
++i)
{
int result =
this->lanes_[i]->is_collocated (mprofile);
if (result)
return result;
}
return 0;
}
int
TAO_Thread_Pool::create_static_threads (void)
{
for (CORBA::ULong i = 0;
i != this->number_of_lanes_;
++i)
{
// Ask each lane to create its set of static threads.
int result = this->lanes_[i]->create_static_threads ();
// Return on failure.
if (result != 0)
return result;
}
// Success.
return 0;
}
int
TAO_Thread_Pool::with_lanes (void) const
{
return this->with_lanes_;
}
TAO_Thread_Pool_Manager &
TAO_Thread_Pool::manager (void) const
{
return this->manager_;
}
CORBA::ULong
TAO_Thread_Pool::id (void) const
{
return this->id_;
}
CORBA::ULong
TAO_Thread_Pool::stack_size (void) const
{
return this->stack_size_;
}
CORBA::Boolean
TAO_Thread_Pool::allow_borrowing (void) const
{
return this->allow_borrowing_;
}
CORBA::Boolean
TAO_Thread_Pool::allow_request_buffering (void) const
{
return this->allow_request_buffering_;
}
CORBA::ULong
TAO_Thread_Pool::max_buffered_requests (void) const
{
return this->max_buffered_requests_;
}
CORBA::ULong
TAO_Thread_Pool::max_request_buffer_size (void) const
{
return this->max_request_buffer_size_;
}
TAO_Thread_Lane **
TAO_Thread_Pool::lanes (void)
{
return this->lanes_;
}
CORBA::ULong
TAO_Thread_Pool::number_of_lanes (void) const
{
return this->number_of_lanes_;
}
#define TAO_THREAD_POOL_MANAGER_GUARD \
ACE_GUARD_THROW_EX ( \
ACE_SYNCH_MUTEX, \
mon, \
this->lock_, \
CORBA::INTERNAL ( \
CORBA::SystemException::_tao_minor_code ( \
TAO_GUARD_FAILURE, \
0), \
CORBA::COMPLETED_NO));
TAO_Thread_Pool_Manager::TAO_Thread_Pool_Manager (TAO_ORB_Core &orb_core)
: orb_core_ (orb_core),
thread_pools_ (),
thread_pool_id_counter_ (1),
lock_ ()
{
}
TAO_Thread_Pool_Manager::~TAO_Thread_Pool_Manager (void)
{
// Delete all the pools.
for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
iterator != this->thread_pools_.end ();
++iterator)
delete (*iterator).int_id_;
}
void
TAO_Thread_Pool_Manager::finalize (void)
{
// Finalize all the pools.
for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
iterator != this->thread_pools_.end ();
++iterator)
(*iterator).int_id_->finalize ();
}
void
TAO_Thread_Pool_Manager::shutdown_reactor (void)
{
// Finalize all the pools.
for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
iterator != this->thread_pools_.end ();
++iterator)
(*iterator).int_id_->shutdown_reactor ();
}
void
TAO_Thread_Pool_Manager::wait (void)
{
// Finalize all the pools.
for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
iterator != this->thread_pools_.end ();
++iterator)
(*iterator).int_id_->wait ();
}
int
TAO_Thread_Pool_Manager::is_collocated (const TAO_MProfile &mprofile)
{
// Finalize all the pools.
for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
iterator != this->thread_pools_.end ();
++iterator)
{
int result =
(*iterator).int_id_->is_collocated (mprofile);
if (result)
return result;
}
return 0;
}
RTCORBA::ThreadpoolId
TAO_Thread_Pool_Manager::create_threadpool (CORBA::ULong stacksize,
CORBA::ULong static_threads,
CORBA::ULong dynamic_threads,
RTCORBA::Priority default_priority,
CORBA::Boolean allow_request_buffering,
CORBA::ULong max_buffered_requests,
CORBA::ULong max_request_buffer_size
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
TAO_THREAD_POOL_MANAGER_GUARD;
ACE_CHECK_RETURN (0);
return this->create_threadpool_i (stacksize,
static_threads,
dynamic_threads,
default_priority,
allow_request_buffering,
max_buffered_requests,
max_request_buffer_size
ACE_ENV_ARG_PARAMETER);
}
RTCORBA::ThreadpoolId
TAO_Thread_Pool_Manager::create_threadpool_with_lanes (CORBA::ULong stacksize,
const RTCORBA::ThreadpoolLanes & lanes,
CORBA::Boolean allow_borrowing,
CORBA::Boolean allow_request_buffering,
CORBA::ULong max_buffered_requests,
CORBA::ULong max_request_buffer_size
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
TAO_THREAD_POOL_MANAGER_GUARD;
ACE_CHECK_RETURN (0);
return this->create_threadpool_with_lanes_i (stacksize,
lanes,
allow_borrowing,
allow_request_buffering,
max_buffered_requests,
max_request_buffer_size
ACE_ENV_ARG_PARAMETER);
}
void
TAO_Thread_Pool_Manager::destroy_threadpool (RTCORBA::ThreadpoolId threadpool
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException,
RTCORBA::RTORB::InvalidThreadpool))
{
TAO_THREAD_POOL_MANAGER_GUARD;
ACE_CHECK;
this->destroy_threadpool_i (threadpool
ACE_ENV_ARG_PARAMETER);
}
RTCORBA::ThreadpoolId
TAO_Thread_Pool_Manager::create_threadpool_i (CORBA::ULong stacksize,
CORBA::ULong static_threads,
CORBA::ULong dynamic_threads,
RTCORBA::Priority default_priority,
CORBA::Boolean allow_request_buffering,
CORBA::ULong max_buffered_requests,
CORBA::ULong max_request_buffer_size
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
// Create the thread pool.
TAO_Thread_Pool *thread_pool = 0;
ACE_NEW_THROW_EX (thread_pool,
TAO_Thread_Pool (*this,
this->thread_pool_id_counter_,
stacksize,
static_threads,
dynamic_threads,
default_priority,
allow_request_buffering,
max_buffered_requests,
max_request_buffer_size
ACE_ENV_ARG_PARAMETER),
CORBA::NO_MEMORY ());
ACE_CHECK_RETURN (0);
return this->create_threadpool_helper (thread_pool
ACE_ENV_ARG_PARAMETER);
}
RTCORBA::ThreadpoolId
TAO_Thread_Pool_Manager::create_threadpool_with_lanes_i (CORBA::ULong stacksize,
const RTCORBA::ThreadpoolLanes &lanes,
CORBA::Boolean allow_borrowing,
CORBA::Boolean allow_request_buffering,
CORBA::ULong max_buffered_requests,
CORBA::ULong max_request_buffer_size
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
// Create the thread pool.
TAO_Thread_Pool *thread_pool = 0;
ACE_NEW_THROW_EX (thread_pool,
TAO_Thread_Pool (*this,
this->thread_pool_id_counter_,
stacksize,
lanes,
allow_borrowing,
allow_request_buffering,
max_buffered_requests,
max_request_buffer_size
ACE_ENV_ARG_PARAMETER),
CORBA::NO_MEMORY ());
ACE_CHECK_RETURN (0);
return this->create_threadpool_helper (thread_pool
ACE_ENV_ARG_PARAMETER);
}
RTCORBA::ThreadpoolId
TAO_Thread_Pool_Manager::create_threadpool_helper (TAO_Thread_Pool *thread_pool
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
// Make sure of safe deletion in case of errors.
auto_ptr<TAO_Thread_Pool> safe_thread_pool (thread_pool);
// Open the pool.
thread_pool->open (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (0);
// Create the static threads.
int result =
thread_pool->create_static_threads ();
// Throw exception in case of errors.
if (result != 0)
{
// Finalize thread pool related resources.
thread_pool->finalize ();
ACE_THROW_RETURN (
CORBA::INTERNAL (
CORBA::SystemException::_tao_minor_code (
TAO_RTCORBA_THREAD_CREATION_LOCATION_CODE,
errno),
CORBA::COMPLETED_NO),
result);
}
// Bind thread to internal table.
result =
this->thread_pools_.bind (this->thread_pool_id_counter_,
thread_pool);
// Throw exceptin in case of errors.
if (result != 0)
ACE_THROW_RETURN (CORBA::INTERNAL (),
result);
//
// Success.
//
// No need to delete thread pool.
safe_thread_pool.release ();
// Return current counter and perform post-increment.
return this->thread_pool_id_counter_++;
}
void
TAO_Thread_Pool_Manager::destroy_threadpool_i (RTCORBA::ThreadpoolId thread_pool_id
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException,
RTCORBA::RTORB::InvalidThreadpool))
{
TAO_Thread_Pool *thread_pool = 0;
// Unbind the thread pool from the map.
int result =
this->thread_pools_.unbind (thread_pool_id,
thread_pool);
// If the thread pool is not found in our map.
if (result != 0)
ACE_THROW (RTCORBA::RTORB::InvalidThreadpool ());
// Shutdown reactor.
thread_pool->shutdown_reactor ();
// Wait for the threads.
thread_pool->wait ();
// Finalize resources.
thread_pool->finalize ();
// Delete the thread pool.
delete thread_pool;
}
TAO_ORB_Core &
TAO_Thread_Pool_Manager::orb_core (void) const
{
return this->orb_core_;
}
ACE_SYNCH_MUTEX &
TAO_Thread_Pool_Manager::lock (void)
{
return this->lock_;
}
TAO_Thread_Pool_Manager::THREAD_POOLS &
TAO_Thread_Pool_Manager::thread_pools (void)
{
return this->thread_pools_;
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Hash_Map_Manager<RTCORBA::ThreadpoolId, TAO_Thread_Pool *, ACE_Null_Mutex>;
template class ACE_Hash_Map_Manager_Ex<RTCORBA::ThreadpoolId, TAO_Thread_Pool *, ACE_Hash<RTCORBA::ThreadpoolId>, ACE_Equal_To<RTCORBA::ThreadpoolId>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Entry<RTCORBA::ThreadpoolId, TAO_Thread_Pool *>;
template class ACE_Hash_Map_Iterator_Base_Ex<RTCORBA::ThreadpoolId, TAO_Thread_Pool *, ACE_Hash<RTCORBA::ThreadpoolId>, ACE_Equal_To<RTCORBA::ThreadpoolId>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator_Ex<unsigned int, TAO_Thread_Pool *, ACE_Hash<unsigned int>, ACE_Equal_To<unsigned int>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Ex<unsigned int, TAO_Thread_Pool *, ACE_Hash<unsigned int>, ACE_Equal_To<unsigned int>, ACE_Null_Mutex>;
template class ACE_Auto_Basic_Ptr<TAO_Thread_Pool>;
template class ACE_Auto_Basic_Array_Ptr<size_t>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Hash_Map_Manager<RTCORBA::ThreadpoolId, TAO_Thread_Pool *, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Manager_Ex<RTCORBA::ThreadpoolId, TAO_Thread_Pool *, ACE_Hash<RTCORBA::ThreadpoolId>, ACE_Equal_To<RTCORBA::ThreadpoolId>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Entry<RTCORBA::ThreadpoolId, TAO_Thread_Pool *>
#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<RTCORBA::ThreadpoolId, TAO_Thread_Pool *, ACE_Hash<RTCORBA::ThreadpoolId>, ACE_Equal_To<RTCORBA::ThreadpoolId>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<unsigned int, TAO_Thread_Pool *, ACE_Hash<unsigned int>, ACE_Equal_To<unsigned int>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Ex<unsigned int, TAO_Thread_Pool *, ACE_Hash<unsigned int>, ACE_Equal_To<unsigned int>, ACE_Null_Mutex>
#pragma instantiate ACE_Auto_Basic_Ptr<TAO_Thread_Pool>
#pragma instantiate ACE_Auto_Basic_Array_Ptr<size_t>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
#endif /* TAO_HAS_CORBA_MESSAGING && TAO_HAS_CORBA_MESSAGING != 0 */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?