⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xdelta3-regtest.py

📁 Linux下一个可以比较二进制文件的工具xdelta3.0u的源码。
💻 PY
📖 第 1 页 / 共 3 页
字号:
class Xdelta3Mod1:
    """Round-trip one file through the in-memory xdelta3 encoder/decoder.

    Calls ``xdelta3.xd3_encode_memory`` / ``xdelta3.xd3_decode_memory``
    with ``None`` as the source buffer.  The ``ignoreN`` parameters exist
    only so the method signatures line up with the file-based runners.
    """

    def __init__(self, file):
        """Read the whole of ``file`` into ``self.target_data``."""
        # Binary mode: the payload is compared byte-for-byte after decoding,
        # and the handle is closed instead of being left to the GC.
        handle = open(file, 'rb')
        try:
            self.target_data = handle.read()
        finally:
            handle.close()
        #end
    #end

    def Encode(self, ignore1, ignore2, ignore3):
        """Encode ``self.target_data`` into ``self.encoded``.

        Raises CommandError when the encoder returns a non-zero status.
        """
        r1, encoded = xdelta3.xd3_encode_memory(self.target_data, None,
                                                1000000, 1 << 10)
        if r1 != 0:
            raise CommandError('memory', 'encode failed: %s' % r1)
        #end
        self.encoded = encoded
    #end

    def Decode(self, ignore1, ignore2, ignore3):
        """Decode ``self.encoded`` into ``self.decoded``.

        Raises CommandError when the decoder returns a non-zero status.
        """
        r2, data1 = xdelta3.xd3_decode_memory(self.encoded, None,
                                              len(self.target_data))
        if r2 != 0:
            # Report this call's own status (the original referenced the
            # undefined name r1 here and died with a NameError).
            raise CommandError('memory', 'decode failed: %s' % r2)
        #end
        self.decoded = data1
    #end

    def Verify(self, ignore1, ignore2):
        """Raise CommandError unless the decoded data equals the target."""
        if self.target_data != self.decoded:
            raise CommandError('memory', 'bad decode')
        #end
    #end

    def EncodeSize(self, ignore1):
        """Return the length of the encoded data."""
        return len(self.encoded)
    #end
#end

class GzipRun1:
    """Runner that compresses with the external ``gzip`` command.

    gzip has no notion of a source file, so ``source`` must be None.
    """

    def Encode(self, target, source, output):
        """Run ``gzip -cf`` via RunCommandIO with ``target`` and ``output``."""
        assert source is None
        RunCommandIO(['gzip', '-cf'], target, output)
    #end

    def Decode(self, input, source, output):
        """Run ``gzip -dcf`` via RunCommandIO with ``input`` and ``output``."""
        assert source is None
        RunCommandIO(['gzip', '-dcf'], input, output)
    #end

    def Verify(self, target, recon):
        """Compare ``target`` and ``recon`` with the external ``cmp``."""
        RunCommand(('cmp', target, recon))
    #end

    def EncodeSize(self, output):
        """Return the size in bytes of the file ``output``."""
        return os.stat(output).st_size
    #end
#end

class Xdelta1RunClass:
    """Factory for Xdelta1Runner objects; prints as 'xdelta1'."""

    def __str__(self):
        return 'xdelta1'
    #end

    def New(self):
        """Return a fresh Xdelta1Runner."""
        return Xdelta1Runner()
    #end
#end

class Xdelta1Runner:
    """Runner that shells out to the ``xdelta1`` command."""

    def Encode(self, target, source, output):
        """Run ``xdelta1 delta -q source target output``."""
        assert source is not None
        args = ['xdelta1', 'delta', '-q', source, target, output]
        # NOTE(review): [0, 1] is presumably the set of exit statuses that
        # RunCommand accepts (see the historical note in Decode) -- confirm
        # against RunCommand's definition.
        RunCommand(args, [0, 1])
    #end

    def Decode(self, input, source, output):
        """Run ``xdelta1 patch -q input source output``."""
        assert source is not None
        args = ['xdelta1', 'patch', '-q', input, source, output]
        # Note: for dumb historical reasons, xdelta1 returns 1 or 0
        RunCommand(args)
    #end

    def Verify(self, target, recon):
        """Compare ``target`` and ``recon`` with the external ``cmp``."""
        RunCommand(('cmp', target, recon))
    #end

    def EncodeSize(self, output):
        """Return the size in bytes of the file ``output``."""
        return os.stat(output).st_size
    #end
#end

# exceptions
class SkipRcsException(Exception):
    """Raised to skip an RCS file; ``reason`` holds the explanation."""

    def __init__(self, reason):
        Exception.__init__(self, reason)
        self.reason = reason
    #end
#end

class NotEnoughVersions(Exception):
    """Raised when an RCS file has fewer than two revisions."""

    def __init__(self):
        Exception.__init__(self)
    #end
#end

class CommandError(Exception):
    """Raised when a command fails; prints a diagnostic when constructed.

    cmd -- the command, either a string or a tuple/list of arguments
    str -- description of the failure
    """

    def __init__(self, cmd, str):
        if isinstance(cmd, (tuple, list)):
            # Space-join the argument vector for display.
            cmd = ' '.join(['%s' % (part,) for part in cmd])
        #end
        Exception.__init__(self, '%s: %s' % (cmd, str))
        self.cmd = cmd
        self.detail = str
        # Single-argument print(...) emits the same bytes as the original
        # two-item print statements on Python 2 and is valid on Python 3.
        print('command was:  %s' % (cmd,))
        print('command failed:  %s' % (str,))
        print('have fun debugging')
    #end
#end

class RcsVersion:
    """One RCS revision: ``vstr`` is the revision string.

    ``date`` is assigned later (by RcsFile.Date) and is the sort key.
    """

    def __init__(self, vstr):
        self.vstr = vstr
    #end

    def __cmp__(self, other):
        # Same -1/0/1 result as cmp(self.date, other.date), without
        # relying on the cmp builtin.
        return (self.date > other.date) - (self.date < other.date)
    #end

    def __lt__(self, other):
        # list.sort() only needs "<"; defining it keeps sorting working on
        # interpreters that ignore __cmp__.
        return self.date < other.date
    #end

    def __str__(self):
        return str(self.vstr)
    #end
#end

class RcsFile:
    """An RCS ``,v`` file together with the revisions parsed from ``rlog``."""

    def __init__(self, fname):
        self.fname    = fname
        self.versions = []
        self.state    = HEAD_STATE
    #end

    def SetTotRev(self, s):
        """Record the total revision count parsed from the rlog header."""
        self.totrev = int(s)
    #end

    def Rev(self, s):
        """Start a new revision ``s`` and append it to ``self.versions``.

        Raises SkipRcsException if that would exceed ``self.totrev``.
        """
        self.rev = RcsVersion(s)
        if len(self.versions) >= self.totrev:
            raise SkipRcsException('too many versions (in log messages)')
        #end
        self.versions.append(self.rev)
    #end

    def Date(self, s):
        """Attach date ``s`` to the revision most recently passed to Rev."""
        self.rev.date = s
    #end

    def Match(self, line, state, rx, gp, newstate, f):
        """Try one parser transition.

        When the parser is in ``state`` and ``rx`` matches ``line``, call
        ``f`` (if not None) with match group ``gp``, switch to ``newstate``
        and return 1.  Otherwise return None.
        """
        if state != self.state:
            return None
        #end
        m = rx.match(line)
        if not m:
            return None
        #end
        if f:
            f(m.group(gp))
        #end
        self.state = newstate
        return 1
    #end

    def Sum1Rlog(self):
        """Parse ``rlog <fname>`` output into totrev and the version list.

        Raises CommandError if closing the pipe reports a failure status.
        """
        cmd = 'rlog ' + self.fname
        # (required state, pattern, next state, callback); at most one
        # transition fires per input line, in this order.
        transitions = (
            (HEAD_STATE, RE_TOTREV, BAR_STATE,  self.SetTotRev),
            (BAR_STATE,  RE_BAR,    REV_STATE,  None),
            (REV_STATE,  RE_REV,    DATE_STATE, self.Rev),
            (DATE_STATE, RE_DATE,   BAR_STATE,  self.Date),
        )
        f = os.popen(cmd, "r")
        try:
            l = f.readline()
            while l:
                for state, rx, newstate, callback in transitions:
                    if self.Match(l, state, rx, 1, newstate, callback):
                        break
                    #end
                #end
                l = f.readline()
            #end
        finally:
            # Also closes the pipe when a callback raises SkipRcsException.
            c = f.close()
        #end
        if c is not None:
            # close() yields an integer status; raising it directly (as the
            # original did) is itself a TypeError.
            raise CommandError(cmd, 'rlog failed: exit status %s' % c)
        #end
    #end

    def Sum1(self):
        """Record the file size, parse rlog, and sort versions by date.

        Raises SkipRcsException if the parsed revision count disagrees
        with the header's total.
        """
        st = os.stat(self.fname)
        self.rcssize = st.st_size
        self.Sum1Rlog()
        if self.totrev != len(self.versions):
            raise SkipRcsException('wrong version count')
        #end
        self.versions.sort()
    #end

    def Checkout(self, n):
        """Check out version ``n`` with ``co`` into ``self.Verf(n)``.

        Stores the number of bytes written in the version's ``vsize``.
        Raises CommandError if closing co's output reports a failure.
        """
        v   = self.versions[n]
        cmd = 'co -ko -p%s %s' % (v.vstr, self.fname)
        out = open(self.Verf(n), "w")
        try:
            (inf, stream, err) = os.popen3(cmd, "r")
            inf.close()
            try:
                # read() with no size returns everything up to EOF.
                data = stream.read()
                out.write(data)
                v.vsize = len(data)
                estr = err.read()
                if stream.close():
                    raise CommandError(cmd, 'checkout failed: %s\n%s\n%s' %
                                       (v.vstr, self.fname, estr))
                #end
            finally:
                # Closing an already-closed file is a no-op.
                stream.close()
                err.close()
            #end
        finally:
            out.close()
        #end
    #end

    def Vdate(self, n):
        """Return the date of version ``n``."""
        return self.versions[n].date
    #end

    def Vstr(self, n):
        """Return the revision string of version ``n``."""
        return self.versions[n].vstr
    #end

    def Verf(self, n):
        """Return the temporary path used for checked-out version ``n``."""
        return os.path.join(TMPDIR, 'input.%d' % n)
    #end

    def FilePairsByDate(self, runclass):
        """Run TimedTest on each pair of consecutive versions.

        For each v, version v+1 is the target and version v the source,
        using a new runner from ``runclass.New()``.  Pairs where either
        file is smaller than MIN_SIZE are skipped.  Returns the list of
        results.  Raises NotEnoughVersions when there are fewer than two
        revisions.
        """
        if self.totrev < 2:
            raise NotEnoughVersions()
        #end
        self.Checkout(0)
        ntrials = []
        for v in range(0, self.totrev - 1):
            if v > 0:
                # Version v-1 is no longer needed.  (The original tested
                # "v > 1", which left input.0 behind.)
                os.remove(self.Verf(v - 1))
            #end
            self.Checkout(v + 1)
            if os.stat(self.Verf(v)).st_size < MIN_SIZE or \
               os.stat(self.Verf(v + 1)).st_size < MIN_SIZE:
                continue
            #end
            result = TimedTest(self.Verf(v + 1),
                               self.Verf(v),
                               runclass.New())
            ntrials.append(result)
        #end
        os.remove(self.Verf(self.totrev - 1))
        os.remove(self.Verf(self.totrev - 2))
        return ntrials
    #end

    def AppendVersion(self, f, n):
        """Check out version ``n``, append it to ``f``, return its length."""
        self.Checkout(n)
        rf = open(self.Verf(n), "r")
        try:
            data = rf.read()
        finally:
            rf.close()
        #end
        f.write(data)
        return len(data)
    #end
#end

class RcsFinder:
    """Collects RCS files under a directory tree and runs tests on them."""

    def __init__(self):
        self.subdirs  = []
        self.rcsfiles = []
        self.others   = []
        self.skipped  = []
        self.biground = 0
    #end

    def Scan1(self, dir):
        """Classify the entries of ``dir``; return its subdirectories.

        Directories, names ending in ",v" (wrapped in RcsFile) and all
        other entries are appended to the matching instance lists.
        """
        subdirs  = []
        rcsfiles = []
        others   = []
        for dent in os.listdir(dir):
            full = os.path.join(dir, dent)
            if os.path.isdir(full):
                subdirs.append(full)
            elif dent.endswith(",v"):
                rcsfiles.append(RcsFile(full))
            else:
                others.append(full)
            #end
        #end
        self.subdirs  = self.subdirs  + subdirs
        self.rcsfiles = self.rcsfiles + rcsfiles
        self.others   = self.others   + others
        return subdirs
    #end

    def Crawl(self, dir):
        """Scan ``dir`` and every directory below it, breadth-first."""
        pending = [dir]
        i = 0
        while i < len(pending):
            pending.extend(self.Scan1(pending[i]))
            i += 1
        #end
    #end

    def Summarize(self):
        """Run Sum1 on every RCS file, keeping those with >= 2 versions.

        Files that raise SkipRcsException are moved to ``self.skipped``.
        """
        good = []
        for rf in self.rcsfiles:
            try:
                rf.Sum1()
                if rf.totrev < 2:
                    raise SkipRcsException('too few versions (< 2)')
                #end
            except SkipRcsException:
                self.skipped.append(rf)
            else:
                good.append(rf)
            #end
        #end
        self.rcsfiles = good
    #end

    def AllPairsByDate(self, runclass):
        """Run FilePairsByDate on every file and report the totals.

        Files that raise SkipRcsException or NotEnoughVersions are reported
        and dropped from ``self.rcsfiles``.  Returns the combined results.
        """
        results = []
        good = []
        for rf in self.rcsfiles:
            try:
                results = results + rf.FilePairsByDate(runclass)
            except SkipRcsException:
                print('file %s has compressed versions: skipping' % (rf.fname))
            except NotEnoughVersions:
                print('testing %s on %s: not enough versions' %
                      (runclass, rf.fname))
            else:
                good.append(rf)
            #end
        #end
        self.rcsfiles = good
        self.ReportPairs(runclass, results)
        return results
    #end

    def ReportPairs(self, name, results):
        """Print summed mean encode/decode times and encoded size."""
        encode_time = 0
        decode_time = 0
        encode_size = 0
        for r in results:
            encode_time += r.encode_time.mean
            decode_time += r.decode_time.mean
            encode_size += r.encode_size
        #end
        print('%s rcs: encode %.2f s: decode %.2f s: size %d' %
              (name, encode_time, decode_time, encode_size))
    #end

    def MakeBigFiles(self, rand):
        """Build TMPDIR/big.1 and TMPDIR/big.2 from random version pairs.

        Samples int(len(population) * FILE_P) files that have at least two
        versions, skips names matching IGNORE_FILENAME, and appends two
        distinct random versions of each file to big.1 and big.2.
        Returns (path of big.1, path of big.2, test key).
        """
        population = [rf for rf in self.rcsfiles if len(rf.versions) >= 2]
        fcount = int(len(population) * FILE_P)
        assert fcount > 0
        big1 = TMPDIR + "/big.1"
        big2 = TMPDIR + "/big.2"
        f1sz = 0
        f2sz = 0
        f1 = open(big1, "w")
        try:
            f2 = open(big2, "w")
            try:
                for file in rand.sample(population, fcount):
                    if IGNORE_FILENAME.match(file.fname) is not None:
                        continue
                    #end
                    # Two distinct version indices.
                    r1, r2 = rand.sample(range(0, len(file.versions)), 2)
                    f1sz += file.AppendVersion(f1, r1)
                    f2sz += file.AppendVersion(f2, r2)
                #end
            finally:
                f2.close()
            #end
        finally:
            f1.close()
        #end
        testkey = 'rcs%d' % self.biground
        self.biground = self.biground + 1
        print('%s; source %u bytes; target %u bytes' % (testkey, f1sz, f2sz))
        return (big1,
                big2,
                testkey)
    #end

    def Generator(self):
        """Return a one-argument callable that invokes MakeBigFiles(rand)."""
        return lambda rand: self.MakeBigFiles(rand)
    #end
#end

# find a set of RCS files for testing
def GetTestRcsFiles():
    rcsf = RcsFinder()
    rcsf.Crawl(RCSDIR)
    if len(rcsf.rcsfiles) == 0:
        raise CommandError('', 'no RCS files')
    #end
    rcsf.Summarize()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -