📄 tlibthrd.cxx
字号:
/* * tlibthrd.cxx * * Routines for pre-emptive threading system * * Portable Windows Library * * Copyright (c) 1993-1998 Equivalence Pty. Ltd. * * The contents of this file are subject to the Mozilla Public License * Version 1.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * http://www.mozilla.org/MPL/ * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. * * The Original Code is Portable Windows Library. * * The Initial Developer of the Original Code is Equivalence Pty. Ltd. * * Portions are Copyright (C) 1993 Free Software Foundation, Inc. * All Rights Reserved. * * Contributor(s): ______________________________________. * * $Log: tlibthrd.cxx,v $ * Revision 1.41 2000/06/21 01:01:22 robertj * AIX port, thanks Wolfgang Platzer (wolfgang.platzer@infonova.at). * * Revision 1.40 2000/04/13 07:21:10 rogerh * Fix typo in #defined * * Revision 1.39 2000/04/11 11:38:49 rogerh * More NetBSD Pthread changes * * Revision 1.38 2000/04/10 11:47:02 rogerh * Add initial NetBSD pthreads support * * Revision 1.37 2000/04/06 12:19:49 rogerh * Add Mac OS X support submitted by Kevin Packard * * Revision 1.36 2000/03/20 22:56:34 craigs * Fixed problems with race conditions caused by testing or changing * attributes on a terminated thread. Only occurred on a fast machine! 
* * Revision 1.35 2000/03/17 03:45:40 craigs * Fixed problem with connect call hanging * * Revision 1.34 2000/03/08 12:17:09 rogerh * Add OpenBSD support * * Revision 1.33 2000/02/29 13:18:21 robertj * Added named threads to tracing, thanks to Dave Harvey * * Revision 1.32 2000/01/20 08:20:57 robertj * FreeBSD v3 compatibility changes, thanks Roger Hardiman & Motonori Shindo * * Revision 1.31 1999/11/18 14:02:57 craigs * Fixed problem with housekeeping thread termination * * Revision 1.30 1999/11/15 01:12:56 craigs * Fixed problem with PSemaphore::Wait consuming 100% CPU * * Revision 1.29 1999/10/30 13:44:11 craigs * Added correct method of aborting socket operations asynchronously * * Revision 1.28 1999/10/24 13:03:30 craigs * Changed to capture io break signal * * Revision 1.27 1999/09/23 06:52:16 robertj * Changed PSemaphore to use Posix semaphores. * * Revision 1.26 1999/09/03 02:26:25 robertj * Changes to aid in breaking I/O locks on thread termination. Still needs more work esp in BSD! * * Revision 1.25 1999/09/02 11:56:35 robertj * Fixed problem with destroying PMutex that is already locked. * * Revision 1.24 1999/08/24 13:40:56 craigs * Fixed problem with condwait destroys failing on linux * * Revision 1.23 1999/08/23 05:33:45 robertj * Made last threading changes Linux only. * * Revision 1.22 1999/08/23 05:14:13 robertj * Removed blocking of interrupt signals as does not work in Linux threads. * * Revision 1.21 1999/07/30 00:40:32 robertj * Fixed problem with signal variable in non-Linux platforms * * Revision 1.20 1999/07/19 01:32:24 craigs * Changed signals used in pthreads code, is used by linux version. * * Revision 1.19 1999/07/15 13:10:55 craigs * Fixed problem with EINTR in nontimed semaphore waits * * Revision 1.18 1999/07/15 13:05:33 robertj * Fixed problem with getting EINTR in semaphore wait, is normal, not error. 
* * Revision 1.17 1999/07/11 13:42:13 craigs * pthreads support for Linux * * Revision 1.16 1999/05/12 03:29:20 robertj * Fixed problem with semaphore free, done at wrong time. * * Revision 1.15 1999/04/29 08:41:26 robertj * Fixed problems with uninitialised mutexes in PProcess. * * Revision 1.14 1999/03/16 10:54:16 robertj * Added parameterless version of WaitForTermination. * * Revision 1.13 1999/03/16 10:30:37 robertj * Added missing PThread::WaitForTermination function. * * Revision 1.12 1999/01/12 12:09:51 robertj * Removed redundant member variable, was in common. * Fixed BSD threads compatibility. * * Revision 1.11 1999/01/11 12:05:56 robertj * Fixed some more race conditions in threads. * * Revision 1.10 1999/01/11 03:42:26 robertj * Fixed problem with destroying thread automatically. * * Revision 1.9 1999/01/09 03:37:28 robertj * Fixed problem with closing thread waiting on semaphore. * Improved efficiency of mutex to use pthread functions directly. * * Revision 1.8 1999/01/08 01:31:03 robertj * Support for pthreads under FreeBSD * * Revision 1.7 1998/12/15 12:41:07 robertj * Fixed signal handling so can now ^C a pthread version. * * Revision 1.6 1998/11/05 09:45:04 robertj * Removed StartImmediate option in thread construction. * * Revision 1.5 1998/09/24 04:12:25 robertj * Added open software license. 
* */#include <pthread.h>#include <sys/resource.h>#ifdef P_LINUX#define SUSPEND_SIG SIGALRM#define RESUME_SIG SIGVTALRM#else#define SUSPEND_SIG SIGUSR1#define RESUME_SIG SIGUSR2#endif#ifdef P_PTHREADS#define P_IO_BREAK_SIGNAL SIGPROF#endifPDECLARE_CLASS(HouseKeepingThread, PThread) public: HouseKeepingThread() : PThread(1000, NoAutoDeleteThread) { closing = FALSE; Resume(); } void Main(); void SetClosing() { closing = TRUE; } protected: BOOL closing;};int PThread::PXBlockOnIO(int handle, int type, const PTimeInterval & timeout){ // make sure we flush the buffer before doing a write fd_set tmp_rfd, tmp_wfd, tmp_efd; fd_set * read_fds = &tmp_rfd; fd_set * write_fds = &tmp_wfd; fd_set * exception_fds = &tmp_efd; FD_ZERO (read_fds); FD_ZERO (write_fds); FD_ZERO (exception_fds); switch (type) { case PChannel::PXReadBlock: case PChannel::PXAcceptBlock: FD_SET (handle, read_fds); break; case PChannel::PXWriteBlock: FD_SET (handle, write_fds); break; case PChannel::PXConnectBlock: FD_SET (handle, write_fds); FD_SET (handle, exception_fds); break; default: PAssertAlways(PLogicError); return 0; } struct timeval * tptr = NULL; struct timeval timeout_val; if (timeout != PMaxTimeInterval) { static const PTimeInterval oneDay(0, 0, 0, 0, 1); if (timeout < oneDay) { timeout_val.tv_usec = (timeout.GetMilliSeconds() % 1000) * 1000; timeout_val.tv_sec = timeout.GetSeconds(); tptr = &timeout_val; } } // include the termination pipe into all blocking I/O functions int width = handle+1; FD_SET(termPipe[0], read_fds); width = PMAX(width, termPipe[0]+1); int retval = ::select(width, read_fds, write_fds, exception_fds, tptr); PProcess::Current().PXCheckSignals(); if ((retval == 1) && FD_ISSET(termPipe[0], read_fds)) { BYTE ch; ::read(termPipe[0], &ch, 1); errno = EINTR; retval = -1; } return retval;}static void sigSuspendHandler(int){ // wait for a resume signal sigset_t waitSignals; sigemptyset(&waitSignals); sigaddset(&waitSignals, RESUME_SIG); sigaddset(&waitSignals, SIGINT); 
sigaddset(&waitSignals, SIGQUIT); sigaddset(&waitSignals, SIGTERM); for (;;) { int sig;#if defined(P_LINUX) || defined(P_FREEBSD) || defined(P_OPENBSD) || defined(P_NETBSD) || defined(P_MACOSX) || defined (P_AIX) sigwait(&waitSignals, &sig);#else sig = sigwait(&waitSignals);#endif PProcess::Current().PXCheckSignals(); if (sig == RESUME_SIG) return; }}void HouseKeepingThread::Main(){ PProcess & process = PProcess::Current(); while (!closing) { PTimeInterval waitTime = process.timers.Process(); if (waitTime == PMaxTimeInterval) process.timerChangeSemaphore.Wait(); else process.timerChangeSemaphore.Wait(waitTime); }}void PProcess::Construct(){ // make sure we don't get upset by resume signals sigset_t blockedSignals; sigemptyset(&blockedSignals); sigaddset(&blockedSignals, RESUME_SIG); PAssertOS(pthread_sigmask(SIG_BLOCK, &blockedSignals, NULL) == 0); // set the file descriptor limit to something sensible struct rlimit rl; PAssertOS(getrlimit(RLIMIT_NOFILE, &rl) == 0); rl.rlim_cur = rl.rlim_max; PAssertOS(setrlimit(RLIMIT_NOFILE, &rl) == 0); // initialise the housekeeping thread housekeepingThread = NULL; CommonConstruct();}PProcess::~PProcess(){ if (housekeepingThread != NULL) { ((HouseKeepingThread *)housekeepingThread)->SetClosing(); SignalTimerChange(); housekeepingThread->WaitForTermination(); delete housekeepingThread; } CommonDestruct();}PThread::PThread(){ // see InitialiseProcessThread()}void PThread::InitialiseProcessThread(){ PX_origStackSize = 0; autoDelete = FALSE; PX_threadId = pthread_self(); PX_suspendCount = 0; ::pipe(termPipe);#ifndef P_HAS_SEMAPHORES PX_waitingSemaphore = NULL; PAssertOS(pthread_mutex_init(&PX_WaitSemMutex, NULL) == 0);#endif PAssertOS(pthread_mutex_init(&PX_suspendMutex, NULL) == 0); ((PProcess *)this)->activeThreads.DisallowDeleteObjects(); ((PProcess *)this)->activeThreads.SetAt((unsigned)PX_threadId, this);}PThread::PThread(PINDEX stackSize, AutoDeleteFlag deletion, Priority /*priorityLevel*/, const PString & name) : 
threadName(name){ PAssert(stackSize > 0, PInvalidParameter); PX_origStackSize = stackSize; autoDelete = (deletion == AutoDeleteThread);#ifndef P_HAS_SEMAPHORES PX_waitingSemaphore = NULL; pthread_mutex_init(&PX_WaitSemMutex, NULL);#endif PAssertOS(pthread_mutex_init(&PX_suspendMutex, NULL) == 0); ::pipe(termPipe); // throw the new thread PX_NewThread(TRUE);}PThread::~PThread(){ if (!IsTerminated()) Terminate(); ::close(termPipe[0]); ::close(termPipe[1]);#ifndef P_HAS_SEMAPHORES //PAssertOS(pthread_mutex_destroy(&PX_WaitSemMutex) == 0); pthread_mutex_destroy(&PX_WaitSemMutex);#endif //PAssertOS(pthread_mutex_destroy(&PX_suspendMutex) == 0); pthread_mutex_destroy(&PX_suspendMutex);}void PThread::PX_NewThread(BOOL startSuspended){ // initialise suspend counter and create mutex PX_suspendCount = startSuspended ? 1 : 0; // throw the thread// pthread_attr_t threadAttr;// pthread_attr_init(&threadAttr); PAssertOS(pthread_create(&PX_threadId, NULL, PX_ThreadStart, this) == 0);}void * PThread::PX_ThreadStart(void * arg){ pthread_t threadId = pthread_self(); // self-detach pthread_detach(threadId); PThread * thread = (PThread *)arg; PProcess & process = PProcess::Current(); // block RESUME_SIG sigset_t blockedSignals; sigemptyset(&blockedSignals); sigaddset(&blockedSignals, RESUME_SIG); //sigaddset(&blockedSignals, P_IO_BREAK_SIGNAL); PAssertOS(pthread_sigmask(SIG_BLOCK, &blockedSignals, NULL) == 0); // add thread to thread list process.threadMutex.Wait(); process.activeThreads.SetAt((unsigned)threadId, thread); process.threadMutex.Signal(); // make sure the cleanup routine is called when the thread exits pthread_cleanup_push(PThread::PX_ThreadEnd, arg); // if we are not supposed to start suspended, then don't wait // if we are supposed to start suspended, then wait for a resume //PAssertOS(pthread_mutex_lock(&thread->PX_suspendMutex) == 0); //if (thread->PX_suspendCount == 0) // PAssertOS(pthread_mutex_unlock(&thread->PX_suspendMutex) == 0); //else { // 
PAssertOS(pthread_mutex_unlock(&thread->PX_suspendMutex) == 0); if (thread->PX_suspendCount != 0) { sigset_t waitSignals; sigemptyset(&waitSignals); sigaddset(&waitSignals, RESUME_SIG);#if defined(P_LINUX) || defined(P_FREEBSD) || defined(P_OPENBSD) || defined(P_NETBSD) || defined (P_AIX) int sig; sigwait(&waitSignals, &sig);#else sigwait(&waitSignals);#endif } // set the signal handler for SUSPEND_SIG struct sigaction action; memset(&action, 0, sizeof(action)); action.sa_handler = sigSuspendHandler; sigaction(SUSPEND_SIG, &action, 0); // now call the the thread main routine thread->Main(); // execute the cleanup routine pthread_cleanup_pop(1); return NULL;}void PProcess::SignalTimerChange(){ if (housekeepingThread == NULL) housekeepingThread = PNEW HouseKeepingThread; else timerChangeSemaphore.Signal();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -