⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadin.py

📁 minimal python variant for small footprint apps like embedded apps
💻 PY
📖 第 1 页 / 共 2 页
字号:
    def __init__(self, group=None, target=None, name=None,                 args=(), kwargs={}, verbose=None):        assert group is None, "group argument must be None for now"        _Verbose.__init__(self, verbose)        self.__target = target        self.__name = str(name or _newname())        self.__args = args        self.__kwargs = kwargs        self.__daemonic = self._set_daemon()        self.__started = 0        self.__stopped = 0        self.__block = Condition(Lock())        self.__initialized = 1    def _set_daemon(self):        # Overridden in _MainThread and _DummyThread        return currentThread().isDaemon()    def __repr__(self):        assert self.__initialized, "Thread.__init__() was not called"        status = "initial"        if self.__started:            status = "started"        if self.__stopped:            status = "stopped"        if self.__daemonic:            status = status + " daemon"        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)    def start(self):        assert self.__initialized, "Thread.__init__() not called"        assert not self.__started, "thread already started"        if __debug__:            self._note("%s.start(): starting thread", self)        _active_limbo_lock.acquire()        _limbo[self] = self        _active_limbo_lock.release()        _start_new_thread(self.__bootstrap, ())        self.__started = 1        _sleep(0.000001)    # 1 usec, to let the thread run (Solaris hack)    def run(self):        if self.__target:            apply(self.__target, self.__args, self.__kwargs)    def __bootstrap(self):        try:            self.__started = 1            _active_limbo_lock.acquire()            _active[_get_ident()] = self            del _limbo[self]            _active_limbo_lock.release()            if __debug__:                self._note("%s.__bootstrap(): thread started", self)            try:                self.run()            except SystemExit:                if __debug__:                    self._note("%s.__bootstrap(): raised SystemExit", self)            except:                if __debug__:                    self._note("%s.__bootstrap(): unhandled exception", self)                s = _StringIO()                _print_exc(file=s)                _sys.stderr.write("Exception in thread %s:\n%s\n" %                                 (self.getName(), s.getvalue()))            else:                if __debug__:                    self._note("%s.__bootstrap(): normal return", self)        finally:            self.__stop()            self.__delete()    def __stop(self):        self.__block.acquire()        self.__stopped = 1        self.__block.notifyAll()        self.__block.release()    def __delete(self):        _active_limbo_lock.acquire()        del _active[_get_ident()]        _active_limbo_lock.release()    def join(self, timeout=None):        assert self.__initialized, "Thread.__init__() not called"        assert self.__started, "cannot join thread before it is started"        assert self is not currentThread(), "cannot join current thread"        if __debug__:            if not self.__stopped:                self._note("%s.join(): waiting until thread stops", self)        self.__block.acquire()        if timeout is None:            while not self.__stopped:                self.__block.wait()            if __debug__:                self._note("%s.join(): thread stopped", self)        else:            deadline = _time() + timeout            while not self.__stopped:                delay = deadline - _time()                if delay <= 0:                    if __debug__:                        self._note("%s.join(): timed out", self)                    break                self.__block.wait(delay)            else:                if __debug__:                    self._note("%s.join(): thread stopped", self)        self.__block.release()    def getName(self):        assert self.__initialized, "Thread.__init__() not called"        return self.__name    def setName(self, name):        assert self.__initialized, "Thread.__init__() not called"        self.__name = str(name)    def isAlive(self):        assert self.__initialized, "Thread.__init__() not called"        return self.__started and not self.__stopped        def isDaemon(self):        assert self.__initialized, "Thread.__init__() not called"        return self.__daemonic    def setDaemon(self, daemonic):        assert self.__initialized, "Thread.__init__() not called"        assert not self.__started, "cannot set daemon status of active thread"        self.__daemonic = daemonic# Special thread class to represent the main thread# This is garbage collected through an exit handlerclass _MainThread(Thread):    def __init__(self):        Thread.__init__(self, name="MainThread")        self._Thread__started = 1        _active_limbo_lock.acquire()        _active[_get_ident()] = self        _active_limbo_lock.release()        try:            self.__oldexitfunc = _sys.exitfunc        except AttributeError:            self.__oldexitfunc = None        _sys.exitfunc = self.__exitfunc    def _set_daemon(self):        return 0    def __exitfunc(self):        self._Thread__stop()        t = _pickSomeNonDaemonThread()        if t:            if __debug__:                self._note("%s: waiting for other threads", self)        while t:            t.join()            t = _pickSomeNonDaemonThread()        if self.__oldexitfunc:            if __debug__:                self._note("%s: calling exit handler", self)            self.__oldexitfunc()        if __debug__:            self._note("%s: exiting", self)        self._Thread__delete()def _pickSomeNonDaemonThread():    for t in enumerate():        if not t.isDaemon() and t.isAlive():            return t    return None# Dummy thread class to represent threads not started here.# These aren't garbage collected when they die,# nor can they be waited for.# Their purpose is to return *something* from currentThread().# They are marked as daemon threads so we won't wait for them# when we exit (conform previous semantics).class _DummyThread(Thread):        def __init__(self):        Thread.__init__(self, name=_newname("Dummy-%d"))        self.__Thread_started = 1        _active_limbo_lock.acquire()        _active[_get_ident()] = self        _active_limbo_lock.release()    def _set_daemon(self):        return 1    def join(self):        assert 0, "cannot join a dummy thread"# Global API functionsdef currentThread():    try:        return _active[_get_ident()]    except KeyError:        print "currentThread(): no current thread for", _get_ident()        return _DummyThread()def activeCount():    _active_limbo_lock.acquire()    count = len(_active) + len(_limbo)    _active_limbo_lock.release()    return countdef enumerate():    _active_limbo_lock.acquire()    active = _active.values() + _limbo.values()    _active_limbo_lock.release()    return active# Create the main thread object_MainThread()# Self-test codedef _test():    import random    class BoundedQueue(_Verbose):        def __init__(self, limit):            _Verbose.__init__(self)            self.mon = RLock()            self.rc = Condition(self.mon)            self.wc = Condition(self.mon)            self.limit = limit            self.queue = []        def put(self, item):            self.mon.acquire()            while len(self.queue) >= self.limit:                self._note("put(%s): queue full", item)                self.wc.wait()            self.queue.append(item)            self._note("put(%s): appended, length now %d",                       item, len(self.queue))            self.rc.notify()            self.mon.release()        def get(self):            self.mon.acquire()            while not self.queue:                self._note("get(): queue empty")                self.rc.wait()            item = self.queue[0]            del self.queue[0]            self._note("get(): got %s, %d left", item, len(self.queue))            self.wc.notify()            self.mon.release()            return item    class ProducerThread(Thread):        def __init__(self, queue, quota):            Thread.__init__(self, name="Producer")            self.queue = queue            self.quota = quota        def run(self):            from random import random            counter = 0            while counter < self.quota:                counter = counter + 1                self.queue.put("%s.%d" % (self.getName(), counter))                _sleep(random() * 0.00001)    class ConsumerThread(Thread):        def __init__(self, queue, count):            Thread.__init__(self, name="Consumer")            self.queue = queue            self.count = count        def run(self):            while self.count > 0:                item = self.queue.get()                print item                self.count = self.count - 1    import time    NP = 3    QL = 4    NI = 5    Q = BoundedQueue(QL)    P = []    for i in range(NP):        t = ProducerThread(Q, NI)        t.setName("Producer-%d" % (i+1))        P.append(t)    C = ConsumerThread(Q, NI*NP)    for t in P:        t.start()        _sleep(0.000001)    C.start()    for t in P:        t.join()    C.join()if __name__ == '__main__':    _test()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -