⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xdelta3-regtest.py

📁 Linux下一个可以比较二进制文件的工具xdelta3.0u的源码。
💻 PY
📖 第 1 页 / 共 3 页
字号:
#!/usr/bin/python2.5# xdelta 3 - delta compression tools and library# Copyright (C) 2003, 2006, 2007, 2008.  Joshua P. MacDonald##  This program is free software; you can redistribute it and/or modify#  it under the terms of the GNU General Public License as published by#  the Free Software Foundation; either version 2 of the License, or#  (at your option) any later version.##  This program is distributed in the hope that it will be useful,#  but WITHOUT ANY WARRANTY; without even the implied warranty of#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the#  GNU General Public License for more details.##  You should have received a copy of the GNU General Public License#  along with this program; if not, write to the Free Software#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA# TODO: test 1.5 vs. greedyimport os, sys, math, re, time, types, array, randomimport xdelta3#RCSDIR = '/mnt/polaroid/Polaroid/orbit_linux/home/jmacd/PRCS'#RCSDIR = '/tmp/PRCS_read_copy'#SAMPLEDIR = "/tmp/WESNOTH_tmp/diff"#RCSDIR = 'G:/jmacd/PRCS_copy'#SAMPLEDIR = "C:/sample_data/Wesnoth/tar"#RCSDIR = '/Users/jmacd/src/ftp.kernel.org/pub/scm/linux/kernel/bkcvs/linux-2.4/net/x25'RCSDIR = '/Users/jmacd/src/ftp.kernel.org'#MIN_SIZE       = 0TIME_TOO_SHORT = 0.050SKIP_TRIALS    = 2MIN_TRIALS     = 3MAX_TRIALS     = 15# 10 = fast 1.5 = slowMIN_STDDEV_PCT = 1.5# How many results per roundMAX_RESULTS = 500TEST_ROUNDS = 500KEEP_P = (0.5)# For RCS testing, what percent to selectFILE_P = (0.50)# For run-speed testsMIN_RUN = 1000 * 1000 * 1MAX_RUN = 1000 * 1000 * 10# Testwide defaultsALL_ARGS = [    '-vv'    ]# The first 7 args go to -CSOFT_CONFIG_CNT = 7CONFIG_ORDER = [ 'large_look',                 'large_step',                 'small_look',                 'small_chain',                 'small_lchain',                 'max_lazy',                 'long_enough',                 # > SOFT_CONFIG_CNT                 'nocompress',                 'winsize',                 'srcwinsize',                 'sprevsz',                 'iopt',                 'djw',                 'altcode',                 ]CONFIG_ARGMAP = {    'winsize'    : '-W',    'srcwinsize' : '-B',    'sprevsz'    : '-P',    'iopt'       : '-I',    'nocompress' : '-N',    'djw'        : '-Sdjw',    'altcode'    : '-T',    }def INPUT_SPEC(rand):    return {    # Time/space costs:    # -C 1,2,3,4,5,6,7    'large_look' : lambda d: rand.choice([9, 10, 11, 12]),    'large_step' : lambda d: rand.choice([25, 26, 27, 28, 29, 30]),    'small_look'   : lambda d: rand.choice([4]),    'small_chain'  : lambda d: rand.choice([1]),    'small_lchain' : lambda d: rand.choice([1]),    'max_lazy'     : lambda d: rand.choice([4, 5, 6, 7, 8, 9, 10 ]),    # Note: long_enough only refers to small matching and has no effect if    # small_chain == 1.    'long_enough'  : lambda d: rand.choice([4]),    # -N    'nocompress'   : lambda d: rand.choice(['false']),    # -T    'altcode'      : lambda d: rand.choice(['false']),    # -S djw    'djw'          : lambda d: rand.choice(['false']),    # Memory costs:    # -W    'winsize'      : lambda d: 8 * (1<<20),    # -B    'srcwinsize'   : lambda d: 64 * (1<<20),    # -I 0 is unlimited    'iopt'         : lambda d: 0,    # -P only powers of two    'sprevsz'      : lambda d: rand.choice([x * (1<<16) for x in [4]]),  }#end#TMPDIR = '/tmp/xd3regtest.%d' % os.getpid()RUNFILE = os.path.join(TMPDIR, 'run')DFILE   = os.path.join(TMPDIR, 'output')RFILE   = os.path.join(TMPDIR, 'recon')HEAD_STATE = 0BAR_STATE  = 1REV_STATE  = 2DATE_STATE = 3#IGNORE_FILENAME  = re.compile('.*\\.(gif|jpg).*')# rcs outputRE_TOTREV  = re.compile('total revisions: (\\d+)')RE_BAR     = re.compile('----------------------------')RE_REV     = re.compile('revision (.+)')RE_DATE    = re.compile('date: ([^;]+);.*')# xdelta outputRE_HDRSZ   = re.compile('VCDIFF header size: +(\\d+)')RE_EXTCOMP = re.compile('XDELTA ext comp.*')def c2str(c):    return ' '.join(['%s' % x for x in c])#enddef SumList(l):    return reduce(lambda x,y: x+y, l)#end# returns (total, mean, stddev, q2 (median),#          (q3-q1)/2 ("semi-interquartile range"), max-min (spread))class StatList:    def __init__(self,l,desc):        cnt = len(l)        assert(cnt > 1)        l.sort()        self.cnt    = cnt        self.l      = l        self.total  = SumList(l)        self.mean   = self.total / float(self.cnt)        self.s      = math.sqrt(SumList([(x-self.mean) * (x - self.mean) for x in l]) / float(self.cnt-1))        self.q0     = l[0]        self.q1     = l[int(self.cnt/4.0+0.5)]        self.q2     = l[int(self.cnt/2.0+0.5)]        self.q3     = l[min(self.cnt-1,int((3.0*self.cnt)/4.0+0.5))]        self.q4     = l[self.cnt-1]+1        self.siqr   = (self.q3-self.q1)/2.0;        self.spread = (self.q4-self.q0)        self.str    = '%s %d; mean %d; sdev %d; q2 %d; .5(q3-q1) %.1f; spread %d' % \                      (desc, self.total, self.mean, self.s, self.q2, self.siqr, self.spread)    #end#enddef RunCommand(args, ok = [0]):    #print 'run command %s' % (' '.join(args))    p = os.spawnvp(os.P_WAIT, args[0], args)    if p not in ok:        raise CommandError(args, 'exited %d' % p)    #end#enddef RunCommandIO(args,infn,outfn):    p = os.fork()    if p == 0:        os.dup2(os.open(infn,os.O_RDONLY),0)        os.dup2(os.open(outfn,os.O_CREAT|os.O_TRUNC|os.O_WRONLY),1)        os.execvp(args[0], args)    else:        s = os.waitpid(p,0)        o = os.WEXITSTATUS(s[1])        if not os.WIFEXITED(s[1]) or o != 0:            raise CommandError(args, 'exited %d' % o)        #end    #end#endclass TimedTest:    def __init__(self, target, source, runnable,                 skip_trials = SKIP_TRIALS,                 min_trials = MIN_TRIALS,                 max_trials = MAX_TRIALS,                 min_stddev_pct = MIN_STDDEV_PCT):        self.target = target        self.source = source        self.runnable = runnable        self.skip_trials = skip_trials        self.min_trials = min(min_trials, max_trials)        self.max_trials = max_trials        self.min_stddev_pct = min_stddev_pct        self.encode_time = self.DoTest(DFILE,                                       lambda x: x.Encode(self.target, self.source, DFILE))        self.encode_size = runnable.EncodeSize(DFILE)        self.decode_time = self.DoTest(RFILE,                                       lambda x: x.Decode(DFILE, self.source, RFILE),                                       )        # verify        runnable.Verify(self.target, RFILE)    #end    def DoTest(self, fname, func):        trials   = 0        measured = []        while 1:            try:                os.remove(fname)            except OSError:                pass            start_time  = time.time()            start_clock = time.clock()            func(self.runnable)            total_clock = (time.clock() - start_clock)            total_time  = (time.time() - start_time)            elap_time  = max(total_time,  0.0000001)            elap_clock = max(total_clock, 0.0000001)            trials = trials + 1            # skip some of the first trials            if trials > self.skip_trials:                measured.append((elap_clock, elap_time))                #print 'measurement total: %.1f ms' % (total_time * 1000.0)            # at least so many            if trials < (self.skip_trials + self.min_trials):                #print 'continue: need more trials: %d' % trials                continue            # compute %variance            done = 0            if self.skip_trials + self.min_trials <= 2:                measured = measured + measured;                done = 1            #end            time_stat = StatList([x[1] for x in measured], 'elap time')            sp = float(time_stat.s) / float(time_stat.mean)            # what if MAX_TRIALS is exceeded?            too_many = (trials - self.skip_trials) >= self.max_trials            good = (100.0 * sp) < self.min_stddev_pct            if done or too_many or good:                trials = trials - self.skip_trials                if not done and not good:                    #print 'too many trials: %d' % trials                    pass                #clock = StatList([x[0] for x in measured], 'elap clock')                return time_stat            #end        #end    #end#enddef Decimals(start, end):    l = []    step = start    while 1:        r = range(step, step * 10, step)        l = l + r        if step * 10 >= end:            l.append(step * 10)            break        step = step * 10    return l#end# This tests the raw speed of 0-byte inputsdef RunSpeedTest():    for L in Decimals(MIN_RUN, MAX_RUN):        SetFileSize(RUNFILE, L)        trx = TimedTest(RUNFILE, None, Xdelta3Runner(['-W', str(1<<20)]))        ReportSpeed(L, trx, '1MB ')        trx = TimedTest(RUNFILE, None, Xdelta3Runner(['-W', str(1<<19)]))        ReportSpeed(L, trx, '512k')        trx = TimedTest(RUNFILE, None, Xdelta3Runner(['-W', str(1<<18)]))        ReportSpeed(L, trx, '256k')        trm = TimedTest(RUNFILE, None, Xdelta3Mod1(RUNFILE))        ReportSpeed(L, trm, 'swig')        trg = TimedTest(RUNFILE, None, GzipRun1())        ReportSpeed(L,trg,'gzip')    #end#enddef SetFileSize(F,L):    fd = os.open(F, os.O_CREAT | os.O_WRONLY)    os.ftruncate(fd,L)    assert os.fstat(fd).st_size == L    os.close(fd)#enddef ReportSpeed(L,tr,desc):    print '%s run length %u: size %u: time %.3f ms: decode %.3f ms' % \          (desc, L,           tr.encode_size,           tr.encode_time.mean * 1000.0,           tr.decode_time.mean * 1000.0)#endclass Xdelta3RunClass:    def __init__(self, extra):        self.extra = extra    #end    def __str__(self):        return ' '.join(self.extra)    #end    def New(self):        return Xdelta3Runner(self.extra)    #end#endclass Xdelta3Runner:    def __init__(self, extra):        self.extra = extra    #end    def Encode(self, target, source, output):        args = (ALL_ARGS +                self.extra +                ['-e'])        if source:            args.append('-s')            args.append(source)        #end        args = args + [target, output]        self.Main(args)    #end    def Decode(self, input, source, output):        args = (ALL_ARGS +                ['-d'])        if source:            args.append('-s')            args.append(source)        #end        args = args + [input, output]        self.Main(args)    #end    def Verify(self, target, recon):        RunCommand(('cmp', target, recon))    #end    def EncodeSize(self, output):        return os.stat(output).st_size    #end    def Main(self, args):        try:            #print 'Run %s' % (' '.join(args))            xdelta3.xd3_main_cmdline(args)        except Exception, e:            raise CommandError(args, "xdelta3.main exception: %s" % e)        #end    #end#end

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -