📄 dirops.py
字号:
#!/usr/bin/env python## $Id: dirops.py 182 2008-10-04 00:11:28Z sriramsrao $## Created 2008/07/28## Copyright 2008 Quantcast Corporation.## Author: Sriram Rao (Quantcast Corp.)## This file is part of Kosmos File System (KFS).## Licensed under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or# implied. See the License for the specific language governing# permissions and limitations under the License.## This is a unittest for directory operations:# -- create a directory hierarchy; when we read the contents of a# directory, all the entries in the tree that we created should show# up.#import kfsimport sysimport os, os.pathimport mmapimport ConfigParserimport unittestimport timeTESTNAME = "dirops"default_test_params = { "kfs_properties": "KfsClient.prp", # KFS property file "log_file": TESTNAME + ".log", # log of operations "test_files": "20", # no. of files to use "max_file_size": "100000", # maximum file size "max_rw_size": "4096", # max. read or write length "source_file": "/usr/share/dict/words"} # source for writesdefault_config_file = TESTNAME + ".cfg" # test configuration fileparam_section = "Test parameters" # section heading in fileconfig = ConfigParser.ConfigParser(default_test_params)kfsClient = Nonedef setup_params(config_file): """Read in the configuration file""" if config_file: config.read(config_file) else: config.add_section(param_section)def get_param_string(name): """Look up a string parameter""" return config.get(param_section, name)def specialDirEntry(entry): return entry == '.' or entry == '..'def start_client(props): """ Create an instance of the KFS client. The KFS meta/chunkservers must already be running.""" try: global kfsClient kfsClient = kfs.client(props) except: print "Unable to start the KFS client." sys.exit(1)class DirOpsTestCase(unittest.TestCase): def setUp(self): global kfsClient self.kfsClient = kfsClient # make the directory hierarchy if not self.kfsClient.isdir(TESTNAME): res = self.kfsClient.mkdir(TESTNAME) def tearDown(self): if self.kfsClient.isdir(TESTNAME): self.kfsClient.rmdirs(TESTNAME) def testReaddir(self): """Testing the readdir API""" entries = self.kfsClient.readdir(TESTNAME) assert len(entries) == 2 entries = [e for e in entries if not specialDirEntry(e)] assert len(entries) == 0 def testReaddirPlus(self): """Testing the readdirplus API""" entries = self.kfsClient.readdirplus(TESTNAME) assert len(entries) == 2 entries = [e[0] for e in entries if not specialDirEntry(e[0])] assert len(entries) == 0if __name__ == '__main__': setup_params(None) client = start_client(get_param_string('kfs_properties')) unittest.main()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -