⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 comfychair.py

📁 distcc编译器的源代码.好像是readhat公司开发的.
💻 PY
字号:
#! /usr/bin/env python# Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org># Copyright (C) 2003 by Tim Potter <tpot@samba.org># # This program is free software; you can redistribute it and/or# modify it under the terms of the GNU General Public License as# published by the Free Software Foundation; either version 2 of the# License, or (at your option) any later version.# # This program is distributed in the hope that it will be useful, but# WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU# General Public License for more details.# # You should have received a copy of the GNU General Public License# along with this program; if not, write to the Free Software# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307# USA"""comfychair: a Python-based instrument of software torture.Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>Copyright (C) 2003 by Tim Potter <tpot@samba.org>This is a test framework designed for testing programs written inPython, or (through a fork/exec interface) any other language.For more information, see the file README.comfychair.To run a test suite based on ComfyChair, just run it as a program."""import sys, reclass TestCase:    """A base class for tests.  This class defines required functions which    can optionally be overridden by subclasses.  It also provides some    utility functions for"""    def __init__(self):        self.test_log = ""        self.background_pids = []        self._cleanups = []        self._enter_rundir()        self._save_environment()        self.add_cleanup(self.teardown)    # --------------------------------------------------    # Save and restore directory    def _enter_rundir(self):        import os        self.basedir = os.getcwd()        self.add_cleanup(self._restore_directory)        self.rundir = os.path.join(self.basedir,                                   'testtmp',                                    self.__class__.__name__)        self.tmpdir = os.path.join(self.rundir, 'tmp')        os.system("rm -fr %s" % self.rundir)        os.makedirs(self.tmpdir)        os.system("mkdir -p %s" % self.rundir)        os.chdir(self.rundir)    def _restore_directory(self):        import os        os.chdir(self.basedir)    # --------------------------------------------------    # Save and restore environment    def _save_environment(self):        import os        self._saved_environ = os.environ.copy()        self.add_cleanup(self._restore_environment)    def _restore_environment(self):        import os        os.environ.clear()        os.environ.update(self._saved_environ)        def setup(self):        """Set up test fixture."""        pass    def teardown(self):        """Tear down test fixture."""        pass    def runtest(self):        """Run the test."""        pass    def add_cleanup(self, c):        """Queue a cleanup to be run when the test is complete."""        self._cleanups.append(c)            def fail(self, reason = ""):        """Say the test failed."""        raise AssertionError(reason)    #############################################################    # Requisition methods    def require(self, predicate, message):        """Check a predicate for running this test.If the predicate value is not true, the test is skipped with a message explainingwhy."""        if not predicate:            raise NotRunError, message    def require_root(self):        """Skip this test unless run by root."""        import os        self.require(os.getuid() == 0,                     "must be root to run this test")    #############################################################    # Assertion methods    def assert_(self, expr, reason = ""):        if not expr:            raise AssertionError(reason)    def assert_equal(self, a, b):        if not a == b:            raise AssertionError("assertEquals failed: %s" % `(a, b)`)                def assert_notequal(self, a, b):        if a == b:            raise AssertionError("assertNotEqual failed: %s" % `(a, b)`)    def assert_re_match(self, pattern, s):        """Assert that a string matches a particular pattern        Inputs:          pattern      string: regular expression          s            string: to be matched        Raises:          AssertionError if not matched          """        if not re.match(pattern, s):            raise AssertionError("string does not match regexp\n"                                 "    string: %s\n"                                 "    re: %s" % (`s`, `pattern`))    def assert_re_search(self, pattern, s):        """Assert that a string *contains* a particular pattern        Inputs:          pattern      string: regular expression          s            string: to be searched        Raises:          AssertionError if not matched          """        if not re.search(pattern, s):            raise AssertionError("string does not contain regexp\n"                                 "    string: %s\n"                                 "    re: %s" % (`s`, `pattern`))    def assert_no_file(self, filename):        import os.path        assert not os.path.exists(filename), ("file exists but should not: %s" % filename)    #############################################################    # Methods for running programs    def runcmd_background(self, cmd):        import os        self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n"        pid = os.fork()        if pid == 0:            # child            try:                os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])            finally:                os._exit(127)        self.test_log = self.test_log + "pid: %d\n" % pid        return pid    def runcmd(self, cmd, expectedResult = 0):        """Run a command, fail if the command returns an unexpected exit        code.  Return the output produced."""        rc, output, stderr = self.runcmd_unchecked(cmd)        if rc != expectedResult:            raise AssertionError("""command returned %d; expected %s: \"%s\"stdout:%sstderr:%s""" % (rc, expectedResult, cmd, output, stderr))        return output, stderr    def run_captured(self, cmd):        """Run a command, capturing stdout and stderr.        Based in part on popen2.py        Returns (waitstatus, stdout, stderr)."""        import os, types        pid = os.fork()        if pid == 0:            # child            try:                 pid = os.getpid()                openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC                outfd = os.open('%d.out' % pid, openmode, 0666)                os.dup2(outfd, 1)                os.close(outfd)                errfd = os.open('%d.err' % pid, openmode, 0666)                os.dup2(errfd, 2)                os.close(errfd)                if isinstance(cmd, types.StringType):                    cmd = ['/bin/sh', '-c', cmd]                os.execvp(cmd[0], cmd)            finally:                os._exit(127)        else:            # parent            exited_pid, waitstatus = os.waitpid(pid, 0)            stdout = open('%d.out' % pid).read()            stderr = open('%d.err' % pid).read()            return waitstatus, stdout, stderr    def runcmd_unchecked(self, cmd, skip_on_noexec = 0):        """Invoke a command; return (exitcode, stdout, stderr)"""        import os        waitstatus, stdout, stderr = self.run_captured(cmd)        assert not os.WIFSIGNALED(waitstatus), \               ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus)))        rc = os.WEXITSTATUS(waitstatus)        self.test_log = self.test_log + ("""Run command: %sWait status: %#x (exit code %d, signal %d)stdout:%sstderr:%s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus),         stdout, stderr))        if skip_on_noexec and rc == 127:            # Either we could not execute the command or the command            # returned exit code 127.  According to system(3) we can't            # tell the difference.            raise NotRunError, "could not execute %s" % `cmd`        return rc, stdout, stderr        def explain_failure(self, exc_info = None):        print "test_log:"        print self.test_log    def log(self, msg):        """Log a message to the test log.  This message is displayed if        the test fails, or when the runtests function is invoked with        the verbose option."""        self.test_log = self.test_log + msg + "\n"class NotRunError(Exception):    """Raised if a test must be skipped because of missing resources"""    def __init__(self, value = None):        self.value = valuedef _report_error(case, debugger):    """Ask the test case to explain failure, and optionally run a debugger    Input:      case         TestCase instance      debugger     if true, a debugger function to be applied to the traceback"""    import sys    ex = sys.exc_info()    print "-----------------------------------------------------------------"    if ex:        import traceback        traceback.print_exc(file=sys.stdout)    case.explain_failure()    print "-----------------------------------------------------------------"    if debugger:        tb = ex[2]        debugger(tb)def runtests(test_list, verbose = 0, debugger = None):    """Run a series of tests.    Inputs:      test_list    sequence of TestCase classes      verbose      print more information as testing proceeds      debugger     debugger object to be applied to errors    Returns:      unix return code: 0 for success, 1 for failures, 2 for test failure    """    import traceback    ret = 0    for test_class in test_list:        print "%-30s" % _test_name(test_class),        # flush now so that long running tests are easier to follow        sys.stdout.flush()        obj = None        try:            try: # run test and show result                obj = test_class()                obj.setup()                obj.runtest()                print "OK"            except KeyboardInterrupt:                print "INTERRUPT"                _report_error(obj, debugger)                ret = 2                break            except NotRunError, msg:                print "NOTRUN, %s" % msg.value            except:                print "FAIL"                _report_error(obj, debugger)                ret = 1        finally:            while obj and obj._cleanups:                try:                    apply(obj._cleanups.pop())                except KeyboardInterrupt:                    print "interrupted during teardown"                    _report_error(obj, debugger)                    ret = 2                    break                except:                    print "error during teardown"                    _report_error(obj, debugger)                    ret = 1        # Display log file if we're verbose        if ret == 0 and verbose:            obj.explain_failure()                return retdef _test_name(test_class):    """Return a human-readable name for a test class.    """    try:        return test_class.__name__    except:        return `test_class`def print_help():    """Help for people running tests"""    import sys    print """%s: software test suite based on ComfyChairusage:    To run all tests, just run this program.  To run particular tests,    list them on the command line.options:    --help              show usage message    --list              list available tests    --verbose, -v       show more information while running tests    --post-mortem, -p   enter Python debugger on error""" % sys.argv[0]def print_list(test_list):    """Show list of available tests"""    for test_class in test_list:        print "    %s" % _test_name(test_class)def main(tests, extra_tests=[]):    """Main entry point for test suites based on ComfyChair.    inputs:      tests       Sequence of TestCase subclasses to be run by default.      extra_tests Sequence of TestCase subclasses that are available but                  not run by default.Test suites should contain this boilerplate:    if __name__ == '__main__':        comfychair.main(tests)This function handles standard options such as --help and --list, andby default runs all tests in the suggested order.Calls sys.exit() on completion."""    from sys import argv    import getopt, sys    opt_verbose = 0    debugger = None    opts, args = getopt.getopt(argv[1:], 'pv',                               ['help', 'list', 'verbose', 'post-mortem'])    for opt, opt_arg in opts:        if opt == '--help':            print_help()            return        elif opt == '--list':            print_list(tests + extra_tests)            return        elif opt == '--verbose' or opt == '-v':            opt_verbose = 1        elif opt == '--post-mortem' or opt == '-p':            import pdb            debugger = pdb.post_mortem    if args:        all_tests = tests + extra_tests        by_name = {}        for t in all_tests:            by_name[_test_name(t)] = t        which_tests = []        for name in args:            which_tests.append(by_name[name])    else:        which_tests = tests    sys.exit(runtests(which_tests, verbose=opt_verbose,                      debugger=debugger))if __name__ == '__main__':    print __doc__

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -