📄 threadin.py
字号:
# threading.py:
# Proposed new threading module, emulating a subset of Java's threading model

import sys
import time
import thread
import traceback
import StringIO

# Rename some stuff so "from threading import *" is safe

_sys = sys
del sys

_time = time.time
_sleep = time.sleep
del time

_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
del thread

_print_exc = traceback.print_exc
del traceback

_StringIO = StringIO.StringIO
del StringIO


# Debug support (adapted from ihooks.py)

_VERBOSE = 0

if __debug__:

    class _Verbose:
        """Mix-in giving subclasses an optional debug trace via _note()."""

        def __init__(self, verbose=None):
            # verbose=None means "use the module-wide _VERBOSE default".
            if verbose is None:
                verbose = _VERBOSE
            self.__verbose = verbose

        def _note(self, format, *args):
            """Write `format % args`, prefixed by the current thread's
            name, to stderr -- but only when verbose mode is on."""
            if self.__verbose:
                format = format % args
                format = "%s: %s\n" % (
                    currentThread().getName(), format)
                _sys.stderr.write(format)

else:
    # Disable this when using "python -O"
    class _Verbose:
        """No-op stand-in with the same interface as the debug version."""

        def __init__(self, verbose=None):
            pass

        def _note(self, *args):
            pass


# Synchronization classes

Lock = _allocate_lock

def RLock(*args, **kwargs):
    """Factory returning a new reentrant lock (_RLock instance)."""
    return apply(_RLock, args, kwargs)

class _RLock(_Verbose):
    """Reentrant lock: the owning thread may acquire() it repeatedly and
    must release() it the same number of times."""

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()   # the underlying primitive lock
        self.__owner = None               # thread object holding the lock
        self.__count = 0                  # recursion depth of the owner

    def __repr__(self):
        return "<%s(%s, %d)>" % (
                self.__class__.__name__,
                self.__owner and self.__owner.getName(),
                self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock.

        If the calling thread already owns the lock, the recursion count
        is bumped and 1 is returned.  Otherwise the result of acquiring
        the underlying lock with the given `blocking` flag is returned.
        """
        me = currentThread()
        if self.__owner is me:
            self.__count = self.__count + 1
            if __debug__:
                self._note("%s.acquire(%s): recursive success", self, blocking)
            return 1
        rc = self.__block.acquire(blocking)
        if rc:
            self.__owner = me
            self.__count = 1
            if __debug__:
                self._note("%s.acquire(%s): initial success", self, blocking)
        else:
            if __debug__:
                self._note("%s.acquire(%s): failure", self, blocking)
        return rc

    def release(self):
        """Undo one acquire(); the underlying lock is released only when
        the recursion count drops to zero.  Asserts that the caller is
        the owner."""
        me = currentThread()
        assert self.__owner is me, "release() of un-acquire()d lock"
        self.__count = count = self.__count - 1
        if not count:
            self.__owner = None
            self.__block.release()
            if __debug__:
                self._note("%s.release(): final release", self)
        else:
            if __debug__:
                self._note("%s.release(): non-final release", self)

    # Internal methods used by condition variables

    def _acquire_restore(self, state):
        """Re-acquire the lock and reinstate the (count, owner) pair
        previously returned by _release_save()."""
        # Unpacked explicitly instead of using a tuple parameter, so the
        # same positional call works on every Python version.
        count, owner = state
        self.__block.acquire()
        self.__count = count
        self.__owner = owner
        if __debug__:
            self._note("%s._acquire_restore()", self)

    def _release_save(self):
        """Fully release the lock regardless of recursion depth and
        return the (count, owner) pair needed to restore it later."""
        if __debug__:
            self._note("%s._release_save()", self)
        count = self.__count
        self.__count = 0
        owner = self.__owner
        self.__owner = None
        self.__block.release()
        return (count, owner)

    def _is_owned(self):
        """Return true if the calling thread is the current owner."""
        return self.__owner is currentThread()


def Condition(*args, **kwargs):
    """Factory returning a new condition variable (_Condition instance)."""
    return apply(_Condition, args, kwargs)

class _Condition(_Verbose):
    """Condition variable built on top of a lock (an RLock by default)."""

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        # One primitive lock per waiting thread; notify() releases them.
        self.__waiters = []

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state

    def _is_owned(self):
        """Fallback ownership test for locks without _is_owned().

        Returns 1 if a non-blocking acquire fails (i.e. the lock is held
        by someone), else 0.  It cannot tell which thread holds the lock.
        """
        if self.__lock.acquire(0):
            self.__lock.release()
            return 0
        else:
            return 1

    def wait(self, timeout=None):
        """Release the lock, block until notified or until `timeout`
        seconds have passed, then re-acquire the lock.

        The lock must be held by the caller (checked with an assert).
        The lock is re-acquired even when the wait is interrupted by an
        exception, so the caller's matching release() stays valid.
        """
        # NOTE(review): `me` is unused here; the call is kept because
        # currentThread() is defined outside this part of the file and
        # may have side effects -- confirm before removing.
        me = currentThread()
        assert self._is_owned(), "wait() of un-acquire()d lock"
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        gotit = 0
        try:    # restore state no matter what (e.g. KeyboardInterrupt)
            if timeout is None:
                # Blocks until notify() releases our private lock.
                waiter.acquire()
                gotit = 1
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # Poll with exponential back-off until notified or the
                # deadline passes.
                endtime = _time() + timeout
                delay = 0.000001 # 1 usec
                while 1:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    # Never sleep past the deadline.
                    _sleep(min(delay, remaining))
                    if delay < 1.0:
                        delay = delay * 2.0
                if __debug__:
                    if gotit:
                        self._note("%s.wait(%s): got it", self, timeout)
                    else:
                        self._note("%s.wait(%s): timed out", self, timeout)
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                # Timed out or interrupted: withdraw our waiter so that a
                # later notify() is not spent on a thread that has left.
                try:
                    self.__waiters.remove(waiter)
                except ValueError:
                    pass

    def notify(self, n=1):
        """Wake up at most `n` threads waiting on this condition.

        The lock must be held by the caller (checked with an assert).
        """
        # NOTE(review): `me` is unused; kept for the same reason as in
        # wait() -- confirm before removing.
        me = currentThread()
        assert self._is_owned(), "notify() of un-acquire()d lock"
        __waiters = self.__waiters
        waiters = __waiters[:n]
        if not waiters:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        if __debug__:
            # Report how many waiters are really woken, which may be
            # fewer than the n requested.
            count = len(waiters)
            self._note("%s.notify(): notifying %d waiter%s", self, count,
                       count != 1 and "s" or "")
        for waiter in waiters:
            waiter.release()
            try:
                __waiters.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        """Wake up every thread currently waiting on this condition."""
        self.notify(len(self.__waiters))


def Semaphore(*args, **kwargs):
    """Factory returning a new semaphore (_Semaphore instance)."""
    return apply(_Semaphore, args, kwargs)

class _Semaphore(_Verbose):

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1, verbose=None):
        assert value >= 0, "Semaphore initial value must be >= 0"
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter and return 1.

        While the counter is zero: block until release() is called, or,
        if `blocking` is false, return 0 at once without decrementing.
        """
        rc = 0
        self.__cond.acquire()
        try:
            while self.__value == 0:
                if not blocking:
                    break
                self.__cond.wait()
            else:
                # Loop ended without `break`: the counter is non-zero.
                self.__value = self.__value - 1
                rc = 1
        finally:
            # Always drop the internal lock, even if wait() raised.
            self.__cond.release()
        return rc

    def release(self):
        """Increment the counter and wake up one waiting thread."""
        self.__cond.acquire()
        try:
            self.__value = self.__value + 1
            self.__cond.notify()
        finally:
            self.__cond.release()


def Event(*args, **kwargs):
    """Factory returning a new event object (_Event instance)."""
    return apply(_Event, args, kwargs)

class _Event(_Verbose):

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = 0

    def isSet(self):
        """Return the current value of the flag."""
        return self.__flag

    def set(self):
        """Set the flag and wake up all threads blocked in wait()."""
        self.__cond.acquire()
        try:
            self.__flag = 1
            self.__cond.notifyAll()
        finally:
            self.__cond.release()

    def clear(self):
        """Reset the flag."""
        self.__cond.acquire()
        try:
            self.__flag = 0
        finally:
            self.__cond.release()

    def wait(self, timeout=None):
        """Return at once if the flag is set; otherwise block until it is
        set or until `timeout` seconds have passed."""
        self.__cond.acquire()
        try:
            if not self.__flag:
                self.__cond.wait(timeout)
        finally:
            # Always drop the internal lock, even if wait() raised.
            self.__cond.release()


# Helper to generate new thread names
_counter = 0
def _newname(template="Thread-%d"):
    """Return `template` filled in with the next value of a global
    counter (first result with the default template: "Thread-1")."""
    global _counter
    _counter = _counter + 1
    return template % _counter


# Active thread administration
_active_limbo_lock = _allocate_lock()
_active = {}
_limbo = {}


# Main class for threads

# NOTE(review): only the first statement of this class is present in this
# copy of the file; the remainder of the definition is missing.
class Thread(_Verbose):

    __initialized = 0
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -