test_threading.py
来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 56 行
PY
56 行
# Very rudimentary test of threading module

# Create a bunch of threads, let each do some work, wait until all are done

try:
    from test_support import verbose
except ImportError:
    # Newer CPython ships the regression-test helpers inside the ``test``
    # package; fall back to it so the script still imports there.
    from test.support import verbose
import random
import threading
import time

# This takes about n/3 seconds to run (about n/3 clumps of tasks, times
# about 1 second per clump).

numtasks = 10

# no more than 3 of the 10 can run at once
sema = threading.BoundedSemaphore(value=3)
# Guards every read-modify-write of ``running`` and the prints that report it.
mutex = threading.RLock()
# Number of tasks currently holding a semaphore slot.
running = 0


class TestThread(threading.Thread):
    """Worker that holds one ``sema`` slot while sleeping for a random delay."""

    def run(self):
        """Take a slot, sleep up to two seconds, then give the slot back.

        Each message is formatted into a single string before printing so
        the call is valid both as a Python 2 print statement and as a
        Python 3 print() call; the emitted text is unchanged.
        """
        global running
        delay = random.random() * 2
        if verbose:
            print('task %s will run for %s sec' % (self.getName(), delay))
        sema.acquire()
        try:
            mutex.acquire()
            try:
                running = running + 1
                if verbose:
                    print('%s tasks are running' % running)
            finally:
                mutex.release()
            try:
                time.sleep(delay)
                if verbose:
                    print('task %s done' % self.getName())
            finally:
                # Always undo the increment above, even if the work failed,
                # so the counter stays consistent for the other tasks.
                mutex.acquire()
                try:
                    running = running - 1
                    if verbose:
                        print('%s is finished. %s tasks are running'
                              % (self.getName(), running))
                finally:
                    mutex.release()
        finally:
            # Release the slot on every exit path; otherwise a failing task
            # would permanently reduce the number of tasks that can run.
            sema.release()


threads = []


def starttasks():
    """Create ``numtasks`` TestThread workers, record them in ``threads``
    and start each one."""
    for i in range(numtasks):
        t = TestThread(name="<thread %d>" % i)
        threads.append(t)
        t.start()


starttasks()

if verbose:
    print('waiting for all tasks to complete')
for t in threads:
    t.join()
if verbose:
    print('all tasks done')
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?