⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tlibthrd.cxx

📁 opal的ptlib c++源程序 可以从官方网站上下载
💻 CXX
📖 第 1 页 / 共 3 页
字号:

#ifdef P_MACOSX
// obtain thread priority of the main thread
static unsigned long
GetThreadBasePriority ()
{
    thread_basic_info_data_t threadInfo;
    policy_info_data_t       thePolicyInfo;
    unsigned int             count;

    if (baseThread == 0) {
      return 0;
    }

    // get basic info
    count = THREAD_BASIC_INFO_COUNT;
    thread_info (pthread_mach_thread_np (baseThread), THREAD_BASIC_INFO,
                 (integer_t*)&threadInfo, &count);

    switch (threadInfo.policy) {
    case POLICY_TIMESHARE:
      count = POLICY_TIMESHARE_INFO_COUNT;
      thread_info(pthread_mach_thread_np (baseThread),
                  THREAD_SCHED_TIMESHARE_INFO,
                  (integer_t*)&(thePolicyInfo.ts), &count);
      return thePolicyInfo.ts.base_priority;

    case POLICY_FIFO:
      count = POLICY_FIFO_INFO_COUNT;
      thread_info(pthread_mach_thread_np (baseThread),
                  THREAD_SCHED_FIFO_INFO,
                  (integer_t*)&(thePolicyInfo.fifo), &count);
      if (thePolicyInfo.fifo.depressed) 
        return thePolicyInfo.fifo.depress_priority;
      return thePolicyInfo.fifo.base_priority;

    case POLICY_RR:
      count = POLICY_RR_INFO_COUNT;
      thread_info(pthread_mach_thread_np (baseThread),
                  THREAD_SCHED_RR_INFO,
                  (integer_t*)&(thePolicyInfo.rr), &count);
      if (thePolicyInfo.rr.depressed) 
        return thePolicyInfo.rr.depress_priority;
      return thePolicyInfo.rr.base_priority;
    }

    return 0;
}
#endif

// Remember the requested priority in PX_priority and, on platforms with a
// dedicated code path below, apply it to the underlying thread.
//
//  - P_LINUX:  HighestPriority selects SCHED_FIFO, but only when the
//              effective user id is 0; every other level selects SCHED_OTHER.
//  - P_MACOSX: HighestPriority makes the thread fixed priority and raises its
//              precedence; other levels are left untouched.
//
// Nothing is applied to a thread that IsTerminated() reports as gone.
void PThread::SetPriority(Priority priorityLevel)
{
  PX_priority = priorityLevel;

#if defined(P_LINUX)
  if (IsTerminated())
    return;

  struct sched_param schedParams;

  if (priorityLevel != HighestPriority) {
    /* priority 0 is the only permitted value for the SCHED_OTHER scheduler */
    schedParams.sched_priority = 0;
    PAssertPTHREAD(pthread_setschedparam, (PX_threadId, SCHED_OTHER, &schedParams));
  }
  else if (geteuid() == 0) {
    // only attempted with an effective uid of 0
    schedParams.sched_priority = sched_get_priority_min( SCHED_FIFO );
    PAssertPTHREAD(pthread_setschedparam, (PX_threadId, SCHED_FIFO, &schedParams));
  }
#endif

#if defined(P_MACOSX)
  if (IsTerminated())
    return;

  if (priorityLevel == HighestPriority) {
    int status;

    // Step 1: take the thread out of timesharing (fixed priority)
    thread_extended_policy_data_t fixedPolicy;
    fixedPolicy.timeshare = false; // true would mean a non-fixed thread
    status = thread_policy_set (pthread_mach_thread_np(PX_threadId),
                                THREAD_EXTENDED_POLICY,
                                (thread_policy_t)&fixedPolicy,
                                THREAD_EXTENDED_POLICY_COUNT);
    if (status != KERN_SUCCESS) {
      PTRACE(1, "thread_policy - Couldn't set thread as fixed priority.");
    }

    // Step 2: set the precedence. The "importance" value is relative to the
    // spawning thread's priority, so it is derived from the base priority.
    long importance = 62 - GetThreadBasePriority();
    PTRACE(1,  "relativePriority is " << importance << " base priority is " << GetThreadBasePriority());

    thread_precedence_policy_data_t precedencePolicy;
    precedencePolicy.importance = importance;
    status = thread_policy_set (pthread_mach_thread_np(PX_threadId),
                                THREAD_PRECEDENCE_POLICY,
                                (thread_policy_t)&precedencePolicy,
                                THREAD_PRECEDENCE_POLICY_COUNT);
    if (status != KERN_SUCCESS) {
      PTRACE(1, "thread_policy - Couldn't set thread priority.");
    }
  }
#endif
}


// Report the priority of this thread as far as it can be determined.
//
// On Linux the scheduler is queried: SCHED_FIFO and SCHED_RR are reported as
// HighestPriority, everything else as NormalPriority. On other platforms, or
// when the thread is not running, NormalPriority is returned.
PThread::Priority PThread::GetPriority() const
{
  // SetPriority() keys its Linux code on P_LINUX; accept that macro here as
  // well as the historical LINUX spelling so the two functions agree.
#if defined(P_LINUX) || defined(LINUX)
  // Same guard as SetPriority(): never hand an unset or dead thread id to
  // the pthread scheduling calls.
  if (!IsTerminated()) {
    int schedulingPolicy;
    struct sched_param schedParams;

    PAssertPTHREAD(pthread_getschedparam, (PX_threadId, &schedulingPolicy, &schedParams));

    switch( schedulingPolicy )
    {
      case SCHED_OTHER:
        break;

      case SCHED_FIFO:
      case SCHED_RR:
        return HighestPriority;

      default:
        /* Unknown scheduler. We don't know what priority this thread has. */
        PTRACE(1, "PWLib\tPThread::GetPriority: unknown scheduling policy #" << schedulingPolicy);
    }
  }
#endif

  return NormalPriority; /* as good a guess as any */
}


#ifndef P_HAS_SEMAPHORES
// Record which semaphore (if any) this thread is about to block on.
// The pointer is written while holding PX_WaitSemMutex, the same mutex
// Terminate() takes before it reads and clears PX_waitingSemaphore.
//   sem - semaphore being waited on; stored as given.
void PThread::PXSetWaitingSemaphore(PSemaphore * sem)
{
  PAssertPTHREAD(pthread_mutex_lock, (&PX_WaitSemMutex));
  PX_waitingSemaphore = sem;
  PAssertPTHREAD(pthread_mutex_unlock, (&PX_WaitSemMutex));
}
#endif


#ifdef P_GNU_PTH
// GNU PTH threads version (used by NetBSD)
// Taken from NetBSD pkg patches
void PThread::Sleep(const PTimeInterval & timeout)
{
  PTime lastTime;
  PTime targetTime = PTime() + timeout;

  sched_yield();
  lastTime = PTime();

  while (lastTime < targetTime) {
    P_timeval tval = targetTime - lastTime;
    if (select(0, NULL, NULL, NULL, tval) < 0 && errno != EINTR)
      break;

    pthread_testcancel();

    lastTime = PTime();
  }
}

#else
// Normal Posix threads version
void PThread::Sleep(const PTimeInterval & timeout)
{
  PTime lastTime;
  PTime targetTime = lastTime + timeout;
  do {
    P_timeval tval = targetTime - lastTime;
    if (select(0, NULL, NULL, NULL, tval) < 0 && errno != EINTR)
      break;

#if !( defined(P_NETBSD) && defined(P_NO_CANCEL) )
    pthread_testcancel();
#endif

    lastTime = PTime();
  } while (lastTime < targetTime);
}
#endif

// Give up the processor so that another runnable thread may be scheduled.
// Thin wrapper around sched_yield(); its return value is not checked.
void PThread::Yield()
{
  sched_yield();
}


// Look up the PThread object registered for the calling thread.
// The process-wide activeThreads table is keyed by the pthread id (cast to
// unsigned) and is read while holding process.threadMutex. The result is
// whatever GetAt() yields for that key.
PThread * PThread::Current()
{
  PProcess & owner = PProcess::Current();
  const unsigned key = (unsigned)pthread_self();

  owner.threadMutex.Wait();
  PThread * const self = owner.activeThreads.GetAt(key);
  owner.threadMutex.Signal();

  return self;
}


// Forcibly stop this thread.
//
// Called from the thread itself, this simply exits the thread. Called from
// another thread, it first wakes any blocked I/O and waits briefly for the
// thread to finish by itself, then cancels it (or, on NetBSD builds without
// cancellation support, signals it with SIGKILL).
void PThread::Terminate()
{
  // NOTE(review): presumably a non-positive original stack size marks a
  // thread this library did not create - confirm against the constructors.
  if (PX_origStackSize <= 0)
    return;

  // don't use PThread::Current, as the thread may already not be in the
  // active threads list
  if (PX_threadId == pthread_self()) {
    pthread_exit(0);
    return;
  }

  // nothing to do if the thread has already gone
  if (IsTerminated())
    return;

  PTRACE(2, "PWLib\tForcing termination of thread " << (void *)this);

  // wake the thread out of PXBlockOnIO and wait up to WaitForTermination(20)
  // for it to end; the result is ignored and the code below runs regardless
  PXAbortBlock();
  WaitForTermination(20);

#if !defined(P_HAS_SEMAPHORES) && !defined(P_HAS_NAMED_SEMAPHORES)
  // If the thread was recorded as waiting on a semaphore, take it off that
  // semaphore's count of queued lockers and forget the semaphore.
  PAssertPTHREAD(pthread_mutex_lock, (&PX_WaitSemMutex));
  if (PX_waitingSemaphore != NULL) {
    PAssertPTHREAD(pthread_mutex_lock, (&PX_waitingSemaphore->mutex));
    PX_waitingSemaphore->queuedLocks--;
    PAssertPTHREAD(pthread_mutex_unlock, (&PX_waitingSemaphore->mutex));
    PX_waitingSemaphore = NULL;
  }
  PAssertPTHREAD(pthread_mutex_unlock, (&PX_WaitSemMutex));
#endif

#if ( defined(P_NETBSD) && defined(P_NO_CANCEL) )
  PPThreadKill(PX_threadId,SIGKILL);
#else
  // PX_threadId is zeroed by PX_ThreadEnd once the thread has finished, so
  // only cancel while an id is still recorded
  if (PX_threadId) {
    pthread_cancel(PX_threadId);
  }
#endif
}


// Tell whether the underlying thread is no longer running.
// A zero thread id counts as terminated. Otherwise PPThreadKill(id, 0) is
// used as a probe and a false result is reported as "terminated".
// The id is copied to a local first, so the zero check and the probe use the
// same value.
PBoolean PThread::IsTerminated() const
{
  const pthread_t threadId = PX_threadId;

  if (threadId == 0)
    return PTrue;

  return !PPThreadKill(threadId, 0);
}


// Block the caller until this thread has terminated.
// A thread waiting for itself returns immediately (it could never finish).
void PThread::WaitForTermination() const
{
  if (Current() == this) {
    PTRACE(2, "WaitForTermination short circuited");
    return;
  }

  // waking blocked I/O assists clean shutdowns on some systems
  PXAbortBlock();

  // Poll in 10ms steps: the sleep keeps the loop from using 100% CPU and
  // also yields so that other threads can run.
  for (;;) {
    if (IsTerminated())
      break;
    Sleep(10);
  }
}


// Block the caller until this thread has terminated or maxWait has elapsed.
// Returns PTrue when the thread is found terminated (or when a thread waits
// for itself), and PFalse when the timer runs out first.
PBoolean PThread::WaitForTermination(const PTimeInterval & maxWait) const
{
  if (Current() == this) {
    PTRACE(2, "WaitForTermination(t) short circuited");
    return PTrue;
  }

  PTRACE(6, "PWLib\tWaitForTermination(" << maxWait << ')');

  // waking blocked I/O assists clean shutdowns on some systems
  PXAbortBlock();

  PTimer remaining = maxWait;

  // Poll in 10ms steps: the sleep keeps the loop from using 100% CPU and
  // also yields so that other threads can run.
  for (;;) {
    if (IsTerminated())
      return PTrue;
    if (remaining == 0)
      return PFalse;
    Sleep(10);
  }
}


// Entry point handed to pthread_create for every PThread.
//   arg - the PThread object this native thread belongs to.
// Runs the thread's Main() and makes sure PX_ThreadEnd is invoked afterwards.
// Always returns NULL.
void * PThread::PX_ThreadStart(void * arg)
{ 
  PThread * thread = (PThread *)arg;
  // No need to detach the thread; it was created in the
  // PTHREAD_CREATE_DETACHED state.
  // Taking PX_suspendMutex guarantees that the thread creation
  // (PThread::Restart) has completed before we start the thread, so that
  // PX_threadId has been set.
  pthread_mutex_lock(&thread->PX_suspendMutex);
  thread->SetThreadName(thread->GetThreadName());
  pthread_mutex_unlock(&thread->PX_suspendMutex);

  // make sure the cleanup routine is called when the thread exits.
  // pthread_cleanup_push/pop are macros that must be paired in the same
  // lexical scope, so do not move one without the other.
  pthread_cleanup_push(&PThread::PX_ThreadEnd, arg);

  PTRACE(5, "PWLib\tStarted thread " << thread << ' ' << thread->threadName);

  PProcess::Current().OnThreadStart(*thread);

  // now call the thread main routine
  thread->Main();

  // execute the cleanup routine (the non-zero argument runs the handler
  // as it is popped)
  pthread_cleanup_pop(1);

  return NULL;
}


// Cleanup handler run when a PThread's native thread finishes.
//   arg - the PThread object whose thread is ending.
// Notifies the process, removes the thread from the active thread table,
// marks it as terminated by zeroing PX_threadId, and deletes the object when
// it is flagged autoDelete.
void PThread::PX_ThreadEnd(void * arg)
{
  PThread * thread = (PThread *)arg;
  PProcess & process = PProcess::Current();
  process.OnThreadEnded(*thread);
  process.threadMutex.Wait();

  pthread_t id = thread->GetThreadId();
  if (id == 0) {
    // Don't know why, but pthreads under Linux at least can call this function
    // multiple times! Probably a bug, but we have to allow for it.
    process.threadMutex.Signal();
    PTRACE(2, "PWLib\tAttempted to multiply end thread " << thread << " ThreadID=" << (void *)id);
    return;
  }

  // remove this thread from the active thread list
  process.activeThreads.SetAt((unsigned)id, NULL);

  // delete the thread if required, note this is done this way to avoid
  // a race condition, the thread ID cannot be zeroed before the if!
  if (thread->autoDelete) {
    thread->PX_threadId = 0;  // Prevent terminating terminated thread
    process.threadMutex.Signal();
    PTRACE(5, "PWLib\tEnded thread " << thread << ' ' << thread->threadName);

    /* It is now safe to delete this thread. Note that this thread
       is deleted after the process.threadMutex.Signal(), which means
       PWaitAndSignal(process.threadMutex) could not be used */
    delete thread;
  }
  else {
    // Copy the name BEFORE clearing the id. IsTerminated() polls
    // PX_threadId without taking threadMutex, so once the id reads zero the
    // owner may destroy the object and it must not be touched again; only
    // the pointer value is traced below.
    PString threadName = thread->threadName;
    thread->PX_threadId = 0;
    process.threadMutex.Signal();
    PTRACE(5, "PWLib\tEnded thread " << thread << ' ' << threadName);
  }
}

