test_fork1.py
From "mallet is an open-source project in the fields of natural language processing and machine learning." · Python code · 76 lines
"""This test checks for correct fork() behavior.We want fork1() semantics -- only the forking thread survives in thechild after a fork().On some systems (e.g. Solaris without posix threads) we find that allactive threads survive in the child after a fork(); this is an error.While BeOS doesn't officially support fork and native threading inthe same application, the present example should work just fine. DC"""import os, sys, time, threadfrom test_support import verify, verbose, TestSkippedtry: os.forkexcept AttributeError: raise TestSkipped, "os.fork not defined -- skipping test_fork1"LONGSLEEP = 2SHORTSLEEP = 0.5NUM_THREADS = 4alive = {}stop = 0def f(id): while not stop: alive[id] = os.getpid() try: time.sleep(SHORTSLEEP) except IOError: passdef main(): for i in range(NUM_THREADS): thread.start_new(f, (i,)) time.sleep(LONGSLEEP) a = alive.keys() a.sort() verify(a == range(NUM_THREADS)) prefork_lives = alive.copy() if sys.platform in ['unixware7']: cpid = os.fork1() else: cpid = os.fork() if cpid == 0: # Child time.sleep(LONGSLEEP) n = 0 for key in alive.keys(): if alive[key] != prefork_lives[key]: n = n+1 os._exit(n) else: # Parent spid, status = os.waitpid(cpid, 0) verify(spid == cpid) verify(status == 0, "cause = %d, exit = %d" % (status&0xff, status>>8) ) global stop # Tell threads to die stop = 1 time.sleep(2*SHORTSLEEP) # Wait for threads to diemain()