⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tlibthrd.cxx

📁 基于VXWORKS H323通信技术源代码
💻 CXX
📖 第 1 页 / 共 2 页
字号:
/*
 * tlibthrd.cxx
 *
 * Routines for pre-emptive threading system
 *
 * Portable Windows Library
 *
 * Copyright (c) 1993-1998 Equivalence Pty. Ltd.
 *
 * The contents of this file are subject to the Mozilla Public License
 * Version 1.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * The Original Code is Portable Windows Library.
 *
 * The Initial Developer of the Original Code is Equivalence Pty. Ltd.
 *
 * Portions are Copyright (C) 1993 Free Software Foundation, Inc.
 * All Rights Reserved.
 *
 * Contributor(s): ______________________________________.
 *
 * $Log: tlibthrd.cxx,v $
 * Revision 1.31  1999/11/18 14:02:57  craigs
 * Fixed problem with housekeeping thread termination
 *
 * Revision 1.30  1999/11/15 01:12:56  craigs
 * Fixed problem with PSemaphore::Wait consuming 100% CPU
 *
 * Revision 1.29  1999/10/30 13:44:11  craigs
 * Added correct method of aborting socket operations asynchronously
 *
 * Revision 1.28  1999/10/24 13:03:30  craigs
 * Changed to capture io break signal
 *
 * Revision 1.27  1999/09/23 06:52:16  robertj
 * Changed PSemaphore to use Posix semaphores.
 *
 * Revision 1.26  1999/09/03 02:26:25  robertj
 * Changes to aid in breaking I/O locks on thread termination. Still needs more work esp in BSD!
 *
 * Revision 1.25  1999/09/02 11:56:35  robertj
 * Fixed problem with destroying PMutex that is already locked.
 *
 * Revision 1.24  1999/08/24 13:40:56  craigs
 * Fixed problem with condwait destroys failing on linux
 *
 * Revision 1.23  1999/08/23 05:33:45  robertj
 * Made last threading changes Linux only.
 *
 * Revision 1.22  1999/08/23 05:14:13  robertj
 * Removed blocking of interrupt signals as does not work in Linux threads.
 *
 * Revision 1.21  1999/07/30 00:40:32  robertj
 * Fixed problem with signal variable in non-Linux platforms
 *
 * Revision 1.20  1999/07/19 01:32:24  craigs
 * Changed signals used in pthreads code, is used by linux version.
 *
 * Revision 1.19  1999/07/15 13:10:55  craigs
 * Fixed problem with EINTR in nontimed semaphore waits
 *
 * Revision 1.18  1999/07/15 13:05:33  robertj
 * Fixed problem with getting EINTR in semaphore wait, is normal, not error.
 *
 * Revision 1.17  1999/07/11 13:42:13  craigs
 * pthreads support for Linux
 *
 * Revision 1.16  1999/05/12 03:29:20  robertj
 * Fixed problem with semaphore free, done at wrong time.
 *
 * Revision 1.15  1999/04/29 08:41:26  robertj
 * Fixed problems with uninitialised mutexes in PProcess.
 *
 * Revision 1.14  1999/03/16 10:54:16  robertj
 * Added parameterless version of WaitForTermination.
 *
 * Revision 1.13  1999/03/16 10:30:37  robertj
 * Added missing PThread::WaitForTermination function.
 *
 * Revision 1.12  1999/01/12 12:09:51  robertj
 * Removed redundant member variable, was in common.
 * Fixed BSD threads compatibility.
 *
 * Revision 1.11  1999/01/11 12:05:56  robertj
 * Fixed some more race conditions in threads.
 *
 * Revision 1.10  1999/01/11 03:42:26  robertj
 * Fixed problem with destroying thread automatically.
 *
 * Revision 1.9  1999/01/09 03:37:28  robertj
 * Fixed problem with closing thread waiting on semaphore.
 * Improved efficiency of mutex to use pthread functions directly.
 *
 * Revision 1.8  1999/01/08 01:31:03  robertj
 * Support for pthreads under FreeBSD
 *
 * Revision 1.7  1998/12/15 12:41:07  robertj
 * Fixed signal handling so can now ^C a pthread version.
 *
 * Revision 1.6  1998/11/05 09:45:04  robertj
 * Removed StartImmediate option in thread construction.
 *
 * Revision 1.5  1998/09/24 04:12:25  robertj
 * Added open software license.
 *
 */

#if !defined(P_VXWORKS)
#include <pthread.h>
#include <sys/resource.h>
#else
#include <taskLib.h>
#include <signal.h>
#endif


#ifdef P_LINUX

#define	SUSPEND_SIG	SIGALRM
#define	RESUME_SIG	SIGVTALRM

#else

#define	SUSPEND_SIG	SIGUSR1
#define	RESUME_SIG	SIGUSR2

#endif

#ifdef  P_PTHREADS
#define P_IO_BREAK_SIGNAL SIGPROF
#endif

// Thread whose Main() services the process timer list. The PThread base
// constructor creates the underlying thread in the suspended state, so the
// constructor body resumes it once the closing flag has been cleared.
PDECLARE_CLASS(HouseKeepingThread, PThread)
  public:
    HouseKeepingThread()
      : PThread(1000, NoAutoDeleteThread), closing(FALSE)
    {
      Resume();
    }

    // Thread body, defined out of line.
    void Main();

    // Ask Main() to leave its loop the next time it tests the flag.
    void SetClosing()
    {
      closing = TRUE;
    }

  protected:
    // TRUE once SetClosing() has been called.
    BOOL closing;
};


int PThread::PXBlockOnIO(int handle, int type, const PTimeInterval & timeout)
{
  // make sure we flush the buffer before doing a write
  fd_set tmp_rfd, tmp_wfd, tmp_efd;
  fd_set * read_fds      = &tmp_rfd;
  fd_set * write_fds     = &tmp_wfd;
  fd_set * exception_fds = &tmp_efd;

  FD_ZERO (read_fds);
  FD_ZERO (write_fds);
  FD_ZERO (exception_fds);

  switch (type) {
    case PChannel::PXReadBlock:
    case PChannel::PXAcceptBlock:
      FD_SET (handle, read_fds);
      break;
    case PChannel::PXWriteBlock:
      FD_SET (handle, write_fds);
      break;
    case PChannel::PXConnectBlock:
      FD_SET (handle, write_fds);
      FD_SET (handle, exception_fds);
      break;
    default:
      PAssertAlways(PLogicError);
      return 0;
  }

  struct timeval * tptr = NULL;
  struct timeval   timeout_val;
  if (timeout != PMaxTimeInterval) {
    static const PTimeInterval oneDay(0, 0, 0, 0, 1);
    if (timeout < oneDay) {
      timeout_val.tv_usec = (timeout.GetMilliSeconds() % 1000) * 1000;
      timeout_val.tv_sec  = timeout.GetSeconds();
      tptr                = &timeout_val;
    }
  }

  // include the termination pipe into all blocking I/O functions
  int width = handle+1;
  FD_SET(termPipe[0], read_fds);
  width = PMAX(width, termPipe[0]+1);

  int retval = ::select(width, read_fds, write_fds, exception_fds, tptr);
  PProcess::Current().PXCheckSignals();

  if ((retval == 1) && FD_ISSET(termPipe[0], read_fds)) {
    BYTE ch;
    ::read(termPipe[0], &ch, 1);
    errno = EINTR;
    retval =  -1;
  }

  return retval;
}


// Signal handler used for SUSPEND_SIG: holds the thread inside sigwait()
// until RESUME_SIG is received. SIGINT, SIGQUIT and SIGTERM are part of the
// waited-for set as well; after any wake-up PXCheckSignals() is called on the
// current process, and the wait is repeated unless the signal was RESUME_SIG.
static void sigSuspendHandler(int)
{
  sigset_t wakeSet;
  sigemptyset(&wakeSet);
  sigaddset(&wakeSet, RESUME_SIG);
  sigaddset(&wakeSet, SIGINT);
  sigaddset(&wakeSet, SIGQUIT);
  sigaddset(&wakeSet, SIGTERM);

  int received;
  do {
    // two sigwait() flavours: the two-argument form is used on Linux, the
    // one-argument form (signal number as return value) elsewhere
#ifdef P_LINUX
    sigwait(&wakeSet, &received);
#else
    received = sigwait(&wakeSet);
#endif
    PProcess::Current().PXCheckSignals();
  } while (received != RESUME_SIG);
}


void HouseKeepingThread::Main()
{
  PProcess & process = PProcess::Current();

  while (!closing) {
    PTimeInterval waitTime = process.timers.Process();
    if (waitTime == PMaxTimeInterval)
      process.timerChangeSemaphore.Wait();
    else
      process.timerChangeSemaphore.Wait(waitTime);
  }
}


// Platform-specific part of process construction: blocks RESUME_SIG in the
// calling thread, raises the soft open-file limit to the hard limit, clears
// the housekeeping thread pointer and then runs CommonConstruct().
void PProcess::Construct()
{
  // keep RESUME_SIG blocked so its delivery does not disturb this thread
  sigset_t resumeOnly;
  sigemptyset(&resumeOnly);
  sigaddset(&resumeOnly, RESUME_SIG);
  int maskResult = pthread_sigmask(SIG_BLOCK, &resumeOnly, NULL);
  PAssertOS(maskResult == 0);

  // lift the current descriptor limit up to the maximum permitted
  struct rlimit fdLimit;
  PAssertOS(getrlimit(RLIMIT_NOFILE, &fdLimit) == 0);
  fdLimit.rlim_cur = fdLimit.rlim_max;
  PAssertOS(setrlimit(RLIMIT_NOFILE, &fdLimit) == 0);

  // no housekeeping thread yet; SignalTimerChange() creates it when needed
  housekeepingThread = NULL;

  CommonConstruct();
}


// Shuts down the housekeeping thread, if one was ever created, and then runs
// CommonDestruct().
PProcess::~PProcess()
{
  if (housekeepingThread != NULL) {
    HouseKeepingThread * keeper = (HouseKeepingThread *)housekeepingThread;

    // raise the closing flag, then signal the semaphore the thread sleeps on
    // so that it gets to see the flag
    keeper->SetClosing();
    SignalTimerChange();

    housekeepingThread->WaitForTermination();
    delete housekeepingThread;
  }

  CommonDestruct();
}


// Default constructor: deliberately performs no initialisation of its own.
// Per the original note below, the set-up for an object built this way is
// done by InitialiseProcessThread().
PThread::PThread()
{
  // see InitialiseProcessThread()
}


// Set up the PThread state for the thread that is the process itself: no new
// thread is created, the calling thread's id is adopted, and the object is
// entered into the process's active thread list.
//
// The object is treated as a PProcess (note the casts below), so this must
// only be invoked on the process object.
void PThread::InitialiseProcessThread()
{
  PX_origStackSize    = 0;
  autoDelete          = FALSE;
  PX_waitingSemaphore = NULL;
  PX_threadId         = pthread_self();
  PX_suspendCount     = 0;

  // A failed pipe() would leave termPipe[] unset, yet PXBlockOnIO() passes
  // termPipe[0] to FD_SET/select and the destructor closes both ends, so the
  // result is checked in the same way as the mutex initialisations.
  PAssertOS(::pipe(termPipe) == 0);

  PAssertOS(pthread_mutex_init(&PX_WaitSemMutex, NULL) == 0);
  PAssertOS(pthread_mutex_init(&PX_suspendMutex, NULL) == 0);

  // the list must not delete its entries; register this thread under its id
  ((PProcess *)this)->activeThreads.DisallowDeleteObjects();
  ((PProcess *)this)->activeThreads.SetAt((unsigned)PX_threadId, this);
}


// Construct a thread object and create its underlying thread in the suspended
// state; the thread is entered into the active thread list by PX_ThreadStart.
//
//   stackSize - must be greater than zero (asserted); recorded in
//               PX_origStackSize
//   deletion  - AutoDeleteThread makes the object delete itself when the
//               thread ends; any other value leaves deletion to the owner
//   (third)   - priority level, unused by this implementation
PThread::PThread(PINDEX stackSize,
                 AutoDeleteFlag deletion,
                 Priority /*priorityLevel*/)
{
  PAssert(stackSize > 0, PInvalidParameter);

  PX_origStackSize = stackSize;
  autoDelete       = (deletion == AutoDeleteThread);

  PX_waitingSemaphore = NULL;

  // both mutexes are checked, matching InitialiseProcessThread()
  PAssertOS(pthread_mutex_init(&PX_WaitSemMutex, NULL) == 0);
  PAssertOS(pthread_mutex_init(&PX_suspendMutex, NULL) == 0);

  // A failed pipe() would leave termPipe[] unset, yet PXBlockOnIO() passes
  // termPipe[0] to FD_SET/select and the destructor closes both ends, so the
  // failure must not go unnoticed.
  PAssertOS(::pipe(termPipe) == 0);

  // create the new thread, initially suspended
  PX_NewThread(TRUE);
}


// Terminates the thread if it has not already terminated, then releases the
// termination pipe and the two mutexes owned by the object.
PThread::~PThread()
{
  if (!IsTerminated())
    Terminate();

  // close the read end first, then the write end
  for (int end = 0; end < 2; end++)
    ::close(termPipe[end]);

  PAssertOS(pthread_mutex_destroy(&PX_WaitSemMutex) == 0);
  PAssertOS(pthread_mutex_destroy(&PX_suspendMutex) == 0);
}


// Create the underlying pthread, which begins execution in PX_ThreadStart
// with this object as its argument.
//
//   startSuspended - when TRUE the suspend count starts at 1, otherwise at 0
void PThread::PX_NewThread(BOOL startSuspended)
{
  // the suspend count has to be in place before the new thread inspects it
  if (startSuspended)
    PX_suspendCount = 1;
  else
    PX_suspendCount = 0;

  // default thread attributes (NULL) are used
  int createResult = pthread_create(&PX_threadId, NULL, PX_ThreadStart, this);
  PAssertOS(createResult == 0);
}


// Entry point handed to pthread_create() by PX_NewThread().
//
//   arg - the PThread object this thread belongs to
//
// Detaches the thread, blocks RESUME_SIG, registers the thread in the
// process's active thread list, optionally waits for a resume, installs the
// SUSPEND_SIG handler, runs the thread's Main() and finally executes the
// PX_ThreadEnd cleanup routine. Always returns NULL.
void * PThread::PX_ThreadStart(void * arg)
{ 
  pthread_t threadId = pthread_self();

  // self-detach
  pthread_detach(threadId);

  PThread * thread = (PThread *)arg;

  PProcess & process = PProcess::Current();

  // block RESUME_SIG; it is collected explicitly with sigwait() instead
  sigset_t blockedSignals;
  sigemptyset(&blockedSignals);
  sigaddset(&blockedSignals, RESUME_SIG);
  //sigaddset(&blockedSignals, P_IO_BREAK_SIGNAL);
  PAssertOS(pthread_sigmask(SIG_BLOCK, &blockedSignals, NULL) == 0);

  // add thread to thread list, keyed by thread id, under the thread mutex
  process.threadMutex.Wait();
  process.activeThreads.SetAt((unsigned)threadId, thread);
  process.threadMutex.Signal();

  // make sure the cleanup routine is called when the thread exits
  // (paired with the pthread_cleanup_pop() at the end of this function)
  pthread_cleanup_push(PThread::PX_ThreadEnd, arg);

  // if we are not supposed to start suspended, then don't wait
  // if we are supposed to start suspended, then wait for a resume
  // the suspend count is read while holding PX_suspendMutex; the mutex is
  // released again on both paths before continuing
  PAssertOS(pthread_mutex_lock(&thread->PX_suspendMutex) == 0);
  if (thread->PX_suspendCount ==  0) 
    PAssertOS(pthread_mutex_unlock(&thread->PX_suspendMutex) == 0);
  else {
    PAssertOS(pthread_mutex_unlock(&thread->PX_suspendMutex) == 0);
    // sleep until RESUME_SIG is delivered to this thread
    sigset_t waitSignals;
    sigemptyset(&waitSignals);
    sigaddset(&waitSignals, RESUME_SIG);
#ifdef P_LINUX
  int sig;
  sigwait(&waitSignals, &sig);
#else
    sigwait(&waitSignals);
#endif
  }

  // set the signal handler for SUSPEND_SIG
  // (done only after the initial wait above has completed)
  struct sigaction action;
  memset(&action, 0, sizeof(action));
  action.sa_handler = sigSuspendHandler;
  sigaction(SUSPEND_SIG, &action, 0);

  // now call the thread main routine
  thread->Main();

  // execute the cleanup routine (non-zero argument runs PX_ThreadEnd now)
  pthread_cleanup_pop(1);

  return NULL;
}


// Notify the housekeeping thread that the timer list may have changed.
// The first call creates the housekeeping thread instead of signalling;
// every later call signals the timer-change semaphore.
void PProcess::SignalTimerChange()
{
  if (housekeepingThread != NULL) {
    timerChangeSemaphore.Signal();
    return;
  }

  // no housekeeping thread exists yet: create it
  housekeepingThread = PNEW HouseKeepingThread;
}


void PThread::PX_ThreadEnd(void * arg)
{
  PThread * thread = (PThread *)arg;
  PProcess & process = PProcess::Current();

  // remove this thread from the thread list
  process.threadMutex.Wait();
  process.activeThreads.SetAt(thread->PX_GetThreadId(), NULL);
  process.threadMutex.Signal();
  
  thread->PX_threadId = 0;  // Prevent terminating terminated thread

  // delete the thread if required
  if (thread->autoDelete)
    delete thread;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -