main.py

来自「linux subdivision ying gai ke yi le ba」· Python 代码 · 共 620 行 · 第 1/2 页

PY
620
字号
  fp = open(path, 'a')  # open in (a)ppend mode
  fp.write(new_text)
  fp.close()

# For creating blank new repositories
def create_repos(path):
  """Create a brand-new SVN repository at PATH.  If PATH does not yet
  exist, create it."""

  if not(os.path.exists(path)):
    os.makedirs(path) # this creates all the intermediate dirs, if neccessary

  stdout, stderr = run_command(svnadmin_binary, 1, 0, "create", path,
                               "--bdb-txn-nosync", "--fs-type="+fs_type)

  # Skip tests if we can't create the repository (most likely, because
  # Subversion was built without BDB and this is a default test run).
  for line in stderr:
    if line.find('Unknown FS type') != -1:
      raise Skip

  # Allow unauthenticated users to write to the repos, for ra_svn testing.
  file_append(os.path.join(path, "conf", "svnserve.conf"),
              "[general]\nanon-access = write\n");

  # make the repos world-writeable, for mod_dav_svn's sake.
  chmod_tree(path, 0666, 0666)

# For copying a repository
def copy_repos(src_path, dst_path, head_revision, ignore_uuid = 0):
  "Copy the repository SRC_PATH, with head revision HEAD_REVISION, to DST_PATH"

  # A BDB hot-backup procedure would be more efficient, but that would
  # require access to the BDB tools, and this doesn't.  Print a fake
  # pipe command so that the displayed CMDs can be run by hand
  create_repos(dst_path)
  dump_args = ' dump "' + src_path + '"'
  load_args = ' load "' + dst_path + '"'

  if ignore_uuid:
    load_args = load_args + " --ignore-uuid"
  if verbose_mode:
    print 'CMD:', os.path.basename(svnadmin_binary) + dump_args, \
          '|', os.path.basename(svnadmin_binary) + load_args,
  start = time.time()
  dump_in, dump_out, dump_err = os.popen3(svnadmin_binary + dump_args, 'b')
  load_in, load_out, load_err = os.popen3(svnadmin_binary + load_args, 'b')
  stop = time.time()
  if verbose_mode:
    print '<TIME = %.6f>' % (stop - start)
  
  while 1:
    data = dump_out.read(1024*1024)  # Arbitrary buffer size
    if data == "":
      break
    load_in.write(data)
  load_in.close() # Tell load we are done

  dump_lines = dump_err.readlines()
  load_lines = load_out.readlines()
  dump_in.close()
  dump_out.close()
  dump_err.close()
  load_out.close()
  load_err.close()

  dump_re = re.compile(r'^\* Dumped revision (\d+)\.$')
  expect_revision = 0
  for dump_line in dump_lines:
    match = dump_re.match(dump_line)
    if not match or match.group(1) != str(expect_revision):
      print 'ERROR:  dump failed:', dump_line,
      raise SVNRepositoryCopyFailure
    expect_revision += 1
  if expect_revision != head_revision + 1:
    print 'ERROR:  dump failed; did not see revision', head_revision
    raise SVNRepositoryCopyFailure

  load_re = re.compile(r'^------- Committed revision (\d+) >>>$')
  expect_revision = 1
  for load_line in load_lines:
    match = load_re.match(load_line)
    if match:
      if match.group(1) != str(expect_revision):
        print 'ERROR:  load failed:', load_line,
        raise SVNRepositoryCopyFailure
      expect_revision += 1
  if expect_revision != head_revision + 1:
    print 'ERROR:  load failed; did not see revision', head_revision
    raise SVNRepositoryCopyFailure


def set_repos_paths(repo_dir):
  "Set current_repo_dir and current_repo_url from a relative path to the repo."
  global current_repo_dir, current_repo_url
  current_repo_dir = repo_dir
  current_repo_url = test_area_url + '/' + repo_dir
  if windows == 1:
    current_repo_url = string.replace(current_repo_url, '\\', '/')


######################################################################
# Sandbox handling

class Sandbox:
  "Manages a sandbox for a test to operate within."

  def __init__(self, module, idx):
    self.name = '%s-%d' % (module, idx)
    self.wc_dir = os.path.join(general_wc_dir, self.name)
    self.repo_dir = os.path.join(general_repo_dir, self.name)
    self.repo_url = test_area_url + '/' + self.repo_dir
    if windows == 1:
      self.repo_url = string.replace(self.repo_url, '\\', '/')
    self.test_paths = [self.wc_dir, self.repo_dir]

  def build(self):
    if actions.make_repo_and_wc(self):
      raise Failure("Could not build repository and sandbox '%s'" % self.name)

  def add_test_path(self, path, remove=1):
    self.test_paths.append(path)
    if remove:
      safe_rmtree(path)

  def add_repo_path(self, suffix, remove=1):
    path = self.repo_dir + '.' + suffix
    url  = self.repo_url + '.' + suffix
    self.add_test_path(path, remove)
    return path, url

  def add_wc_path(self, suffix, remove=1):
    path = self.wc_dir + '.' + suffix
    self.add_test_path(path, remove)
    return path

  def cleanup_test_paths(self):
    for path in self.test_paths:
      _cleanup_test_path(path)


_deferred_test_paths = []
def _cleanup_deferred_test_paths():
  global _deferred_test_paths
  test_paths = _deferred_test_paths[:]
  _deferred_test_paths = []
  for path in test_paths:
    _cleanup_test_path(path, 1)

def _cleanup_test_path(path, retrying=None):
  if verbose_mode:
    if retrying:
      print "CLEANUP: RETRY:", path
    else:
      print "CLEANUP:", path
  try:
    safe_rmtree(path)
  except:
    if verbose_mode:
      print "WARNING: cleanup failed, will try again later"
    _deferred_test_paths.append(path)

######################################################################
# Main testing functions

# These two functions each take a TEST_LIST as input.  The TEST_LIST
# should be a list of test functions; each test function should take
# no arguments and return a 0 on success, non-zero on failure.
# Ideally, each test should also have a short, one-line docstring (so
# it can be displayed by the 'list' command.)

# Func to run one test in the list.
def run_one_test(n, test_list):
  "Run the Nth client test in TEST_LIST, return the result."

  if (n < 1) or (n > len(test_list) - 1):
    print "There is no test", `n` + ".\n"
    return 1

  # Clear the repos paths for this test
  global current_repo_dir, current_repo_url
  current_repo_dir = None
  current_repo_url = None

  tc = testcase.TestCase(test_list[n], n)
  func_code = tc.func_code()
  if func_code.co_argcount:
    # ooh! this function takes a sandbox argument
    module, unused = \
      os.path.splitext(os.path.basename(func_code.co_filename))
    sandbox = Sandbox(module, n)
    args = (sandbox,)
  else:
    sandbox = None
    args = ()

  # Run the test.
  exit_code = tc.run(args)
  if sandbox is not None and not exit_code and cleanup_mode:
    sandbox.cleanup_test_paths()
  reset_config_dir()
  return exit_code

def _internal_run_tests(test_list, testnum=None):
  exit_code = 0

  if testnum is None:
    for n in range(1, len(test_list)):
      if run_one_test(n, test_list):
        exit_code = 1
  else:
    exit_code = run_one_test(testnum, test_list)

  _cleanup_deferred_test_paths()
  return exit_code


# Main func.  This is the "entry point" that all the test scripts call
# to run their list of tests.
#
# There are three modes for invoking this routine, and they all depend
# on parsing sys.argv[]:
#
#   1.  No command-line arguments: all tests in TEST_LIST are run.
#   2.  Number 'N' passed on command-line: only test N is run
#   3.  String "list" passed on command-line:  print each test's docstring.

def run_tests(test_list):
  """Main routine to run all tests in TEST_LIST.

  NOTE: this function does not return. It does a sys.exit() with the
        appropriate exit code.
  """

  global test_area_url
  global fs_type
  global verbose_mode
  global cleanup_mode
  testnum = None

  # Explicitly set this so that commands that commit but don't supply a
  # log message will fail rather than invoke an editor.
  os.environ['SVN_EDITOR'] = ''

  try:
    opts, args = getopt.getopt(sys.argv[1:], 'v',
                               ['url=', 'fs-type=', 'verbose', 'cleanup'])
  except getopt.GetoptError:
    args = []

  for arg in args:
    if arg == "list":
      print "Test #  Mode   Test Description"
      print "------  -----  ----------------"
      n = 1
      for x in test_list[1:]:
        testcase.TestCase(x, n).list()
        n = n+1

      # done. just exit with success.
      sys.exit(0)

    if arg.startswith('BASE_URL='):
      test_area_url = arg[9:]
    else:
      try:
        testnum = int(arg)
      except ValueError:
        pass

  for opt, val in opts:
    if opt == "--url":
      test_area_url = val

    elif opt == "--fs-type":
      fs_type = val

    elif opt == "-v" or opt == "--verbose":
      verbose_mode = 1

    elif opt == "--cleanup":
      cleanup_mode = 1

  exit_code = _internal_run_tests(test_list, testnum)

  # remove all scratchwork: the 'pristine' repository, greek tree, etc.
  # This ensures that an 'import' will happen the next time we run.
  safe_rmtree(temp_dir)

  _cleanup_deferred_test_paths()

  # return the appropriate exit code from the tests.
  sys.exit(exit_code)


######################################################################
# Initialization

# Cleanup: if a previous run crashed or interrupted the python
# interpreter, then `temp_dir' was never removed.  This can cause wonkiness.

safe_rmtree(temp_dir)

# the modules import each other, so we do this import very late, to ensure
# that the definitions in "main" have been completed.
import actions


### End of file.

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?