test_profilehooks.py

来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 361 行

PY
361
字号
from __future__ import generatorsfrom test_support import TestFailedimport pprintimport sysimport unittestimport test_supportclass HookWatcher:    def __init__(self):        self.frames = []        self.events = []    def callback(self, frame, event, arg):        self.add_event(event, frame)    def add_event(self, event, frame=None):        """Add an event to the log."""        if frame is None:            frame = sys._getframe(1)        try:            frameno = self.frames.index(frame)        except ValueError:            frameno = len(self.frames)            self.frames.append(frame)        self.events.append((frameno, event, ident(frame)))    def get_events(self):        """Remove calls to add_event()."""        disallowed = [ident(self.add_event.im_func), ident(ident)]        self.frames = None        return [item for item in self.events if item[2] not in disallowed]class ProfileSimulator(HookWatcher):    def __init__(self, testcase):        self.testcase = testcase        self.stack = []        HookWatcher.__init__(self)    def callback(self, frame, event, arg):        # Callback registered with sys.setprofile()/sys.settrace()        self.dispatch[event](self, frame)    def trace_call(self, frame):        self.add_event('call', frame)        self.stack.append(frame)    def trace_return(self, frame):        self.add_event('return', frame)        self.stack.pop()    def trace_exception(self, frame):        self.testcase.fail(            "the profiler should never receive exception events")    dispatch = {        'call': trace_call,        'exception': trace_exception,        'return': trace_return,        }class TestCaseBase(unittest.TestCase):    def check_events(self, callable, expected):        events = capture_events(callable, self.new_watcher())        if events != expected:            self.fail("Expected events:\n%s\nReceived events:\n%s"                      % (pprint.pformat(expected), pprint.pformat(events)))class ProfileHookTestCase(TestCaseBase):    def new_watcher(self):        return HookWatcher()    def test_simple(self):        def f(p):            pass        f_ident = ident(f)        self.check_events(f, [(1, 'call', f_ident),                              (1, 'return', f_ident),                              ])    def test_exception(self):        def f(p):            1/0        f_ident = ident(f)        self.check_events(f, [(1, 'call', f_ident),                              (1, 'return', f_ident),                              ])    def test_caught_exception(self):        def f(p):            try: 1/0            except: pass        f_ident = ident(f)        self.check_events(f, [(1, 'call', f_ident),                              (1, 'return', f_ident),                              ])    def test_caught_nested_exception(self):        def f(p):            try: 1/0            except: pass        f_ident = ident(f)        self.check_events(f, [(1, 'call', f_ident),                              (1, 'return', f_ident),                              ])    def test_nested_exception(self):        def f(p):            1/0        f_ident = ident(f)        self.check_events(f, [(1, 'call', f_ident),                              # This isn't what I expected:                              # (0, 'exception', protect_ident),                              # I expected this again:                              (1, 'return', f_ident),                              ])    def test_exception_in_except_clause(self):        def f(p):            1/0        def g(p):            try:                f(p)            except:                try: f(p)                except: pass        f_ident = ident(f)        g_ident = ident(g)        self.check_events(g, [(1, 'call', g_ident),                              (2, 'call', f_ident),                              (2, 'return', f_ident),                              (3, 'call', f_ident),                              (3, 'return', f_ident),                              (1, 'return', g_ident),                              ])    def test_exception_propogation(self):        def f(p):            1/0        def g(p):            try: f(p)            finally: p.add_event("falling through")        f_ident = ident(f)        g_ident = ident(g)        self.check_events(g, [(1, 'call', g_ident),                              (2, 'call', f_ident),                              (2, 'return', f_ident),                              (1, 'falling through', g_ident),                              (1, 'return', g_ident),                              ])    def test_raise_twice(self):        def f(p):            try: 1/0            except: 1/0        f_ident = ident(f)        self.check_events(f, [(1, 'call', f_ident),                              (1, 'return', f_ident),                              ])    def test_raise_reraise(self):        def f(p):            try: 1/0            except: raise        f_ident = ident(f)        self.check_events(f, [(1, 'call', f_ident),                              (1, 'return', f_ident),                              ])    def test_raise(self):        def f(p):            raise Exception()        f_ident = ident(f)        self.check_events(f, [(1, 'call', f_ident),                              (1, 'return', f_ident),                              ])    def test_distant_exception(self):        def f():            1/0        def g():            f()        def h():            g()        def i():            h()        def j(p):            i()        f_ident = ident(f)        g_ident = ident(g)        h_ident = ident(h)        i_ident = ident(i)        j_ident = ident(j)        self.check_events(j, [(1, 'call', j_ident),                              (2, 'call', i_ident),                              (3, 'call', h_ident),                              (4, 'call', g_ident),                              (5, 'call', f_ident),                              (5, 'return', f_ident),                              (4, 'return', g_ident),                              (3, 'return', h_ident),                              (2, 'return', i_ident),                              (1, 'return', j_ident),                              ])    def test_generator(self):        def f():            for i in range(2):                yield i        def g(p):            for i in f():                pass        f_ident = ident(f)        g_ident = ident(g)        self.check_events(g, [(1, 'call', g_ident),                              # call the iterator twice to generate values                              (2, 'call', f_ident),                              (2, 'return', f_ident),                              (2, 'call', f_ident),                              (2, 'return', f_ident),                              # once more; returns end-of-iteration with                              # actually raising an exception                              (2, 'call', f_ident),                              (2, 'return', f_ident),                              (1, 'return', g_ident),                              ])    def test_stop_iteration(self):        def f():            for i in range(2):                yield i            raise StopIteration        def g(p):            for i in f():                pass        f_ident = ident(f)        g_ident = ident(g)        self.check_events(g, [(1, 'call', g_ident),                              # call the iterator twice to generate values                              (2, 'call', f_ident),                              (2, 'return', f_ident),                              (2, 'call', f_ident),                              (2, 'return', f_ident),                              # once more to hit the raise:                              (2, 'call', f_ident),                              (2, 'return', f_ident),                              (1, 'return', g_ident),                              ])class ProfileSimulatorTestCase(TestCaseBase):    def new_watcher(self):        return ProfileSimulator(self)    def test_simple(self):        def f(p):            pass        f_ident = ident(f)        self.check_events(f, [(1, 'call', f_ident),                              (1, 'return', f_ident),                              ])    def test_basic_exception(self):        def f(p):            1/0        f_ident = ident(f)        self.check_events(f, [(1, 'call', f_ident),                              (1, 'return', f_ident),                              ])    def test_caught_exception(self):        def f(p):            try: 1/0            except: pass        f_ident = ident(f)        self.check_events(f, [(1, 'call', f_ident),                              (1, 'return', f_ident),                              ])    def test_distant_exception(self):        def f():            1/0        def g():            f()        def h():            g()        def i():            h()        def j(p):            i()        f_ident = ident(f)        g_ident = ident(g)        h_ident = ident(h)        i_ident = ident(i)        j_ident = ident(j)        self.check_events(j, [(1, 'call', j_ident),                              (2, 'call', i_ident),                              (3, 'call', h_ident),                              (4, 'call', g_ident),                              (5, 'call', f_ident),                              (5, 'return', f_ident),                              (4, 'return', g_ident),                              (3, 'return', h_ident),                              (2, 'return', i_ident),                              (1, 'return', j_ident),                              ])def ident(function):    if hasattr(function, "f_code"):        code = function.f_code    else:        code = function.func_code    return code.co_firstlineno, code.co_namedef protect(f, p):    try: f(p)    except: passprotect_ident = ident(protect)def capture_events(callable, p=None):    try: sys.setprofile()    except TypeError: pass    else: raise TestFailed, 'sys.setprofile() did not raise TypeError'    if p is None:        p = HookWatcher()    sys.setprofile(p.callback)    protect(callable, p)    sys.setprofile(None)    return p.get_events()[1:-1]def show_events(callable):    import pprint    pprint.pprint(capture_events(callable))def test_main():    loader = unittest.TestLoader()    suite = unittest.TestSuite()    suite.addTest(loader.loadTestsFromTestCase(ProfileHookTestCase))    suite.addTest(loader.loadTestsFromTestCase(ProfileSimulatorTestCase))    test_support.run_suite(suite)if __name__ == "__main__":    test_main()

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?