main.py
来自「linux subdivision ying gai ke yi le ba」· Python 代码 · 共 620 行 · 第 1/2 页
PY
620 行
fp = open(path, 'a') # open in (a)ppend mode
fp.write(new_text)
fp.close()
# For creating blank new repositories
def create_repos(path):
"""Create a brand-new SVN repository at PATH. If PATH does not yet
exist, create it."""
if not(os.path.exists(path)):
os.makedirs(path) # this creates all the intermediate dirs, if neccessary
stdout, stderr = run_command(svnadmin_binary, 1, 0, "create", path,
"--bdb-txn-nosync", "--fs-type="+fs_type)
# Skip tests if we can't create the repository (most likely, because
# Subversion was built without BDB and this is a default test run).
for line in stderr:
if line.find('Unknown FS type') != -1:
raise Skip
# Allow unauthenticated users to write to the repos, for ra_svn testing.
file_append(os.path.join(path, "conf", "svnserve.conf"),
"[general]\nanon-access = write\n");
# make the repos world-writeable, for mod_dav_svn's sake.
chmod_tree(path, 0666, 0666)
# For copying a repository
def copy_repos(src_path, dst_path, head_revision, ignore_uuid = 0):
"Copy the repository SRC_PATH, with head revision HEAD_REVISION, to DST_PATH"
# A BDB hot-backup procedure would be more efficient, but that would
# require access to the BDB tools, and this doesn't. Print a fake
# pipe command so that the displayed CMDs can be run by hand
create_repos(dst_path)
dump_args = ' dump "' + src_path + '"'
load_args = ' load "' + dst_path + '"'
if ignore_uuid:
load_args = load_args + " --ignore-uuid"
if verbose_mode:
print 'CMD:', os.path.basename(svnadmin_binary) + dump_args, \
'|', os.path.basename(svnadmin_binary) + load_args,
start = time.time()
dump_in, dump_out, dump_err = os.popen3(svnadmin_binary + dump_args, 'b')
load_in, load_out, load_err = os.popen3(svnadmin_binary + load_args, 'b')
stop = time.time()
if verbose_mode:
print '<TIME = %.6f>' % (stop - start)
while 1:
data = dump_out.read(1024*1024) # Arbitrary buffer size
if data == "":
break
load_in.write(data)
load_in.close() # Tell load we are done
dump_lines = dump_err.readlines()
load_lines = load_out.readlines()
dump_in.close()
dump_out.close()
dump_err.close()
load_out.close()
load_err.close()
dump_re = re.compile(r'^\* Dumped revision (\d+)\.$')
expect_revision = 0
for dump_line in dump_lines:
match = dump_re.match(dump_line)
if not match or match.group(1) != str(expect_revision):
print 'ERROR: dump failed:', dump_line,
raise SVNRepositoryCopyFailure
expect_revision += 1
if expect_revision != head_revision + 1:
print 'ERROR: dump failed; did not see revision', head_revision
raise SVNRepositoryCopyFailure
load_re = re.compile(r'^------- Committed revision (\d+) >>>$')
expect_revision = 1
for load_line in load_lines:
match = load_re.match(load_line)
if match:
if match.group(1) != str(expect_revision):
print 'ERROR: load failed:', load_line,
raise SVNRepositoryCopyFailure
expect_revision += 1
if expect_revision != head_revision + 1:
print 'ERROR: load failed; did not see revision', head_revision
raise SVNRepositoryCopyFailure
def set_repos_paths(repo_dir):
    """Point current_repo_dir and current_repo_url at the repository whose
    relative path is REPO_DIR."""
    global current_repo_dir, current_repo_url

    current_repo_dir = repo_dir

    url = test_area_url + '/' + repo_dir
    if windows == 1:
        # REPO_DIR may contain backslashes on Windows; a URL needs slashes.
        url = url.replace('\\', '/')
    current_repo_url = url
######################################################################
# Sandbox handling
class Sandbox:
    """The working copy and repository locations one test operates within,
    plus the list of paths to remove when the test is cleaned up."""

    def __init__(self, module, idx):
        """Derive the sandbox name and its paths from MODULE and IDX."""
        self.name = '%s-%d' % (module, idx)
        self.wc_dir = os.path.join(general_wc_dir, self.name)
        self.repo_dir = os.path.join(general_repo_dir, self.name)

        repo_url = test_area_url + '/' + self.repo_dir
        if windows == 1:
            # repo_dir was built with os.path.join; a URL needs slashes.
            repo_url = repo_url.replace('\\', '/')
        self.repo_url = repo_url

        self.test_paths = [self.wc_dir, self.repo_dir]

    def build(self):
        """Build the repository and working copy; raise Failure if
        actions.make_repo_and_wc() returns a true value."""
        build_failed = actions.make_repo_and_wc(self)
        if build_failed:
            raise Failure("Could not build repository and sandbox '%s'" % self.name)

    def add_test_path(self, path, remove=1):
        """Register PATH for cleanup.  Unless REMOVE is false, also delete
        whatever is at PATH right now."""
        self.test_paths.append(path)
        if not remove:
            return
        safe_rmtree(path)

    def add_repo_path(self, suffix, remove=1):
        """Register '<repo_dir>.<SUFFIX>' as a test path and return a
        (path, url) pair for it."""
        extension = '.' + suffix
        path = self.repo_dir + extension
        url = self.repo_url + extension
        self.add_test_path(path, remove)
        return path, url

    def add_wc_path(self, suffix, remove=1):
        """Register '<wc_dir>.<SUFFIX>' as a test path and return it."""
        path = self.wc_dir + '.' + suffix
        self.add_test_path(path, remove)
        return path

    def cleanup_test_paths(self):
        """Run _cleanup_test_path() on every registered test path."""
        for test_path in self.test_paths:
            _cleanup_test_path(test_path)
# Paths that _cleanup_test_path() failed to remove and queued for a retry.
_deferred_test_paths = []

def _cleanup_deferred_test_paths():
    """Retry the removal of every path queued in _deferred_test_paths."""
    global _deferred_test_paths

    # Start a fresh queue before retrying: _cleanup_test_path() appends to
    # the module-level list, so paths that fail again are queued anew
    # instead of growing the list being iterated.
    pending, _deferred_test_paths = _deferred_test_paths[:], []
    for pending_path in pending:
        _cleanup_test_path(pending_path, 1)
def _cleanup_test_path(path, retrying=None):
if verbose_mode:
if retrying:
print "CLEANUP: RETRY:", path
else:
print "CLEANUP:", path
try:
safe_rmtree(path)
except:
if verbose_mode:
print "WARNING: cleanup failed, will try again later"
_deferred_test_paths.append(path)
######################################################################
# Main testing functions
# These two functions each take a TEST_LIST as input. The TEST_LIST
# should be a list of test functions; each test function should take
# either no arguments or a single Sandbox argument, and return a 0 on
# success, non-zero on failure. Test numbers start at 1: entry 0 of
# the list is never run.
# Ideally, each test should also have a short, one-line docstring (so
# it can be displayed by the 'list' command).
# Func to run one test in the list.
def run_one_test(n, test_list):
"Run the Nth client test in TEST_LIST, return the result."
if (n < 1) or (n > len(test_list) - 1):
print "There is no test", `n` + ".\n"
return 1
# Clear the repos paths for this test
global current_repo_dir, current_repo_url
current_repo_dir = None
current_repo_url = None
tc = testcase.TestCase(test_list[n], n)
func_code = tc.func_code()
if func_code.co_argcount:
# ooh! this function takes a sandbox argument
module, unused = \
os.path.splitext(os.path.basename(func_code.co_filename))
sandbox = Sandbox(module, n)
args = (sandbox,)
else:
sandbox = None
args = ()
# Run the test.
exit_code = tc.run(args)
if sandbox is not None and not exit_code and cleanup_mode:
sandbox.cleanup_test_paths()
reset_config_dir()
return exit_code
def _internal_run_tests(test_list, testnum=None):
    """Run test number TESTNUM of TEST_LIST, or tests 1 .. len(TEST_LIST) - 1
    if TESTNUM is None, then retry any deferred cleanups.

    For a single test, return what run_one_test() returned.  For a full
    run, return 1 if any test returned a true value, else 0."""
    if testnum is not None:
        exit_code = run_one_test(testnum, test_list)
    else:
        exit_code = 0
        for test_index in range(1, len(test_list)):
            # Keep going after a failure; just remember that it happened.
            if run_one_test(test_index, test_list):
                exit_code = 1

    _cleanup_deferred_test_paths()
    return exit_code
# Main func. This is the "entry point" that all the test scripts call
# to run their list of tests.
#
# There are three modes for invoking this routine, and they all depend
# on parsing sys.argv[]:
#
# 1. No command-line arguments: all tests in TEST_LIST are run.
# 2. Number 'N' passed on command-line: only test N is run
# 3. String "list" passed on command-line: print each test's docstring.
def run_tests(test_list):
"""Main routine to run all tests in TEST_LIST.
NOTE: this function does not return. It does a sys.exit() with the
appropriate exit code.
"""
global test_area_url
global fs_type
global verbose_mode
global cleanup_mode
testnum = None
# Explicitly set this so that commands that commit but don't supply a
# log message will fail rather than invoke an editor.
os.environ['SVN_EDITOR'] = ''
try:
opts, args = getopt.getopt(sys.argv[1:], 'v',
['url=', 'fs-type=', 'verbose', 'cleanup'])
except getopt.GetoptError:
args = []
for arg in args:
if arg == "list":
print "Test # Mode Test Description"
print "------ ----- ----------------"
n = 1
for x in test_list[1:]:
testcase.TestCase(x, n).list()
n = n+1
# done. just exit with success.
sys.exit(0)
if arg.startswith('BASE_URL='):
test_area_url = arg[9:]
else:
try:
testnum = int(arg)
except ValueError:
pass
for opt, val in opts:
if opt == "--url":
test_area_url = val
elif opt == "--fs-type":
fs_type = val
elif opt == "-v" or opt == "--verbose":
verbose_mode = 1
elif opt == "--cleanup":
cleanup_mode = 1
exit_code = _internal_run_tests(test_list, testnum)
# remove all scratchwork: the 'pristine' repository, greek tree, etc.
# This ensures that an 'import' will happen the next time we run.
safe_rmtree(temp_dir)
_cleanup_deferred_test_paths()
# return the appropriate exit code from the tests.
sys.exit(exit_code)
######################################################################
# Initialization
# Cleanup: if a previous run crashed, or the Python interpreter was
# interrupted, then `temp_dir' was never removed. Stale contents can cause
# wonkiness, so remove it as soon as this module is loaded.
safe_rmtree(temp_dir)
# The modules import each other, so we do this import very late, to ensure
# that the definitions in "main" have been completed.
import actions
### End of file.
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?