📄 xdelta3-regtest.py
字号:
#!/usr/bin/python2.5# xdelta 3 - delta compression tools and library# Copyright (C) 2003, 2006, 2007, 2008. Joshua P. MacDonald## This program is free software; you can redistribute it and/or modify# it under the terms of the GNU General Public License as published by# the Free Software Foundation; either version 2 of the License, or# (at your option) any later version.## This program is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU General Public License for more details.## You should have received a copy of the GNU General Public License# along with this program; if not, write to the Free Software# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA# TODO: test 1.5 vs. greedyimport os, sys, math, re, time, types, array, randomimport xdelta3#RCSDIR = '/mnt/polaroid/Polaroid/orbit_linux/home/jmacd/PRCS'#RCSDIR = '/tmp/PRCS_read_copy'#SAMPLEDIR = "/tmp/WESNOTH_tmp/diff"#RCSDIR = 'G:/jmacd/PRCS_copy'#SAMPLEDIR = "C:/sample_data/Wesnoth/tar"#RCSDIR = '/Users/jmacd/src/ftp.kernel.org/pub/scm/linux/kernel/bkcvs/linux-2.4/net/x25'RCSDIR = '/Users/jmacd/src/ftp.kernel.org'#MIN_SIZE = 0TIME_TOO_SHORT = 0.050SKIP_TRIALS = 2MIN_TRIALS = 3MAX_TRIALS = 15# 10 = fast 1.5 = slowMIN_STDDEV_PCT = 1.5# How many results per roundMAX_RESULTS = 500TEST_ROUNDS = 500KEEP_P = (0.5)# For RCS testing, what percent to selectFILE_P = (0.50)# For run-speed testsMIN_RUN = 1000 * 1000 * 1MAX_RUN = 1000 * 1000 * 10# Testwide defaultsALL_ARGS = [ '-vv' ]# The first 7 args go to -CSOFT_CONFIG_CNT = 7CONFIG_ORDER = [ 'large_look', 'large_step', 'small_look', 'small_chain', 'small_lchain', 'max_lazy', 'long_enough', # > SOFT_CONFIG_CNT 'nocompress', 'winsize', 'srcwinsize', 'sprevsz', 'iopt', 'djw', 'altcode', ]CONFIG_ARGMAP = { 'winsize' : '-W', 'srcwinsize' : '-B', 'sprevsz' : '-P', 'iopt' : '-I', 'nocompress' : '-N', 'djw' : '-Sdjw', 'altcode' : '-T', }def INPUT_SPEC(rand): return { # Time/space costs: # -C 1,2,3,4,5,6,7 'large_look' : lambda d: rand.choice([9, 10, 11, 12]), 'large_step' : lambda d: rand.choice([25, 26, 27, 28, 29, 30]), 'small_look' : lambda d: rand.choice([4]), 'small_chain' : lambda d: rand.choice([1]), 'small_lchain' : lambda d: rand.choice([1]), 'max_lazy' : lambda d: rand.choice([4, 5, 6, 7, 8, 9, 10 ]), # Note: long_enough only refers to small matching and has no effect if # small_chain == 1. 'long_enough' : lambda d: rand.choice([4]), # -N 'nocompress' : lambda d: rand.choice(['false']), # -T 'altcode' : lambda d: rand.choice(['false']), # -S djw 'djw' : lambda d: rand.choice(['false']), # Memory costs: # -W 'winsize' : lambda d: 8 * (1<<20), # -B 'srcwinsize' : lambda d: 64 * (1<<20), # -I 0 is unlimited 'iopt' : lambda d: 0, # -P only powers of two 'sprevsz' : lambda d: rand.choice([x * (1<<16) for x in [4]]), }#end#TMPDIR = '/tmp/xd3regtest.%d' % os.getpid()RUNFILE = os.path.join(TMPDIR, 'run')DFILE = os.path.join(TMPDIR, 'output')RFILE = os.path.join(TMPDIR, 'recon')HEAD_STATE = 0BAR_STATE = 1REV_STATE = 2DATE_STATE = 3#IGNORE_FILENAME = re.compile('.*\\.(gif|jpg).*')# rcs outputRE_TOTREV = re.compile('total revisions: (\\d+)')RE_BAR = re.compile('----------------------------')RE_REV = re.compile('revision (.+)')RE_DATE = re.compile('date: ([^;]+);.*')# xdelta outputRE_HDRSZ = re.compile('VCDIFF header size: +(\\d+)')RE_EXTCOMP = re.compile('XDELTA ext comp.*')def c2str(c): return ' '.join(['%s' % x for x in c])#enddef SumList(l): return reduce(lambda x,y: x+y, l)#end# returns (total, mean, stddev, q2 (median),# (q3-q1)/2 ("semi-interquartile range"), max-min (spread))class StatList: def __init__(self,l,desc): cnt = len(l) assert(cnt > 1) l.sort() self.cnt = cnt self.l = l self.total = SumList(l) self.mean = self.total / float(self.cnt) self.s = math.sqrt(SumList([(x-self.mean) * (x - self.mean) for x in l]) / float(self.cnt-1)) self.q0 = l[0] self.q1 = l[int(self.cnt/4.0+0.5)] self.q2 = l[int(self.cnt/2.0+0.5)] self.q3 = l[min(self.cnt-1,int((3.0*self.cnt)/4.0+0.5))] self.q4 = l[self.cnt-1]+1 self.siqr = (self.q3-self.q1)/2.0; self.spread = (self.q4-self.q0) self.str = '%s %d; mean %d; sdev %d; q2 %d; .5(q3-q1) %.1f; spread %d' % \ (desc, self.total, self.mean, self.s, self.q2, self.siqr, self.spread) #end#enddef RunCommand(args, ok = [0]): #print 'run command %s' % (' '.join(args)) p = os.spawnvp(os.P_WAIT, args[0], args) if p not in ok: raise CommandError(args, 'exited %d' % p) #end#enddef RunCommandIO(args,infn,outfn): p = os.fork() if p == 0: os.dup2(os.open(infn,os.O_RDONLY),0) os.dup2(os.open(outfn,os.O_CREAT|os.O_TRUNC|os.O_WRONLY),1) os.execvp(args[0], args) else: s = os.waitpid(p,0) o = os.WEXITSTATUS(s[1]) if not os.WIFEXITED(s[1]) or o != 0: raise CommandError(args, 'exited %d' % o) #end #end#endclass TimedTest: def __init__(self, target, source, runnable, skip_trials = SKIP_TRIALS, min_trials = MIN_TRIALS, max_trials = MAX_TRIALS, min_stddev_pct = MIN_STDDEV_PCT): self.target = target self.source = source self.runnable = runnable self.skip_trials = skip_trials self.min_trials = min(min_trials, max_trials) self.max_trials = max_trials self.min_stddev_pct = min_stddev_pct self.encode_time = self.DoTest(DFILE, lambda x: x.Encode(self.target, self.source, DFILE)) self.encode_size = runnable.EncodeSize(DFILE) self.decode_time = self.DoTest(RFILE, lambda x: x.Decode(DFILE, self.source, RFILE), ) # verify runnable.Verify(self.target, RFILE) #end def DoTest(self, fname, func): trials = 0 measured = [] while 1: try: os.remove(fname) except OSError: pass start_time = time.time() start_clock = time.clock() func(self.runnable) total_clock = (time.clock() - start_clock) total_time = (time.time() - start_time) elap_time = max(total_time, 0.0000001) elap_clock = max(total_clock, 0.0000001) trials = trials + 1 # skip some of the first trials if trials > self.skip_trials: measured.append((elap_clock, elap_time)) #print 'measurement total: %.1f ms' % (total_time * 1000.0) # at least so many if trials < (self.skip_trials + self.min_trials): #print 'continue: need more trials: %d' % trials continue # compute %variance done = 0 if self.skip_trials + self.min_trials <= 2: measured = measured + measured; done = 1 #end time_stat = StatList([x[1] for x in measured], 'elap time') sp = float(time_stat.s) / float(time_stat.mean) # what if MAX_TRIALS is exceeded? too_many = (trials - self.skip_trials) >= self.max_trials good = (100.0 * sp) < self.min_stddev_pct if done or too_many or good: trials = trials - self.skip_trials if not done and not good: #print 'too many trials: %d' % trials pass #clock = StatList([x[0] for x in measured], 'elap clock') return time_stat #end #end #end#enddef Decimals(start, end): l = [] step = start while 1: r = range(step, step * 10, step) l = l + r if step * 10 >= end: l.append(step * 10) break step = step * 10 return l#end# This tests the raw speed of 0-byte inputsdef RunSpeedTest(): for L in Decimals(MIN_RUN, MAX_RUN): SetFileSize(RUNFILE, L) trx = TimedTest(RUNFILE, None, Xdelta3Runner(['-W', str(1<<20)])) ReportSpeed(L, trx, '1MB ') trx = TimedTest(RUNFILE, None, Xdelta3Runner(['-W', str(1<<19)])) ReportSpeed(L, trx, '512k') trx = TimedTest(RUNFILE, None, Xdelta3Runner(['-W', str(1<<18)])) ReportSpeed(L, trx, '256k') trm = TimedTest(RUNFILE, None, Xdelta3Mod1(RUNFILE)) ReportSpeed(L, trm, 'swig') trg = TimedTest(RUNFILE, None, GzipRun1()) ReportSpeed(L,trg,'gzip') #end#enddef SetFileSize(F,L): fd = os.open(F, os.O_CREAT | os.O_WRONLY) os.ftruncate(fd,L) assert os.fstat(fd).st_size == L os.close(fd)#enddef ReportSpeed(L,tr,desc): print '%s run length %u: size %u: time %.3f ms: decode %.3f ms' % \ (desc, L, tr.encode_size, tr.encode_time.mean * 1000.0, tr.decode_time.mean * 1000.0)#endclass Xdelta3RunClass: def __init__(self, extra): self.extra = extra #end def __str__(self): return ' '.join(self.extra) #end def New(self): return Xdelta3Runner(self.extra) #end#endclass Xdelta3Runner: def __init__(self, extra): self.extra = extra #end def Encode(self, target, source, output): args = (ALL_ARGS + self.extra + ['-e']) if source: args.append('-s') args.append(source) #end args = args + [target, output] self.Main(args) #end def Decode(self, input, source, output): args = (ALL_ARGS + ['-d']) if source: args.append('-s') args.append(source) #end args = args + [input, output] self.Main(args) #end def Verify(self, target, recon): RunCommand(('cmp', target, recon)) #end def EncodeSize(self, output): return os.stat(output).st_size #end def Main(self, args): try: #print 'Run %s' % (' '.join(args)) xdelta3.xd3_main_cmdline(args) except Exception, e: raise CommandError(args, "xdelta3.main exception: %s" % e) #end #end#end
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -