
chaingang.py

From: a media player for Linux
Language: Python
import string
import os

class ChainGangJob:
    ## One unit of work: a function plus its arguments, and the list of
    ## jobs that must wait for this one.  'refs' counts how many parent
    ## jobs (beyond the first) must still finish before this job is ready.
    def __init__(self, name, fun, args):
        self.fun = fun
        self.args = args
        self.refs = -1
        self.subjobs = []

    def add_sub_job(self, job):
        job.refs = job.refs + 1
        self.subjobs.append(job)

    def run(self, chaingang):
        apply(self.fun, self.args)
        ## Release the jobs waiting on this one; schedule any job whose
        ## last outstanding parent just finished.
        for job in self.subjobs:
            if job.refs:
                job.refs = job.refs - 1
            else:
                chaingang.add_job(job)

    def count_subjobs(self):
        ret = len(self.subjobs)
        for j in self.subjobs:
            ret = ret + j.count_subjobs()
        return ret

class ChainGang_nothreads:
    ## Serial fallback: a simple LIFO work queue, run on the calling thread.
    def __init__(self, todo=[], num=1):
        self.error = 0
        self.todo = todo
        self.output_hash = {}

    def run(self):
        self.worker()

    def add_job(self, job):
        self.todo.append(job)

    def add_jobs(self, jobs):
        self.todo.extend(jobs)

    def pop(self):
        if len(self.todo):
            return self.todo.pop()
        else:
            return None

    def doit(self, todo):
        todo.run(self)

    def worker(self):
        while not self.error:
            todo = self.pop()
            if not todo:
                break
            self.doit(todo)

## This implements a way of parallelizing function calls.
default_concurrancy = int(os.environ.get("RIBOSOME_THREADS", "1"))

if default_concurrancy > 1:
    import thread
    import output
    import sys
    import err
    import shell

    if not shell.isthreadsafe():
        import sysinfo
        print "Build was unable to find a thread-safe method for starting"
        print "programs. Please set RIBOSOME_THREADS to 1."
        if 'win32' in sysinfo.family_list:
            print " ... or install the win32api Python extension"
        sys.exit(1)

    class ChainGang_threaded(ChainGang_nothreads):
        ## Threaded version: N worker threads share the queue, and each
        ## thread's output is buffered so jobs do not interleave on stdout.
        def __init__(self, todo=[], num=default_concurrancy):
            ChainGang_nothreads.__init__(self, todo)
            self.queue_lock = thread.allocate_lock()
            self.write_lock = thread.allocate_lock()
            self.worker_locks = []
            self.num_threads = num
            self.output_hash = {}
            print "Threads enabled, workers = %d" % num

        def run(self):
            self.old_stdout = sys.stdout
            try:
                sys.stdout = output.OutReplacement(sys.stdout)
                sys.stdout.Block()
                sys.stdout.fil_list.append(self)
                ## Start the workers; each one holds its lock until it exits.
                for x in range(0, self.num_threads):
                    l = thread.allocate_lock()
                    l.acquire()
                    self.worker_locks.append(l)
                    thread.start_new_thread(self.worker, (l,))
                ## Wait for every worker to release its lock.
                for l in self.worker_locks:
                    l.acquire()
                    l.release()
            finally:
                sys.stdout = self.old_stdout
            if self.error:
                raise err.error, self.error

        def write(self, text):
            if 0:
                ## Disabled debug path: write immediately, tagged with
                ## the thread id.
                self.write_lock.acquire()
                self.old_stdout.write(
                    "== THREAD %d ==\n%s\n====================\n" %
                    (thread.get_ident(), text))
                self.write_lock.release()
                return
            self.output_hash[thread.get_ident()].append(text)
            if len(self.output_hash) == 1:
                self.flush_buffer()

        def add_job(self, job):
            self.queue_lock.acquire()
            ChainGang_nothreads.add_job(self, job)
            self.queue_lock.release()

        def add_jobs(self, jobs):
            self.queue_lock.acquire()
            ChainGang_nothreads.add_jobs(self, jobs)
            self.queue_lock.release()

        def pop(self):
            self.queue_lock.acquire()
            ret = ChainGang_nothreads.pop(self)
            self.queue_lock.release()
            return ret

        def flush_buffer(self):
            ## Write the output of this invocation to stdout.
            self.write_lock.acquire()
            try:
                id = thread.get_ident()
                output = self.output_hash[id]
                self.output_hash[id] = []
                self.old_stdout.write(string.join(output, ""))
            finally:
                self.write_lock.release()

        def doit(self, todo):
            ChainGang_nothreads.doit(self, todo)
            self.flush_buffer()

        def worker(self, lock):
            self.output_hash[thread.get_ident()] = []
            try: # release lock
                try: # error handler
                    ChainGang_nothreads.worker(self)
                except err.error, e:
                    print "***Thread error trapped!"
                    e.SetTraceback(sys.exc_info())
                    self.error = e
                except:
                    print "***Thread error trapped!"
                    e = err.Error()
                    e.Set("Error in threaded call")
                    e.SetTraceback(sys.exc_info())
                    self.error = e
            finally:
                self.flush_buffer()
                del self.output_hash[thread.get_ident()]
                lock.release()

    ChainGang = ChainGang_threaded
else:
    ChainGang = ChainGang_nothreads

def ProcessModules_anyorder(modules, func, num=default_concurrancy):
    ## Run func on every module with no ordering constraints.
    jobs = []
    for m in modules:
        jobs.append(ChainGangJob(m.name, func, (m,)))
    ChainGang(jobs, num).run()

def nulljob():
    pass

class JobTmp:
    def __init__(self, first, last=None):
        self.first = first
        self.last = last or first

def compute_module_dependencies(modules, func):
    ## Build a job tree from module path names: a module named "a/b"
    ## becomes a sub-job of the module named "a", so parents run first.
    jobtmp = {}
    jobtmp[""] = JobTmp(ChainGangJob("NULL", nulljob, ()))
    for m in modules:
        job = ChainGangJob(m.name, func, (m,))
        if jobtmp.has_key(m.name):
            print "Warning, two modules with name=%s" % m.name
            jobtmp[m.name].last.add_sub_job(job)
            jobtmp[m.name].last = job
        else:
            jobtmp[m.name] = JobTmp(job)
    for module_name in jobtmp.keys():
        name = module_name
        if not name:
            continue
        ## Walk up the path until we find an enclosing module (or fall
        ## back to the root NULL job) and hang this job beneath it.
        while 1:
            pos = string.rfind(name, "/")
            if pos == -1:
                name = ""
            else:
                name = name[:pos]
            if jobtmp.has_key(name):
                jobtmp[name].last.add_sub_job(jobtmp[module_name].first)
                break
    ## Run the root job once to seed the queue with the top-level jobs.
    tmp = ChainGang_nothreads([], 1)
    jobtmp[""].first.run(tmp)
    return tmp.todo

def sort_jobs(job1, job2):
    ## Jobs with more dependents sort first.
    return job2.count_subjobs() - job1.count_subjobs()

######
def ProcessModules(modules, func, num=default_concurrancy):
    jobs = compute_module_dependencies(modules, func)
    ## Sorting makes no difference without threads.
    if num > 1:
        jobs.sort(sort_jobs)
    ChainGang(jobs, num).run()

def sort_jobs_grouped(job1, job2):
    diff = sort_jobs(job1, job2)
    if diff:
        return diff
    ## Break ties by group size, larger groups first.
    return len(job2.args[0]) - len(job1.args[0])

######
def ProcessModules_grouped(modules, func,
                           num=default_concurrancy,
                           group_size=-1):
    ## Use smaller groups when more threads are available, for better
    ## load balancing.
    if group_size == -1:
        group_size = 50 / num
    jobs = compute_module_dependencies(modules, func)
    ## Make all jobs use arrays of modules instead of single modules.
    tmp = jobs[:]
    for j in tmp:
        j.args = ([j.args[0]],)
        tmp.extend(j.subjobs)
    ## Group top-level jobs together when they share the same CVS
    ## location; modules with an explicit cvs_path stay ungrouped.
    newjobs = []
    groups = {}
    for j in jobs:
        m = j.args[0][0]
        if m.cvs_path:
            newjobs.append(j)
        else:
            key = repr((m.cvs_root,
                        m.cvs_tag,
                        m.cvs_tag_type,
                        m.cvs_date))
            if groups.has_key(key):
                nj = groups[key]
                nj.args = (nj.args[0] + j.args[0],)
                nj.subjobs.extend(j.subjobs)
            else:
                groups[key] = j
                nj = j
            ## Close off a group once it reaches the target size.
            if len(nj.args[0]) >= group_size:
                newjobs.append(nj)
                del groups[key]
    newjobs.extend(groups.values())
    ## Sorting makes no difference without threads.
    if num > 1:
        newjobs.sort(sort_jobs_grouped)
    ChainGang(newjobs, num).run()
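
The public entry points are ProcessModules_anyorder, ProcessModules, and ProcessModules_grouped, which all take a list of module objects and a per-module callback. Below is a minimal usage sketch (Python 2, like the file), assuming the source above is saved as chaingang.py on the import path and RIBOSOME_THREADS is unset or 1, so the single-threaded path runs and the build system's output/err/shell helpers are not needed. FakeModule and build_module are hypothetical stand-ins for the real build-system module objects.

import chaingang

class FakeModule:
    ## Hypothetical stand-in: ProcessModules only needs a 'name' attribute,
    ## and path-like names ("a/b") imply that "a" must build first.
    def __init__(self, name):
        self.name = name

def build_module(m):
    print "building %s" % m.name

modules = [FakeModule("player"),
           FakeModule("player/ui"),
           FakeModule("codecs")]

## ProcessModules honors the path hierarchy: "player/ui" only runs after
## "player" has finished.  ProcessModules_anyorder would impose no order.
chaingang.ProcessModules(modules, build_module)

With RIBOSOME_THREADS set above 1, the same call would fan the jobs out over worker threads, and the per-thread buffering in ChainGang_threaded would keep each job's printout contiguous on stdout.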
