⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 main.py

📁 subversion-1.4.3-1.tar.gz 配置svn的源码
💻 PY
📖 第 1 页 / 共 2 页
字号:
  chmod_tree(path, 0666, 0666)# For copying a repositorydef copy_repos(src_path, dst_path, head_revision, ignore_uuid = 0):  "Copy the repository SRC_PATH, with head revision HEAD_REVISION, to DST_PATH"  # A BDB hot-backup procedure would be more efficient, but that would  # require access to the BDB tools, and this doesn't.  Print a fake  # pipe command so that the displayed CMDs can be run by hand  create_repos(dst_path)  dump_args = ' dump "' + src_path + '"'  load_args = ' load "' + dst_path + '"'  if ignore_uuid:    load_args = load_args + " --ignore-uuid"  if verbose_mode:    print 'CMD:', os.path.basename(svnadmin_binary) + dump_args, \          '|', os.path.basename(svnadmin_binary) + load_args,  start = time.time()  dump_in, dump_out, dump_err = os.popen3(svnadmin_binary + dump_args, 'b')  load_in, load_out, load_err = os.popen3(svnadmin_binary + load_args, 'b')  stop = time.time()  if verbose_mode:    print '<TIME = %.6f>' % (stop - start)  while 1:    data = dump_out.read(1024*1024)  # Arbitrary buffer size    if data == "":      break    load_in.write(data)  load_in.close() # Tell load we are done  dump_lines = dump_err.readlines()  load_lines = load_out.readlines()  dump_in.close()  dump_out.close()  dump_err.close()  load_out.close()  load_err.close()  dump_re = re.compile(r'^\* Dumped revision (\d+)\.\r?$')  expect_revision = 0  for dump_line in dump_lines:    match = dump_re.match(dump_line)    if not match or match.group(1) != str(expect_revision):      print 'ERROR:  dump failed:', dump_line,      raise SVNRepositoryCopyFailure    expect_revision += 1  if expect_revision != head_revision + 1:    print 'ERROR:  dump failed; did not see revision', head_revision    raise SVNRepositoryCopyFailure  load_re = re.compile(r'^------- Committed revision (\d+) >>>\r?$')  expect_revision = 1  for load_line in load_lines:    match = load_re.match(load_line)    if match:      if match.group(1) != str(expect_revision):        print 'ERROR:  load failed:', load_line,        raise SVNRepositoryCopyFailure      expect_revision += 1  if expect_revision != head_revision + 1:    print 'ERROR:  load failed; did not see revision', head_revision    raise SVNRepositoryCopyFailuredef set_repos_paths(repo_dir):  "Set current_repo_dir and current_repo_url from a relative path to the repo."  global current_repo_dir, current_repo_url  current_repo_dir = repo_dir  current_repo_url = test_area_url + '/' + repo_dir  if windows == 1:    current_repo_url = string.replace(current_repo_url, '\\', '/')def canonicalize_url(input):  "Canonicalize the url, if the scheme is unknown, returns intact input"  m = re.match(r"^((file://)|((svn|svn\+ssh|http|https)(://)))", input)  if m:    scheme = m.group(1)    return scheme + re.sub(r'//*', '/', input[len(scheme):])  else:    return inputdef create_python_hook_script (hook_path, hook_script_code):  """Create a Python hook script at HOOK_PATH with the specified     HOOK_SCRIPT_CODE."""  if sys.platform == 'win32':    # Use an absolute path since the working directory is not guaranteed    hook_path = os.path.abspath(hook_path)    # Fill the python file.    file_append ("%s.py" % hook_path, hook_script_code)    # Fill the batch wrapper file.    file_append ("%s.bat" % hook_path,                 "@\"%s\" %s.py\n" % (sys.executable, hook_path))  else:    # For all other platforms    file_append (hook_path, "#!%s\n%s" % (sys.executable, hook_script_code))    os.chmod (hook_path, 0755)def compare_unordered_output(expected, actual):  """Compare lists of output lines for equality disregarding the     order of the lines"""  if len(actual) != len(expected):    raise Failure("Length of expected output not equal to actual length")  for aline in actual:    try:      i = expected.index(aline)      expected.pop(i)    except ValueError:      raise Failure("Expected output does not match actual output")####################################################################### Sandbox handlingclass Sandbox:  """Manages a sandbox (one or more repository/working copy pairs) for  a test to operate within."""  dependents = None  def __init__(self, module, idx):    self._set_name("%s-%d" % (module, idx))  def _set_name(self, name):    """A convenience method for renaming a sandbox, useful when    working with multiple repositories in the same unit test."""    self.name = name    self.wc_dir = os.path.join(general_wc_dir, self.name)    self.repo_dir = os.path.join(general_repo_dir, self.name)    self.repo_url = test_area_url + '/' + self.repo_dir    # For dav tests we need a single authz file which must be present,    # so we recreate it each time a sandbox is created with some default    # contents.    if self.repo_url.startswith("http"):      # this dir doesn't exist out of the box, so we may have to make it      if not(os.path.exists(work_dir)):        os.makedirs(work_dir)      self.authz_file = os.path.join(work_dir, "authz")      fp = open(self.authz_file, "w")      fp.write("[/]\n* = rw\n")      fp.close()    # For svnserve tests we have a per-repository authz file, and it    # doesn't need to be there in order for things to work, so we don't    # have any default contents.    elif self.repo_url.startswith("svn"):      self.authz_file = os.path.join(self.repo_dir, "conf", "authz")    if windows == 1:      self.repo_url = string.replace(self.repo_url, '\\', '/')    self.test_paths = [self.wc_dir, self.repo_dir]  def clone_dependent(self):    """A convenience method for creating a near-duplicate of this    sandbox, useful when working with multiple repositories in the    same unit test.  Any necessary cleanup operations are triggered    by cleanup of the original sandbox."""    if not self.dependents:      self.dependents = []    self.dependents.append(copy.deepcopy(self))    self.dependents[-1]._set_name("%s-%d" % (self.name, len(self.dependents)))    return self.dependents[-1]  def build(self, name = None, create_wc = True):    if name != None:      self._set_name(name)    if actions.make_repo_and_wc(self, create_wc):      raise Failure("Could not build repository and sandbox '%s'" % self.name)  def add_test_path(self, path, remove=1):    self.test_paths.append(path)    if remove:      safe_rmtree(path)  def add_repo_path(self, suffix, remove=1):    path = self.repo_dir + '.' + suffix    url  = self.repo_url + '.' + suffix    self.add_test_path(path, remove)    return path, url  def add_wc_path(self, suffix, remove=1):    path = self.wc_dir + '.' + suffix    self.add_test_path(path, remove)    return path  def cleanup_test_paths(self):    "Clean up detritus from this sandbox, and any dependents."    if self.dependents:      # Recursively cleanup any dependent sandboxes.      for sbox in self.dependents:        sbox.cleanup_test_paths()    for path in self.test_paths:      _cleanup_test_path(path)_deferred_test_paths = []def _cleanup_deferred_test_paths():  global _deferred_test_paths  test_paths = _deferred_test_paths[:]  _deferred_test_paths = []  for path in test_paths:    _cleanup_test_path(path, 1)def _cleanup_test_path(path, retrying=None):  if verbose_mode:    if retrying:      print "CLEANUP: RETRY:", path    else:      print "CLEANUP:", path  try:    safe_rmtree(path)  except:    if verbose_mode:      print "WARNING: cleanup failed, will try again later"    _deferred_test_paths.append(path)class TestRunner:  """Encapsulate a single test case (predicate), including logic for  runing the test and test list output."""  def __init__(self, func, index):    self.pred = testcase.create_test_case(func)    self.index = index  def list(self):    print " %2d     %-5s  %s" % (self.index,                                 self.pred.list_mode(),                                 self.pred.get_description())    self.pred.check_description()  def _print_name(self):    print os.path.basename(sys.argv[0]), str(self.index) + ":", \          self.pred.get_description()    self.pred.check_description()  def run(self):    """Run self.pred and return the result.  The return value is        - 0 if the test was successful        - 1 if it errored in a way that indicates test failure        - 2 if the test skipped        """    if self.pred.need_sandbox():      # ooh! this function takes a sandbox argument      sandbox = Sandbox(self.pred.get_sandbox_name(), self.index)      args = (sandbox,)    else:      sandbox = None      args = ()    try:      rc = self.pred.run(args)      if rc is not None:        print 'STYLE ERROR in',        self._print_name()        print 'Test driver returned a status code.'        sys.exit(255)      result = 0    except Skip, ex:      result = 2    except Failure, ex:      result = 1      # We captured Failure and its subclasses. We don't want to print      # anything for plain old Failure since that just indicates test      # failure, rather than relevant information. However, if there      # *is* information in the exception's arguments, then print it.      if ex.__class__ != Failure or ex.args:        ex_args = str(ex)        if ex_args:          print 'EXCEPTION: %s: %s' % (ex.__class__.__name__, ex_args)        else:          print 'EXCEPTION:', ex.__class__.__name__    except KeyboardInterrupt:      print 'Interrupted'      sys.exit(0)    except SystemExit, ex:      print 'EXCEPTION: SystemExit(%d), skipping cleanup' % ex.code      print ex.code and 'FAIL: ' or 'PASS: ',      self._print_name()      raise    except:      result = 1      print 'UNEXPECTED EXCEPTION:'      traceback.print_exc(file=sys.stdout)    result = self.pred.convert_result(result)    print self.pred.run_text(result),    self._print_name()    sys.stdout.flush()    if sandbox is not None and result != 1 and cleanup_mode:      sandbox.cleanup_test_paths()    return result####################################################################### Main testing functions# These two functions each take a TEST_LIST as input.  The TEST_LIST# should be a list of test functions; each test function should take# no arguments and return a 0 on success, non-zero on failure.# Ideally, each test should also have a short, one-line docstring (so# it can be displayed by the 'list' command.)# Func to run one test in the list.def run_one_test(n, test_list):  "Run the Nth client test in TEST_LIST, return the result."  if (n < 1) or (n > len(test_list) - 1):    print "There is no test", `n` + ".\n"    return 1  # Clear the repos paths for this test  global current_repo_dir, current_repo_url  current_repo_dir = None  current_repo_url = None  # Run the test.  exit_code = TestRunner(test_list[n], n).run()  reset_config_dir()  return exit_codedef _internal_run_tests(test_list, testnums):  """Run the tests from TEST_LIST whose indices are listed in TESTNUMS."""  exit_code = 0  for testnum in testnums:    # 1 is the only return code that indicates actual test failure.    if run_one_test(testnum, test_list) == 1:      exit_code = 1  _cleanup_deferred_test_paths()  return exit_code# Main func.  This is the "entry point" that all the test scripts call# to run their list of tests.## This routine parses sys.argv to decide what to do.  Basic usage:## test-script.py [--list] [<testnum>]...## --list : Option to print the docstrings for the chosen tests# instead of running them.## [<testnum>]... : the numbers of the tests that should be run.  If no# testnums are specified, then all tests in TEST_LIST are run.def run_tests(test_list):  """Main routine to run all tests in TEST_LIST.  NOTE: this function does not return. It does a sys.exit() with the        appropriate exit code.  """  global test_area_url  global fs_type  global verbose_mode  global cleanup_mode  testnums = []  # Should the tests be listed (as opposed to executed)?  list_tests = 0  # Explicitly set this so that commands that commit but don't supply a  # log message will fail rather than invoke an editor.  os.environ['SVN_EDITOR'] = ''  opts, args = my_getopt(sys.argv[1:], 'v',                         ['url=', 'fs-type=', 'verbose', 'cleanup', 'list'])  for arg in args:    if arg == "list":      # This is an old deprecated variant of the "--list" option:      list_tests = 1    elif arg.startswith('BASE_URL='):      test_area_url = arg[9:]    else:      testnums.append(int(arg))  for opt, val in opts:    if opt == "--url":      test_area_url = val    elif opt == "--fs-type":      fs_type = val    elif opt == "-v" or opt == "--verbose":      verbose_mode = 1    elif opt == "--cleanup":      cleanup_mode = 1    elif opt == "--list":      list_tests = 1  if test_area_url[-1:] == '/': # Normalize url to have no trailing slash    test_area_url = test_area_url[:-1]  if not testnums:    # If no test numbers were listed explicitly, include all of them:    testnums = range(1, len(test_list))  if list_tests:    print "Test #  Mode   Test Description"    print "------  -----  ----------------"    for testnum in testnums:      TestRunner(test_list[testnum], testnum).list()    # done. just exit with success.    sys.exit(0)  else:    exit_code = _internal_run_tests(test_list, testnums)    # remove all scratchwork: the 'pristine' repository, greek tree, etc.    # This ensures that an 'import' will happen the next time we run.    safe_rmtree(temp_dir)    _cleanup_deferred_test_paths()    # return the appropriate exit code from the tests.    sys.exit(exit_code)####################################################################### Initialization# Cleanup: if a previous run crashed or interrupted the python# interpreter, then `temp_dir' was never removed.  This can cause wonkiness.safe_rmtree(temp_dir)# the modules import each other, so we do this import very late, to ensure# that the definitions in "main" have been completed.import actions### End of file.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -