📄 threadin.py
字号:
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, verbose=None): assert group is None, "group argument must be None for now" _Verbose.__init__(self, verbose) self.__target = target self.__name = str(name or _newname()) self.__args = args self.__kwargs = kwargs self.__daemonic = self._set_daemon() self.__started = 0 self.__stopped = 0 self.__block = Condition(Lock()) self.__initialized = 1 def _set_daemon(self): # Overridden in _MainThread and _DummyThread return currentThread().isDaemon() def __repr__(self): assert self.__initialized, "Thread.__init__() was not called" status = "initial" if self.__started: status = "started" if self.__stopped: status = "stopped" if self.__daemonic: status = status + " daemon" return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) def start(self): assert self.__initialized, "Thread.__init__() not called" assert not self.__started, "thread already started" if __debug__: self._note("%s.start(): starting thread", self) _active_limbo_lock.acquire() _limbo[self] = self _active_limbo_lock.release() _start_new_thread(self.__bootstrap, ()) self.__started = 1 _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack) def run(self): if self.__target: apply(self.__target, self.__args, self.__kwargs) def __bootstrap(self): try: self.__started = 1 _active_limbo_lock.acquire() _active[_get_ident()] = self del _limbo[self] _active_limbo_lock.release() if __debug__: self._note("%s.__bootstrap(): thread started", self) try: self.run() except SystemExit: if __debug__: self._note("%s.__bootstrap(): raised SystemExit", self) except: if __debug__: self._note("%s.__bootstrap(): unhandled exception", self) s = _StringIO() _print_exc(file=s) _sys.stderr.write("Exception in thread %s:\n%s\n" % (self.getName(), s.getvalue())) else: if __debug__: self._note("%s.__bootstrap(): normal return", self) finally: self.__stop() self.__delete() def __stop(self): self.__block.acquire() self.__stopped = 1 self.__block.notifyAll() self.__block.release() def __delete(self): _active_limbo_lock.acquire() del _active[_get_ident()] _active_limbo_lock.release() def join(self, timeout=None): assert self.__initialized, "Thread.__init__() not called" assert self.__started, "cannot join thread before it is started" assert self is not currentThread(), "cannot join current thread" if __debug__: if not self.__stopped: self._note("%s.join(): waiting until thread stops", self) self.__block.acquire() if timeout is None: while not self.__stopped: self.__block.wait() if __debug__: self._note("%s.join(): thread stopped", self) else: deadline = _time() + timeout while not self.__stopped: delay = deadline - _time() if delay <= 0: if __debug__: self._note("%s.join(): timed out", self) break self.__block.wait(delay) else: if __debug__: self._note("%s.join(): thread stopped", self) self.__block.release() def getName(self): assert self.__initialized, "Thread.__init__() not called" return self.__name def setName(self, name): assert self.__initialized, "Thread.__init__() not called" self.__name = str(name) def isAlive(self): assert self.__initialized, "Thread.__init__() not called" return self.__started and not self.__stopped def isDaemon(self): assert self.__initialized, "Thread.__init__() not called" return self.__daemonic def setDaemon(self, daemonic): assert self.__initialized, "Thread.__init__() not called" assert not self.__started, "cannot set daemon status of active thread" self.__daemonic = daemonic# Special thread class to represent the main thread# This is garbage collected through an exit handlerclass _MainThread(Thread): def __init__(self): Thread.__init__(self, name="MainThread") self._Thread__started = 1 _active_limbo_lock.acquire() _active[_get_ident()] = self _active_limbo_lock.release() try: self.__oldexitfunc = _sys.exitfunc except AttributeError: self.__oldexitfunc = None _sys.exitfunc = self.__exitfunc def _set_daemon(self): return 0 def __exitfunc(self): self._Thread__stop() t = _pickSomeNonDaemonThread() if t: if __debug__: self._note("%s: waiting for other threads", self) while t: t.join() t = _pickSomeNonDaemonThread() if self.__oldexitfunc: if __debug__: self._note("%s: calling exit handler", self) self.__oldexitfunc() if __debug__: self._note("%s: exiting", self) self._Thread__delete()def _pickSomeNonDaemonThread(): for t in enumerate(): if not t.isDaemon() and t.isAlive(): return t return None# Dummy thread class to represent threads not started here.# These aren't garbage collected when they die,# nor can they be waited for.# Their purpose is to return *something* from currentThread().# They are marked as daemon threads so we won't wait for them# when we exit (conform previous semantics).class _DummyThread(Thread): def __init__(self): Thread.__init__(self, name=_newname("Dummy-%d")) self.__Thread_started = 1 _active_limbo_lock.acquire() _active[_get_ident()] = self _active_limbo_lock.release() def _set_daemon(self): return 1 def join(self): assert 0, "cannot join a dummy thread"# Global API functionsdef currentThread(): try: return _active[_get_ident()] except KeyError: print "currentThread(): no current thread for", _get_ident() return _DummyThread()def activeCount(): _active_limbo_lock.acquire() count = len(_active) + len(_limbo) _active_limbo_lock.release() return countdef enumerate(): _active_limbo_lock.acquire() active = _active.values() + _limbo.values() _active_limbo_lock.release() return active# Create the main thread object_MainThread()# Self-test codedef _test(): import random class BoundedQueue(_Verbose): def __init__(self, limit): _Verbose.__init__(self) self.mon = RLock() self.rc = Condition(self.mon) self.wc = Condition(self.mon) self.limit = limit self.queue = [] def put(self, item): self.mon.acquire() while len(self.queue) >= self.limit: self._note("put(%s): queue full", item) self.wc.wait() self.queue.append(item) self._note("put(%s): appended, length now %d", item, len(self.queue)) self.rc.notify() self.mon.release() def get(self): self.mon.acquire() while not self.queue: self._note("get(): queue empty") self.rc.wait() item = self.queue[0] del self.queue[0] self._note("get(): got %s, %d left", item, len(self.queue)) self.wc.notify() self.mon.release() return item class ProducerThread(Thread): def __init__(self, queue, quota): Thread.__init__(self, name="Producer") self.queue = queue self.quota = quota def run(self): from random import random counter = 0 while counter < self.quota: counter = counter + 1 self.queue.put("%s.%d" % (self.getName(), counter)) _sleep(random() * 0.00001) class ConsumerThread(Thread): def __init__(self, queue, count): Thread.__init__(self, name="Consumer") self.queue = queue self.count = count def run(self): while self.count > 0: item = self.queue.get() print item self.count = self.count - 1 import time NP = 3 QL = 4 NI = 5 Q = BoundedQueue(QL) P = [] for i in range(NP): t = ProducerThread(Q, NI) t.setName("Producer-%d" % (i+1)) P.append(t) C = ConsumerThread(Q, NI*NP) for t in P: t.start() _sleep(0.000001) C.start() for t in P: t.join() C.join()if __name__ == '__main__': _test()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -