test_fork1.py

From "mallet is an open-source project in the field of natural language processing and machine learning." · Python code · 76 lines total

"""This test checks for correct fork() behavior.

We want fork1() semantics -- only the forking thread survives in the
child after a fork().

On some systems (e.g. Solaris without posix threads) we find that all
active threads survive in the child after a fork(); this is an error.

While BeOS doesn't officially support fork and native threading in
the same application, the present example should work just fine.  DC
"""

import os, sys, time, thread
from test_support import verify, verbose, TestSkipped

try:
    os.fork
except AttributeError:
    raise TestSkipped, "os.fork not defined -- skipping test_fork1"

LONGSLEEP = 2

SHORTSLEEP = 0.5

NUM_THREADS = 4

alive = {}

stop = 0

def f(id):
    while not stop:
        alive[id] = os.getpid()
        try:
            time.sleep(SHORTSLEEP)
        except IOError:
            pass

def main():
    for i in range(NUM_THREADS):
        thread.start_new(f, (i,))

    time.sleep(LONGSLEEP)

    a = alive.keys()
    a.sort()
    verify(a == range(NUM_THREADS))

    prefork_lives = alive.copy()

    if sys.platform in ['unixware7']:
        cpid = os.fork1()
    else:
        cpid = os.fork()

    if cpid == 0:
        # Child
        time.sleep(LONGSLEEP)
        n = 0
        for key in alive.keys():
            if alive[key] != prefork_lives[key]:
                n = n+1
        os._exit(n)
    else:
        # Parent
        spid, status = os.waitpid(cpid, 0)
        verify(spid == cpid)
        verify(status == 0,
                "cause = %d, exit = %d" % (status&0xff, status>>8) )
        global stop
        # Tell threads to die
        stop = 1
        time.sleep(2*SHORTSLEEP) # Wait for threads to die

main()
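The listing above is Python 2 code (`thread`, `test_support`, `raise X, "msg"`). As a rough illustration of the same check on Python 3, here is a minimal sketch. It is not part of the original file: the helper name `count_threads_alive_in_child`, its parameters, and the shortened sleep intervals are assumptions made for this example. The idea is unchanged: each worker keeps writing its process id into a shared dict, and the child counts how many entries changed after the fork. Under fork1() semantics none should.

```python
import os
import threading
import time
import warnings


def count_threads_alive_in_child(num_threads=4, short_sleep=0.05, long_sleep=0.3):
    """Return how many worker threads kept running in a forked child.

    Hypothetical helper for illustration; requires a platform with os.fork.
    """
    alive = {}
    stop = threading.Event()

    def worker(ident):
        # Each worker repeatedly records the pid of the process it runs in.
        while not stop.is_set():
            alive[ident] = os.getpid()
            time.sleep(short_sleep)

    threads = [
        threading.Thread(target=worker, args=(i,), daemon=True)
        for i in range(num_threads)
    ]
    for t in threads:
        t.start()

    time.sleep(long_sleep)          # give every worker time to register
    prefork_lives = dict(alive)     # snapshot taken in the parent

    with warnings.catch_warnings():
        # Python 3.12+ warns when fork() is called with threads running.
        warnings.simplefilter("ignore", DeprecationWarning)
        pid = os.fork()

    if pid == 0:
        # Child: a surviving worker would overwrite its entry with our pid.
        time.sleep(long_sleep)
        changed = sum(
            1 for key in prefork_lives if alive.get(key) != prefork_lives[key]
        )
        os._exit(changed)

    # Parent: collect the child's exit status, then stop the workers.
    _, status = os.waitpid(pid, 0)
    stop.set()
    for t in threads:
        t.join()
    return os.WEXITSTATUS(status) if os.WIFEXITED(status) else -1


if __name__ == "__main__":
    print("threads surviving in child:", count_threads_alive_in_child())
```

`os.WIFEXITED` and `os.WEXITSTATUS` stand in for the manual `status&0xff` and `status>>8` decoding in the original. The sketch returns -1 if the child was terminated by a signal rather than exiting normally.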
