📄 chaingang.py
字号:
import stringimport osclass ChainGangJob: def __init__(self, name, fun, args): self.fun=fun self.args=args self.refs=-1 self.subjobs=[] def add_sub_job(self, job): job.refs = job.refs + 1 self.subjobs.append(job) def run(self, chaingang): apply(self.fun, self.args) for job in self.subjobs: if job.refs: job.refs = job.refs -1 else: chaingang.add_job(job) def count_subjobs(self): ret=len(self.subjobs) for j in self.subjobs: ret = ret + j.count_subjobs() return retclass ChainGang_nothreads: def __init__(self, todo=[], num=1): self.error=0 self.todo=todo self.output_hash={} def run(self): self.worker() def add_job(self, job): self.todo.append(job) def add_jobs(self, jobs): self.todo.extend( jobs ) def pop(self): if len(self.todo): return self.todo.pop() else: return None def doit(self, todo): todo.run(self) def worker(self): while not self.error: todo=self.pop() if not todo: break self.doit(todo)## This implements a way of parallelizing function callsdefault_concurrancy = int(os.environ.get("RIBOSOME_THREADS","1"))if default_concurrancy > 1: import thread import output import sys import err import shell if not shell.isthreadsafe(): import sysinfo print "Build was unable to find a thread-safe method for starting" print "programs. Please set RIBOSOME_THREADS to 1." if 'win32' in sysinfo.family_list: print " ... or install the win32api Python extention" sys.exit(1) class ChainGang_threaded(ChainGang_nothreads): def __init__(self, todo=[], num=default_concurrancy ): ChainGang_nothreads.__init__(self, todo) self.queue_lock=thread.allocate_lock() self.write_lock=thread.allocate_lock() self.worker_locks=[] self.num_threads=num self.output_hash={} print "Threads enabled, workers = %d" % num def run(self): self.old_stdout = sys.stdout try: sys.stdout = output.OutReplacement(sys.stdout) sys.stdout.Block() sys.stdout.fil_list.append(self) for x in range(0,self.num_threads): l=thread.allocate_lock() l.acquire() self.worker_locks.append(l) thread.start_new_thread(self.worker, (l,)) for l in self.worker_locks: l.acquire() l.release() finally: sys.stdout = self.old_stdout if self.error: raise err.error, self.error def write(self, text): if 0: self.write_lock.acquire() self.old_stdout.write( "== THREAD %d ==\n%s\n====================\n" % ( thread.get_ident(), text )); self.write_lock.release() return self.output_hash[thread.get_ident()].append(text) if len(self.output_hash) == 1: self.flush_buffer() def add_job(self, job): self.queue_lock.acquire() ChainGang_nothreads.add_job(self, job) self.queue_lock.release() def add_jobs(self, jobs): self.queue_lock.acquire() ChainGang_nothreads.add_jobs(self, jobs) self.queue_lock.release() def pop(self): self.queue_lock.acquire() ret=ChainGang_nothreads.pop(self) self.queue_lock.release() return ret def flush_buffer(self): ## Write the output of this invocation to stdout self.write_lock.acquire() try: id=thread.get_ident() output = self.output_hash[id] self.output_hash[id]=[] self.old_stdout.write(string.join(output,"")) finally: self.write_lock.release() def doit(self, todo): #self.old_stdout.write("Processing %s\n" % repr(todo)) ChainGang_nothreads.doit(self, todo) self.flush_buffer() def worker(self, lock): self.output_hash[thread.get_ident()]=[] try: # release lock try: # error handler ChainGang_nothreads.worker(self) except err.error, e: print "***Thread error trapped!" e.SetTraceback(sys.exc_info()) self.error=e except: print "***Thread error trapped!" e = err.Error() e.Set("Error in threaded call") e.SetTraceback(sys.exc_info()) self.error=e finally: self.flush_buffer() del self.output_hash[thread.get_ident()] lock.release() ChainGang = ChainGang_threadedelse: ChainGang = ChainGang_nothreadsdef ProcessModules_anyorder(modules, func, num=default_concurrancy): jobs=[] for m in modules: jobs.append(ChainGangJob(m.name, func, (m,))) ChainGang( jobs, num ).run()def nulljob(): #print "NULLJOB" passclass JobTmp: def __init__(self, first, last = None): self.first=first self.last=last or firstdef compute_module_dependencies(modules, func): dirhash={} jobtmp={} jobtmp[""]=JobTmp(ChainGangJob("NULL",nulljob, ())) for m in modules: job=ChainGangJob(m.name,func, (m,)) if jobtmp.has_key(m.name): print "Warning, two modules with name=%s" % m.name jobtmp[m.name].last.add_sub_job(job) jobtmp[m.name].last=job else: jobtmp[m.name]=JobTmp(job) for module_name in jobtmp.keys(): name=module_name if not name: continue #print "MODULE: %s" % repr(name) while 1: pos = string.rfind(name,"/") if pos == -1: #print " >>> standalone" name="" else: name = name[:pos] #print "?:: %s" % repr(name) if jobtmp.has_key(name): #print " >>> submodule" jobtmp[name].last.add_sub_job(jobtmp[module_name].first) break tmp=ChainGang_nothreads([], 1) #print jobtmp[""].first.subjobs jobtmp[""].first.run(tmp) return tmp.todo def sort_jobs(job1, job2): return job2.count_subjobs() - job1.count_subjobs()######def ProcessModules(modules, func, num=default_concurrancy): jobs=compute_module_dependencies(modules, func) ## Sorting makes no differance without threads if num > 1: jobs.sort(sort_jobs) ChainGang( jobs, num ).run()def sort_jobs_grouped(job1, job2): diff = sort_jobs(job1, job2) if diff: return diff return len(job2.args[0]) - len(job1.args[0])######def ProcessModules_grouped(modules, func, num=default_concurrancy, group_size=-1): ## Smaller groups with more threads for better if group_size == -1: group_size = 50/num jobs=compute_module_dependencies(modules, func) ## Make all job use arrays tmp=jobs[:] for j in tmp: j.args = ([ j.args[0] ], ) tmp.extend(j.subjobs) ## Group top-level jobs together newjobs=[] groups={} for j in jobs: m=j.args[0][0] if m.cvs_path: newjobs.append(j) else: key=repr ( (m.cvs_root, m.cvs_tag, m.cvs_tag_type, m.cvs_date) ) if groups.has_key(key): nj=groups[key] nj.args = (nj.args[0] + j.args[0], ) nj.subjobs.extend(j.subjobs) else: groups[key]=j nj=j if len(nj.args[0]) >= group_size: newjobs.append(nj) del groups[key] newjobs.extend(groups.values()) ## Sorting makes no differance without threads if num > 1: newjobs.sort(sort_jobs_grouped) ChainGang( newjobs, num ).run()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -