thread_pool.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 938 行 · 第 1/2 页

CPP
938
字号
      this->lanes_[i]->open (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;
    }
}

TAO_Thread_Pool::~TAO_Thread_Pool (void)
{
  // Delete all the lanes.
  for (CORBA::ULong i = 0;
       i != this->number_of_lanes_;
       ++i)
    delete this->lanes_[i];

  delete[] this->lanes_;
}

void
TAO_Thread_Pool::finalize (void)
{
  // Finalize all the lanes.
  for (CORBA::ULong i = 0;
       i != this->number_of_lanes_;
       ++i)
    this->lanes_[i]->finalize ();
}

void
TAO_Thread_Pool::shutdown_reactor (void)
{
  // Finalize all the lanes.
  for (CORBA::ULong i = 0;
       i != this->number_of_lanes_;
       ++i)
    this->lanes_[i]->shutdown_reactor ();
}

void
TAO_Thread_Pool::wait (void)
{
  // Finalize all the lanes.
  for (CORBA::ULong i = 0;
       i != this->number_of_lanes_;
       ++i)
    this->lanes_[i]->wait ();
}

int
TAO_Thread_Pool::is_collocated (const TAO_MProfile &mprofile)
{
  // Finalize all the lanes.
  for (CORBA::ULong i = 0;
       i != this->number_of_lanes_;
       ++i)
    {
      int result =
        this->lanes_[i]->is_collocated (mprofile);

      if (result)
        return result;
    }

  return 0;
}

int
TAO_Thread_Pool::create_static_threads (void)
{
  for (CORBA::ULong i = 0;
       i != this->number_of_lanes_;
       ++i)
    {
      // Ask each lane to create its set of static threads.
      int result = this->lanes_[i]->create_static_threads ();

      // Return on failure.
      if (result != 0)
        return result;
    }

  // Success.
  return 0;
}

int
TAO_Thread_Pool::with_lanes (void) const
{
  return this->with_lanes_;
}

TAO_Thread_Pool_Manager &
TAO_Thread_Pool::manager (void) const
{
  return this->manager_;
}

CORBA::ULong
TAO_Thread_Pool::id (void) const
{
  return this->id_;
}

CORBA::ULong
TAO_Thread_Pool::stack_size (void) const
{
  return this->stack_size_;
}

CORBA::Boolean
TAO_Thread_Pool::allow_borrowing (void) const
{
  return this->allow_borrowing_;
}

CORBA::Boolean
TAO_Thread_Pool::allow_request_buffering (void) const
{
  return this->allow_request_buffering_;
}

CORBA::ULong
TAO_Thread_Pool::max_buffered_requests (void) const
{
  return this->max_buffered_requests_;
}

CORBA::ULong
TAO_Thread_Pool::max_request_buffer_size (void) const
{
  return this->max_request_buffer_size_;
}

TAO_Thread_Lane **
TAO_Thread_Pool::lanes (void)
{
  return this->lanes_;
}

CORBA::ULong
TAO_Thread_Pool::number_of_lanes (void) const
{
  return this->number_of_lanes_;
}

#define TAO_THREAD_POOL_MANAGER_GUARD \
  ACE_GUARD_THROW_EX ( \
    ACE_SYNCH_MUTEX, \
    mon, \
    this->lock_, \
    CORBA::INTERNAL ( \
      CORBA::SystemException::_tao_minor_code ( \
        TAO_GUARD_FAILURE, \
        0), \
      CORBA::COMPLETED_NO));

TAO_Thread_Pool_Manager::TAO_Thread_Pool_Manager (TAO_ORB_Core &orb_core)
  : orb_core_ (orb_core),
    thread_pools_ (),
    thread_pool_id_counter_ (1),
    lock_ ()
{
}

TAO_Thread_Pool_Manager::~TAO_Thread_Pool_Manager (void)
{
  // Delete all the pools.
  for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
       iterator != this->thread_pools_.end ();
       ++iterator)
    delete (*iterator).int_id_;
}

void
TAO_Thread_Pool_Manager::finalize (void)
{
  // Finalize all the pools.
  for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
       iterator != this->thread_pools_.end ();
       ++iterator)
    (*iterator).int_id_->finalize ();
}

void
TAO_Thread_Pool_Manager::shutdown_reactor (void)
{
  // Finalize all the pools.
  for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
       iterator != this->thread_pools_.end ();
       ++iterator)
    (*iterator).int_id_->shutdown_reactor ();
}

void
TAO_Thread_Pool_Manager::wait (void)
{
  // Finalize all the pools.
  for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
       iterator != this->thread_pools_.end ();
       ++iterator)
    (*iterator).int_id_->wait ();
}

int
TAO_Thread_Pool_Manager::is_collocated (const TAO_MProfile &mprofile)
{
  // Finalize all the pools.
  for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
       iterator != this->thread_pools_.end ();
       ++iterator)
    {
      int result =
        (*iterator).int_id_->is_collocated (mprofile);

      if (result)
        return result;
    }

  return 0;
}

RTCORBA::ThreadpoolId
TAO_Thread_Pool_Manager::create_threadpool (CORBA::ULong stacksize,
                                            CORBA::ULong static_threads,
                                            CORBA::ULong dynamic_threads,
                                            RTCORBA::Priority default_priority,
                                            CORBA::Boolean allow_request_buffering,
                                            CORBA::ULong max_buffered_requests,
                                            CORBA::ULong max_request_buffer_size
                                            ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  TAO_THREAD_POOL_MANAGER_GUARD;
  ACE_CHECK_RETURN (0);

  return this->create_threadpool_i (stacksize,
                                    static_threads,
                                    dynamic_threads,
                                    default_priority,
                                    allow_request_buffering,
                                    max_buffered_requests,
                                    max_request_buffer_size
                                    ACE_ENV_ARG_PARAMETER);
}

RTCORBA::ThreadpoolId
TAO_Thread_Pool_Manager::create_threadpool_with_lanes (CORBA::ULong stacksize,
                                                       const RTCORBA::ThreadpoolLanes & lanes,
                                                       CORBA::Boolean allow_borrowing,
                                                       CORBA::Boolean allow_request_buffering,
                                                       CORBA::ULong max_buffered_requests,
                                                       CORBA::ULong max_request_buffer_size
                                                       ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  TAO_THREAD_POOL_MANAGER_GUARD;
  ACE_CHECK_RETURN (0);

  return this->create_threadpool_with_lanes_i (stacksize,
                                               lanes,
                                               allow_borrowing,
                                               allow_request_buffering,
                                               max_buffered_requests,
                                               max_request_buffer_size
                                               ACE_ENV_ARG_PARAMETER);
}

void
TAO_Thread_Pool_Manager::destroy_threadpool (RTCORBA::ThreadpoolId threadpool
                                             ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   RTCORBA::RTORB::InvalidThreadpool))
{
  TAO_THREAD_POOL_MANAGER_GUARD;
  ACE_CHECK;

  this->destroy_threadpool_i (threadpool
                              ACE_ENV_ARG_PARAMETER);
}

RTCORBA::ThreadpoolId
TAO_Thread_Pool_Manager::create_threadpool_i (CORBA::ULong stacksize,
                                              CORBA::ULong static_threads,
                                              CORBA::ULong dynamic_threads,
                                              RTCORBA::Priority default_priority,
                                              CORBA::Boolean allow_request_buffering,
                                              CORBA::ULong max_buffered_requests,
                                              CORBA::ULong max_request_buffer_size
                                              ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // Create the thread pool.
  TAO_Thread_Pool *thread_pool = 0;

  ACE_NEW_THROW_EX (thread_pool,
                    TAO_Thread_Pool (*this,
                                     this->thread_pool_id_counter_,
                                     stacksize,
                                     static_threads,
                                     dynamic_threads,
                                     default_priority,
                                     allow_request_buffering,
                                     max_buffered_requests,
                                     max_request_buffer_size
                                     ACE_ENV_ARG_PARAMETER),
                    CORBA::NO_MEMORY ());
  ACE_CHECK_RETURN (0);

  return this->create_threadpool_helper (thread_pool
                                         ACE_ENV_ARG_PARAMETER);
}

RTCORBA::ThreadpoolId
TAO_Thread_Pool_Manager::create_threadpool_with_lanes_i (CORBA::ULong stacksize,
                                                         const RTCORBA::ThreadpoolLanes &lanes,
                                                         CORBA::Boolean allow_borrowing,
                                                         CORBA::Boolean allow_request_buffering,
                                                         CORBA::ULong max_buffered_requests,
                                                         CORBA::ULong max_request_buffer_size
                                                         ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // Create the thread pool.
  TAO_Thread_Pool *thread_pool = 0;

  ACE_NEW_THROW_EX (thread_pool,
                    TAO_Thread_Pool (*this,
                                     this->thread_pool_id_counter_,
                                     stacksize,
                                     lanes,
                                     allow_borrowing,
                                     allow_request_buffering,
                                     max_buffered_requests,
                                     max_request_buffer_size
                                     ACE_ENV_ARG_PARAMETER),
                    CORBA::NO_MEMORY ());
  ACE_CHECK_RETURN (0);

  return this->create_threadpool_helper (thread_pool
                                         ACE_ENV_ARG_PARAMETER);
}

RTCORBA::ThreadpoolId
TAO_Thread_Pool_Manager::create_threadpool_helper (TAO_Thread_Pool *thread_pool
                                                   ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // Make sure of safe deletion in case of errors.
  auto_ptr<TAO_Thread_Pool> safe_thread_pool (thread_pool);

  // Open the pool.
  thread_pool->open (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (0);

  // Create the static threads.
  int result =
    thread_pool->create_static_threads ();

  // Throw exception in case of errors.
  if (result != 0)
    {
      // Finalize thread pool related resources.
      thread_pool->finalize ();

      ACE_THROW_RETURN (
        CORBA::INTERNAL (
          CORBA::SystemException::_tao_minor_code (
            TAO_RTCORBA_THREAD_CREATION_LOCATION_CODE,
            errno),
          CORBA::COMPLETED_NO),
        result);
    }

  // Bind thread to internal table.
  result =
    this->thread_pools_.bind (this->thread_pool_id_counter_,
                              thread_pool);

  // Throw exceptin in case of errors.
  if (result != 0)
    ACE_THROW_RETURN (CORBA::INTERNAL (),
                      result);

  //
  // Success.
  //

  // No need to delete thread pool.
  safe_thread_pool.release ();

  // Return current counter and perform post-increment.
  return this->thread_pool_id_counter_++;
}

void
TAO_Thread_Pool_Manager::destroy_threadpool_i (RTCORBA::ThreadpoolId thread_pool_id
                                               ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   RTCORBA::RTORB::InvalidThreadpool))
{
  TAO_Thread_Pool *thread_pool = 0;

  // Unbind the thread pool from the map.
  int result =
    this->thread_pools_.unbind (thread_pool_id,
                                thread_pool);

  // If the thread pool is not found in our map.
  if (result != 0)
    ACE_THROW (RTCORBA::RTORB::InvalidThreadpool ());

  // Shutdown reactor.
  thread_pool->shutdown_reactor ();

  // Wait for the threads.
  thread_pool->wait ();

  // Finalize resources.
  thread_pool->finalize ();

  // Delete the thread pool.
  delete thread_pool;
}

TAO_ORB_Core &
TAO_Thread_Pool_Manager::orb_core (void) const
{
  return this->orb_core_;
}

ACE_SYNCH_MUTEX &
TAO_Thread_Pool_Manager::lock (void)
{
  return this->lock_;
}

TAO_Thread_Pool_Manager::THREAD_POOLS &
TAO_Thread_Pool_Manager::thread_pools (void)
{
  return this->thread_pools_;
}

#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

template class ACE_Hash_Map_Manager<RTCORBA::ThreadpoolId, TAO_Thread_Pool *, ACE_Null_Mutex>;
template class ACE_Hash_Map_Manager_Ex<RTCORBA::ThreadpoolId, TAO_Thread_Pool *, ACE_Hash<RTCORBA::ThreadpoolId>, ACE_Equal_To<RTCORBA::ThreadpoolId>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Entry<RTCORBA::ThreadpoolId, TAO_Thread_Pool *>;
template class ACE_Hash_Map_Iterator_Base_Ex<RTCORBA::ThreadpoolId, TAO_Thread_Pool *, ACE_Hash<RTCORBA::ThreadpoolId>, ACE_Equal_To<RTCORBA::ThreadpoolId>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator_Ex<unsigned int, TAO_Thread_Pool *, ACE_Hash<unsigned int>, ACE_Equal_To<unsigned int>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Ex<unsigned int, TAO_Thread_Pool *, ACE_Hash<unsigned int>, ACE_Equal_To<unsigned int>, ACE_Null_Mutex>;

template class ACE_Auto_Basic_Ptr<TAO_Thread_Pool>;
template class ACE_Auto_Basic_Array_Ptr<size_t>;

#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

#pragma instantiate ACE_Hash_Map_Manager<RTCORBA::ThreadpoolId, TAO_Thread_Pool *, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Manager_Ex<RTCORBA::ThreadpoolId, TAO_Thread_Pool *, ACE_Hash<RTCORBA::ThreadpoolId>, ACE_Equal_To<RTCORBA::ThreadpoolId>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Entry<RTCORBA::ThreadpoolId, TAO_Thread_Pool *>
#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<RTCORBA::ThreadpoolId, TAO_Thread_Pool *, ACE_Hash<RTCORBA::ThreadpoolId>, ACE_Equal_To<RTCORBA::ThreadpoolId>, ACE_Null_Mutex>

#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<unsigned int, TAO_Thread_Pool *, ACE_Hash<unsigned int>, ACE_Equal_To<unsigned int>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Ex<unsigned int, TAO_Thread_Pool *, ACE_Hash<unsigned int>, ACE_Equal_To<unsigned int>, ACE_Null_Mutex>

#pragma instantiate ACE_Auto_Basic_Ptr<TAO_Thread_Pool>
#pragma instantiate ACE_Auto_Basic_Array_Ptr<size_t>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

#endif /* TAO_HAS_CORBA_MESSAGING && TAO_HAS_CORBA_MESSAGING != 0 */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?