int PThread::PXBlockOnIO(int handle, int type, const PTimeInterval & timeout)
{
  PTRACE(7, "PWLib\tPThread::PXBlockOnIO(" << handle << ',' << type << ')');

  if ((handle < 0) || (handle >= PProcess::Current().GetMaxHandles())) {
    PTRACE(2, "PWLib\tAttempt to use illegal handle in PThread::PXBlockOnIO, handle=" << handle);
    errno = EBADF;
    return -1;
  }

  // make sure we flush the buffer before doing a write
  P_fd_set read_fds;
  P_fd_set write_fds;
  P_fd_set exception_fds;

  int retval;
  do {
    switch (type) {
      case PChannel::PXReadBlock:
      case PChannel::PXAcceptBlock:
        read_fds = handle;
        write_fds.Zero();
        exception_fds.Zero();
        break;
      case PChannel::PXWriteBlock:
        read_fds.Zero();
        write_fds = handle;
        exception_fds.Zero();
        break;
      case PChannel::PXConnectBlock:
        read_fds.Zero();
        write_fds = handle;
        exception_fds = handle;
        break;
      default:
        PAssertAlways(PLogicError);
        return 0;
    }

    // include the termination pipe into all blocking I/O functions
    read_fds += unblockPipe[0];

    P_timeval tval = timeout;
    retval = ::select(PMAX(handle, unblockPipe[0])+1,
                      read_fds, write_fds, exception_fds, tval);
  } while (retval < 0 && errno == EINTR);

  if ((retval == 1) && read_fds.IsPresent(unblockPipe[0])) {
    BYTE ch;
    ::read(unblockPipe[0], &ch, 1);
    errno = EINTR;
    retval =  -1;
    PTRACE(6, "PWLib\tUnblocked I/O fd=" << unblockPipe[0]);
  }

  return retval;
}

// Wake this thread out of a PXBlockOnIO() wait.
// Writes a single byte to the write end of the unblock pipe; PXBlockOnIO
// includes the read end in every select() and reports EINTR when it fires.
// The result of write() is not checked.
void PThread::PXAbortBlock() const
{
  static BYTE ch = 0;
  ::write(unblockPipe[1], &ch, 1);
  PTRACE(6, "PWLib\tUnblocking I/O fd=" << unblockPipe[0] << " thread=" << GetThreadName());
}


///////////////////////////////////////////////////////////////////////////////

// Construct a semaphore object of the given internal class.
//   pxc - class tag stored in pxClass; the underlying primitive is only
//         initialised here when it is PXSemaphore.
// The semaphore starts with a count of zero.
PSemaphore::PSemaphore(PXClass pxc)
{
  pxClass = pxc;

  // these should never be used, as this constructor is
  // only used for PMutex and PSyncPoint and they have their
  // own constructors

  initialVar = maxCountVar = 0;

  if(pxClass == PXSemaphore) {
#if defined(P_HAS_SEMAPHORES)
    /* call sem_init, otherwise sem_destroy fails*/
    PAssertPTHREAD(sem_init, (&semId, 0, 0));
#elif defined(P_HAS_NAMED_SEMAPHORES)
    semId = CreateSem(0);
#else
    currentCount = maximumCount = 0;
    queuedLocks = 0;
    // checked in the same way as in PSemaphore(unsigned, unsigned), so an
    // initialisation failure is not silently ignored
    PAssertPTHREAD(pthread_mutex_init, (&mutex, NULL));
    PAssertPTHREAD(pthread_cond_init, (&condVar, NULL));
#endif
  }
}


// Construct a counting semaphore.
//   initial  - starting count. In the mutex/condition-variable build it is
//              clamped to maxCount.
//   maxCount - upper bound recorded for the count. Only the
//              mutex/condition-variable build asserts that it is non-zero.
// The values as passed in are remembered in initialVar / maxCountVar.
PSemaphore::PSemaphore(unsigned initial, unsigned maxCount)
{
  pxClass     = PXSemaphore;
  initialVar  = initial;
  maxCountVar = maxCount;

#if defined(P_HAS_SEMAPHORES)
  PAssertPTHREAD(sem_init, (&semId, 0, initial));
#elif defined(P_HAS_NAMED_SEMAPHORES)
  semId = CreateSem(initialVar);
#else
  PAssertPTHREAD(pthread_mutex_init, (&mutex, NULL));
  PAssertPTHREAD(pthread_cond_init, (&condVar, NULL));

  PAssert(maxCount > 0, "Invalid semaphore maximum.");

  queuedLocks  = 0;
  maximumCount = maxCount;
  // never start above the maximum
  currentCount = (initial > maxCount) ? maxCount : initial;
#endif
}


PSemaphore::PSemaphore(const PSemaphore & sem) 
{
  pxClass = sem.GetSemClass();

  initialVar  = sem.GetInitial();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -