📄 xdelta3-regtest.py
字号:
class Xdelta3Mod1:
    """Runner that encodes and decodes one file in memory via the xdelta3 module.

    It offers the same Encode/Decode/Verify/EncodeSize methods as the
    command-line runners in this file, but works on buffers kept on the
    instance, so the path arguments of those methods are ignored.
    """

    def __init__(self, file):
        """Read the whole target `file` into memory."""
        # Binary mode: the contents are handed to the codec as an opaque
        # buffer, never interpreted as text.
        with open(file, 'rb') as f:
            self.target_data = f.read()
        #end
    #end

    def Encode(self, ignore1, ignore2, ignore3):
        """Encode the target without a source and keep it in self.encoded.

        Raises CommandError when xd3_encode_memory returns a non-zero code.
        """
        r1, encoded = xdelta3.xd3_encode_memory(self.target_data, None,
                                                1000000, 1<<10)
        if r1 != 0:
            raise CommandError('memory', 'encode failed: %s' % r1)
        #end
        self.encoded = encoded
    #end

    def Decode(self, ignore1, ignore2, ignore3):
        """Decode self.encoded and keep the result in self.decoded.

        Raises CommandError when xd3_decode_memory returns a non-zero code.
        """
        r2, data1 = xdelta3.xd3_decode_memory(self.encoded, None,
                                              len(self.target_data))
        if r2 != 0:
            # Report r2, the decoder's own return code (r1 only exists
            # inside Encode).
            raise CommandError('memory', 'decode failed: %s' % r2)
        #end
        self.decoded = data1
    #end

    def Verify(self, ignore1, ignore2):
        """Raise CommandError unless the decoded data equals the target."""
        if self.target_data != self.decoded:
            raise CommandError('memory', 'bad decode')
        #end
    #end

    def EncodeSize(self, ignore1):
        """Return the length of the encoded buffer."""
        return len(self.encoded)
    #end
#end

class GzipRun1:
    """Runner built on the gzip command line; it never takes a source."""

    def Encode(self, target, source, output):
        """Run `gzip -cf` through RunCommandIO with `target` and `output`."""
        assert source is None
        RunCommandIO(['gzip', '-cf'], target, output)
    #end

    def Decode(self, input, source, output):
        """Run `gzip -dcf` through RunCommandIO with `input` and `output`."""
        assert source is None
        RunCommandIO(['gzip', '-dcf'], input, output)
    #end

    def Verify(self, target, recon):
        """Compare `target` and `recon` with `cmp`."""
        RunCommand(('cmp', target, recon))
    #end

    def EncodeSize(self, output):
        """Return the size in bytes of the file `output`."""
        return os.stat(output).st_size
    #end
#end

class Xdelta1RunClass:
    """Factory for Xdelta1Runner; str() gives the name used in reports."""

    def __str__(self):
        return 'xdelta1'
    #end

    def New(self):
        """Return a fresh Xdelta1Runner."""
        return Xdelta1Runner()
    #end
#end

class Xdelta1Runner:
    """Runner built on the xdelta1 command line; a source is required."""

    def Encode(self, target, source, output):
        """Run `xdelta1 delta -q source target output`."""
        assert source is not None
        args = ['xdelta1', 'delta', '-q', source, target, output]
        # Note: for dumb historical reasons, xdelta1 returns 1 or 0.
        # NOTE(review): [0, 1] is presumably the list of exit codes that
        # RunCommand accepts -- confirm against RunCommand.
        RunCommand(args, [0, 1])
    #end

    def Decode(self, input, source, output):
        """Run `xdelta1 patch -q input source output`."""
        assert source is not None
        args = ['xdelta1', 'patch', '-q', input, source, output]
        RunCommand(args)
    #end

    def Verify(self, target, recon):
        """Compare `target` and `recon` with `cmp`."""
        RunCommand(('cmp', target, recon))
    #end

    def EncodeSize(self, output):
        """Return the size in bytes of the file `output`."""
        return os.stat(output).st_size
    #end
#end

# exceptions
class SkipRcsException(Exception):
    """Raised to skip an RCS file; `reason` holds the explanation."""

    def __init__(self, reason):
        Exception.__init__(self, reason)
        self.reason = reason
    #end
#end

class NotEnoughVersions(Exception):
    """Raised when an RCS file has fewer than two versions to pair up."""

    def __init__(self):
        Exception.__init__(self)
    #end
#end

class CommandError(Exception):
    """Raised when a command fails; the details are printed on construction.

    `cmd` may be a string or a tuple/list of arguments, which is joined with
    single spaces.  The joined command is kept in self.cmd and the failure
    text becomes the exception message.
    """

    def __init__(self, cmd, str):
        # The parameter name `str` is part of the existing signature and
        # hides the builtin here, hence the '%s' formatting below.
        if isinstance(cmd, (tuple, list)):
            cmd = ' '.join(['%s' % (part,) for part in cmd])
        #end
        Exception.__init__(self, str)
        self.cmd = cmd
        # The double space matches what the former two-item print
        # statements produced.
        print('command was:  %s' % (cmd,))
        print('command failed:  %s' % (str,))
        print('have fun debugging')
    #end
#end

class RcsVersion:
    """One revision of an RCS file: its revision string plus, once
    RcsFile.Date has run, its date string.  Versions order by date."""

    def __init__(self, vstr):
        self.vstr = vstr
    #end

    def __cmp__(self, other):
        # Written without the cmp() builtin so it evaluates the same way on
        # every Python version.
        return (self.date > other.date) - (self.date < other.date)
    #end

    def __lt__(self, other):
        # Rich comparison used by sort(); same ordering as __cmp__.
        return self.date < other.date
    #end

    def __str__(self):
        return str(self.vstr)
    #end
#end

class RcsFile:
    """A single RCS ",v" file: parses `rlog` output into a list of versions
    and checks individual versions out into TMPDIR."""

    def __init__(self, fname):
        self.fname = fname
        self.versions = []
        self.state = HEAD_STATE
    #end

    def SetTotRev(self,s):
        """Record the total revision count parsed from the rlog header."""
        self.totrev = int(s)
    #end

    def Rev(self,s):
        """Start a new version with revision string `s`.

        Raises SkipRcsException once the number of versions would exceed
        the total announced in the header.
        """
        self.rev = RcsVersion(s)
        if len(self.versions) >= self.totrev:
            raise SkipRcsException('too many versions (in log messages)')
        #end
        self.versions.append(self.rev)
    #end

    def Date(self,s):
        """Attach date string `s` to the version started by Rev."""
        self.rev.date = s
    #end

    def Match(self, line, state, rx, gp, newstate, f):
        """Try one parser transition on `line`.

        Only applies when the parser is in `state`.  If `rx` matches, `f`
        (when given) is called with group `gp`, the parser moves to
        `newstate` and 1 is returned; otherwise None is returned.
        """
        if state == self.state:
            m = rx.match(line)
            if m:
                if f:
                    f(m.group(gp))
                #end
                self.state = newstate
                return 1
            #end
        #end
        return None
    #end

    def Sum1Rlog(self):
        """Run `rlog` on the file and feed its output through the parser.

        Raises CommandError when the pipe reports an exit status on close.
        """
        # NOTE(review): the command is a plain shell string, so file names
        # containing shell metacharacters are not quoted.
        cmd = 'rlog '+self.fname
        # (state, pattern, next state, handler), tried in this order for
        # every line; the first transition that fires wins.
        transitions = (
            (HEAD_STATE, RE_TOTREV, BAR_STATE, self.SetTotRev),
            (BAR_STATE, RE_BAR, REV_STATE, None),
            (REV_STATE, RE_REV, DATE_STATE, self.Rev),
            (DATE_STATE, RE_DATE, BAR_STATE, self.Date),
        )
        f = os.popen(cmd, "r")
        status = None
        try:
            for l in f:
                for state, rx, newstate, handler in transitions:
                    if self.Match(l, state, rx, 1, newstate, handler):
                        break
                    #end
                #end
            #end
        finally:
            # Always release the pipe, even when a handler raised.
            status = f.close()
        #end
        if status is not None:
            # close() hands back an exit status, which cannot be raised
            # directly; wrap it in a CommandError.
            raise CommandError(cmd, 'rlog failed: exit status %s' % status)
        #end
    #end

    def Sum1(self):
        """Record the RCS file size, parse its log and sort versions by date.

        Raises SkipRcsException when the number of parsed versions differs
        from the total announced by rlog.
        """
        st = os.stat(self.fname)
        self.rcssize = st.st_size
        self.Sum1Rlog()
        if self.totrev != len(self.versions):
            raise SkipRcsException('wrong version count')
        #end
        self.versions.sort()
    #end

    def Checkout(self,n):
        """Check out version `n` with `co -p` into Verf(n); record its size
        in the version's `vsize`.

        Raises CommandError when `co` cannot be started or exits non-zero.
        """
        # Local import: the file's top-level import block is not known to
        # provide subprocess.
        import subprocess

        v = self.versions[n]
        # An argument list (no shell) keeps odd file names intact.
        cmd = ['co', '-ko', '-p%s' % v.vstr, self.fname]
        try:
            proc = subprocess.Popen(cmd,
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
        except OSError as e:
            raise CommandError(cmd, 'checkout failed: %s\n%s\n%s' %
                               (v.vstr, self.fname, e))
        #end
        # communicate() closes the child's stdin and drains stdout and
        # stderr together, so neither pipe can fill up and stall `co`.
        data, estr = proc.communicate()
        if proc.returncode != 0:
            raise CommandError(cmd, 'checkout failed: %s\n%s\n%s' %
                               (v.vstr, self.fname, estr))
        #end
        with open(self.Verf(n), "wb") as out:
            out.write(data)
        #end
        v.vsize = len(data)
    #end

    def Vdate(self,n):
        """Return the date string of version `n`."""
        return self.versions[n].date
    #end

    def Vstr(self,n):
        """Return the revision string of version `n`."""
        return self.versions[n].vstr
    #end

    def Verf(self,n):
        """Return the temporary path used for checked-out version `n`."""
        return os.path.join(TMPDIR, 'input.%d' % n)
    #end

    def FilePairsByDate(self, runclass):
        """Run TimedTest on each pair of date-adjacent versions.

        Version v+1 is passed first and version v second, together with a
        fresh runner from `runclass`.  Pairs in which either file is
        smaller than MIN_SIZE are skipped.  Returns the list of TimedTest
        results.  Raises NotEnoughVersions when there are fewer than two
        revisions.
        """
        if self.totrev < 2:
            raise NotEnoughVersions()
        #end
        self.Checkout(0)
        ntrials = []
        for v in range(0,self.totrev-1):
            # Version v-1 is no longer needed once pair (v, v+1) is
            # reached.  Starting at v == 1 also removes input.0, which the
            # final cleanup below only covers when there are exactly two
            # versions.
            if v >= 1:
                os.remove(self.Verf(v-1))
            #end
            self.Checkout(v+1)
            if os.stat(self.Verf(v)).st_size < MIN_SIZE or \
               os.stat(self.Verf(v+1)).st_size < MIN_SIZE:
                continue
            #end
            result = TimedTest(self.Verf(v+1),
                               self.Verf(v),
                               runclass.New())
            ntrials.append(result)
        #end
        # The last two checkouts are still on disk at this point.
        os.remove(self.Verf(self.totrev-1))
        os.remove(self.Verf(self.totrev-2))
        return ntrials
    #end

    def AppendVersion(self, f, n):
        """Check out version `n`, append its bytes to the open file `f` and
        return the number of bytes written."""
        self.Checkout(n)
        with open(self.Verf(n), "rb") as rf:
            data = rf.read()
        #end
        f.write(data)
        return len(data)
    #end
#end

class RcsFinder:
    """Collects RCS files under a directory tree and runs tests over them."""

    def __init__(self):
        self.subdirs = []
        self.rcsfiles = []
        self.others = []
        self.skipped = []
        self.biground = 0
    #end

    def Scan1(self,dir):
        """Classify the entries of one directory.

        Directories, files ending in ",v" (wrapped in RcsFile) and all
        other entries are appended to the matching instance lists.  Returns
        the sub-directories found in `dir`.
        """
        subdirs = []
        rcsfiles = []
        others = []
        for dent in os.listdir(dir):
            full = os.path.join(dir, dent)
            if os.path.isdir(full):
                subdirs.append(full)
            elif dent.endswith(",v"):
                rcsfiles.append(RcsFile(full))
            else:
                others.append(full)
            #end
        #end
        self.subdirs = self.subdirs + subdirs
        self.rcsfiles = self.rcsfiles + rcsfiles
        self.others = self.others + others
        return subdirs
    #end

    def Crawl(self, dir):
        """Scan `dir` and every directory below it, breadth first."""
        # Local import: the file's top-level import block is not known to
        # provide collections.
        from collections import deque

        # A deque pops from the front in constant time; the visiting order
        # is the same as with the former list-slicing queue.
        pending = deque([dir])
        while pending:
            pending.extend(self.Scan1(pending.popleft()))
        #end
    #end

    def Summarize(self):
        """Run Sum1 on every RCS file and keep those with two or more
        versions; files that raise SkipRcsException go to self.skipped."""
        good = []
        for rf in self.rcsfiles:
            try:
                rf.Sum1()
                if rf.totrev < 2:
                    raise SkipRcsException('too few versions (< 2)')
                #end
            except SkipRcsException:
                self.skipped.append(rf)
            else:
                good.append(rf)
            #end
        #end
        self.rcsfiles = good
    #end

    def AllPairsByDate(self, runclass):
        """Run FilePairsByDate on every RCS file, report and return the
        combined results.

        Files that raise SkipRcsException or NotEnoughVersions are reported
        and dropped from self.rcsfiles.
        """
        results = []
        good = []
        for rf in self.rcsfiles:
            try:
                results.extend(rf.FilePairsByDate(runclass))
            except SkipRcsException:
                print('file %s has compressed versions: skipping' % (rf.fname))
            except NotEnoughVersions:
                print('testing %s on %s: not enough versions' % (runclass, rf.fname))
            else:
                good.append(rf)
            #end
        #end
        self.rcsfiles = good
        self.ReportPairs(runclass, results)
        return results
    #end

    def ReportPairs(self, name, results):
        """Print summed mean encode time, mean decode time and encode size
        over `results`."""
        encode_time = sum(r.encode_time.mean for r in results)
        decode_time = sum(r.decode_time.mean for r in results)
        encode_size = sum(r.encode_size for r in results)
        print('%s rcs: encode %.2f s: decode %.2f s: size %d' %
              (name, encode_time, decode_time, encode_size))
    #end

    def MakeBigFiles(self, rand):
        """Build the two files big.1 and big.2 in TMPDIR from random versions.

        A sample of FILE_P times the number of eligible RCS files is drawn
        with `rand`.  For each sampled file whose name does not match
        IGNORE_FILENAME, two distinct random versions are appended, one to
        each big file.  Returns (path of big.1, path of big.2, test key).
        """
        path1 = TMPDIR + "/big.1"
        path2 = TMPDIR + "/big.2"
        population = [rf for rf in self.rcsfiles if len(rf.versions) >= 2]
        fcount = int(len(population) * FILE_P)
        assert fcount > 0
        f1sz = 0
        f2sz = 0
        # The with-blocks close both outputs even if a checkout fails.
        with open(path1, "wb") as f1:
            with open(path2, "wb") as f2:
                for rf in rand.sample(population, fcount):
                    if IGNORE_FILENAME.match(rf.fname) is not None:
                        continue
                    #end
                    r1, r2 = rand.sample(range(len(rf.versions)), 2)
                    f1sz += rf.AppendVersion(f1, r1)
                    f2sz += rf.AppendVersion(f2, r2)
                #end
            #end
        #end
        testkey = 'rcs%d' % self.biground
        self.biground = self.biground + 1
        print('%s; source %u bytes; target %u bytes' % (testkey, f1sz, f2sz))
        return (path1, path2, testkey)
    #end

    def Generator(self):
        """Return a one-argument callable that runs MakeBigFiles(rand)."""
        return lambda rand: self.MakeBigFiles(rand)
    #end
#end

# find a set of RCS files for testing
def GetTestRcsFiles():
    rcsf = RcsFinder()
    rcsf.Crawl(RCSDIR)
    if len(rcsf.rcsfiles) == 0:
        raise CommandError('', 'no RCS files')
    #end
    rcsf.Summarize()
    # NOTE(review): the visible source stops here; confirm whether further
    # statements (for example a return of rcsf) were lost.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -