⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpd.py

📁 mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环境
💻 PY
📖 第 1 页 / 共 5 页
字号:
                        if lorank < hirank:                            msg['hosts'][(lorank+1,hirank)] = '_any_from_pool_'                    break        elif '_any_' in hosts.values():            done = 0            while not done:                hostsKeys = hosts.keys()                hostsKeys.sort()                for ranks in hostsKeys:                    if hosts[ranks] == '_any_':                        (lorank,hirank) = ranks                        self.run_one_cli(lorank,msg)                        msg['nstarted'] += 1                        msg['nstarted_on_this_loop'] += 1                        del msg['hosts'][ranks]                        if lorank < hirank:                            msg['hosts'][(lorank+1,hirank)] = '_any_'                        procsHereForJob = len(self.activeJobs[jobid].keys())                        if procsHereForJob >= self.parmdb['MPD_NCPUS']:                            break  # out of for loop                # if no more to start via any or enough started here                if '_any_' not in hosts.values() \                or procsHereForJob >= self.parmdb['MPD_NCPUS']:                    done = 1        if msg['first_loop']:            msg['ringsize'] += 1            msg['ring_ncpus'] += self.parmdb['MPD_NCPUS']        self.ring.rhsSock.send_dict_msg(msg)  # forward it on around    def run_one_cli(self,currRank,msg):        users = msg['users']        for ranks in users.keys():            (lo,hi) = ranks            if currRank >= lo  and  currRank <= hi:                username = users[ranks]                break        execs = msg['execs']        for ranks in execs.keys():            (lo,hi) = ranks            if currRank >= lo  and  currRank <= hi:                pgm = execs[ranks]                break        paths = msg['paths']        for ranks in paths.keys():            (lo,hi) = ranks            if currRank >= lo  and  currRank <= hi:                pathForExec = paths[ranks]                break        args = msg['args']        for ranks in args.keys():            (lo,hi) = ranks            if currRank >= lo  and  currRank <= hi:                pgmArgs = dumps(args[ranks])                break        envvars = msg['envvars']        for ranks in envvars.keys():            (lo,hi) = ranks            if currRank >= lo  and  currRank <= hi:                if msg.has_key('MPICH_ifhn'):                    envvars[ranks]['MPICH_INTERFACE_HOSTNAME'] = msg['MPICH_ifhn']                else:                    envvars[ranks]['MPICH_INTERFACE_HOSTNAME'] = self.myIfhn                pgmEnvVars = dumps(envvars[ranks])                break        limits = msg['limits']        for ranks in limits.keys():            (lo,hi) = ranks            if currRank >= lo  and  currRank <= hi:                pgmLimits = dumps(limits[ranks])                break        cwds = msg['cwds']        for ranks in cwds.keys():            (lo,hi) = ranks            if currRank >= lo  and  currRank <= hi:                cwd = cwds[ranks]                break        umasks = msg['umasks']        for ranks in umasks.keys():            (lo,hi) = ranks            if currRank >= lo  and  currRank <= hi:                pgmUmask = umasks[ranks]                break        man_env = {}        man_env.update(os.environ)    # may only want to mov non-MPD_ stuff        man_env['MPDMAN_MYHOST'] = self.myHost        man_env['MPDMAN_MYIFHN'] = self.myIfhn        man_env['MPDMAN_JOBID'] = msg['jobid']        man_env['MPDMAN_CLI_PGM'] = pgm        man_env['MPDMAN_CLI_PATH'] = pathForExec        man_env['MPDMAN_PGM_ARGS'] = pgmArgs        man_env['MPDMAN_PGM_ENVVARS'] = pgmEnvVars        man_env['MPDMAN_PGM_LIMITS'] = pgmLimits        man_env['MPDMAN_CWD'] = cwd        man_env['MPDMAN_UMASK'] = pgmUmask        man_env['MPDMAN_SPAWNED'] = str(msg['spawned'])        man_env['MPDMAN_NPROCS'] = str(msg['nprocs'])        man_env['MPDMAN_MPD_LISTEN_PORT'] = str(self.parmdb['MPD_LISTEN_PORT'])        man_env['MPDMAN_MPD_CONF_SECRETWORD'] = self.parmdb['MPD_SECRETWORD']        man_env['MPDMAN_CONHOST'] = msg['conhost']        man_env['MPDMAN_CONIFHN'] = msg['conifhn']        man_env['MPDMAN_CONPORT'] = str(msg['conport'])        man_env['MPDMAN_RANK'] = str(currRank)        man_env['MPDMAN_POS_IN_RING'] = str(msg['nstarted'])        man_env['MPDMAN_STDIN_DEST'] = msg['stdin_dest']        man_env['MPDMAN_TOTALVIEW'] = str(msg['totalview'])        man_env['MPDMAN_GDB'] = str(msg['gdb'])        fullDirName = os.path.abspath(os.path.split(sys.argv[0])[0])  # normalize        man_env['MPDMAN_FULLPATHDIR'] = fullDirName    # used to find gdbdrv        man_env['MPDMAN_SINGINIT_PID']  = str(msg['singinitpid'])        man_env['MPDMAN_SINGINIT_PORT'] = str(msg['singinitport'])        man_env['MPDMAN_LINE_LABELS_FMT'] = msg['line_labels']        if msg.has_key('rship'):            man_env['MPDMAN_RSHIP'] = msg['rship']            man_env['MPDMAN_MSHIP_HOST'] = msg['mship_host']            man_env['MPDMAN_MSHIP_PORT'] = str(msg['mship_port'])        if msg.has_key('doing_bnr'):            man_env['MPDMAN_DOING_BNR'] = '1'        else:            man_env['MPDMAN_DOING_BNR'] = '0'        if msg['nstarted'] == 0:            manKVSTemplate = '%s_%s_%d' % \                             (self.myHost,self.parmdb['MPD_LISTEN_PORT'],self.kvs_cntr)            manKVSTemplate = sub('\.','_',manKVSTemplate)  # chg magpie.cs to magpie_cs            manKVSTemplate = sub('\-','_',manKVSTemplate)  # chg node-0 to node_0            self.kvs_cntr += 1            msg['kvs_template'] = manKVSTemplate        man_env['MPDMAN_KVS_TEMPLATE'] = msg['kvs_template']	msg['username'] = username        if hasattr(os,'fork'):            (manPid,toManSock) = self.launch_mpdman_via_fork(msg,man_env)        elif subprocess_module_available:            (manPid,toManSock) = self.launch_mpdman_via_subprocess(msg,man_env)        else:            mpd_print(1,'neither fork nor subprocess is available')            sys.exit(-1)        jobid = msg['jobid']        if not self.activeJobs.has_key(jobid):            self.activeJobs[jobid] = {}        self.activeJobs[jobid][manPid] = { 'pgm' : pgm, 'rank' : currRank,                                           'username' : username,                                           'clipid' : -1,    # until report by man                                           'socktoman' : toManSock }    def launch_mpdman_via_fork(self,msg,man_env):        man_env['MPDMAN_HOW_LAUNCHED'] = 'FORK'        currRank = int(man_env['MPDMAN_RANK'])        manListenSock = MPDListenSock('',0,name='tempsock')        manListenPort = manListenSock.getsockname()[1]        if msg['nstarted'] == 0:            manEntryIfhn = ''            manEntryPort = 0            msg['pos0_host'] = self.myHost            msg['pos0_ifhn'] = self.myIfhn            msg['pos0_port'] = str(manListenPort)            man_env['MPDMAN_POS0_IFHN'] = self.myIfhn            man_env['MPDMAN_POS0_PORT'] = str(manListenPort)        else:            manEntryIfhn = msg['entry_ifhn']            manEntryPort = msg['entry_port']            man_env['MPDMAN_POS0_IFHN'] = msg['pos0_ifhn']            man_env['MPDMAN_POS0_PORT'] = msg['pos0_port']        man_env['MPDMAN_LHS_IFHN']  = manEntryIfhn        man_env['MPDMAN_LHS_PORT'] = str(manEntryPort)        man_env['MPDMAN_MY_LISTEN_FD'] = str(manListenSock.fileno())        man_env['MPDMAN_MY_LISTEN_PORT'] = str(manListenPort)        (toManSock,toMpdSock) = mpd_sockpair()        toManSock.name = 'to_man'        toMpdSock.name = 'to_mpd'  ## to be used by mpdman below        man_env['MPDMAN_TO_MPD_FD'] = str(toMpdSock.fileno())        self.streamHandler.set_handler(toManSock,self.handle_man_input)        msg['entry_host'] = self.myHost        msg['entry_ifhn'] = self.myIfhn        msg['entry_port'] = manListenPort        manPid = os.fork()        if manPid == 0:            self.conListenSock = 0    # don't want to clean up console if I am manager            self.myId = '%s_man_%d' % (self.myHost,self.myPid)            mpd_set_my_id(self.myId)            self.streamHandler.close_all_active_streams()            os.setpgrp()            os.environ = man_env            if hasattr(os,'getuid')  and  os.getuid() == 0  and  pwd_module_available:		username = msg['username']                try:                    pwent = pwd.getpwnam(username)                except:                    mpd_print(1,'invalid username :%s: on %s' % (username,self.myHost))                    msgToSend = {'cmd' : 'job_failed', 'reason' : 'invalid_username',                                 'username' : username, 'host' : self.myHost }                    self.conSock.send_dict_msg(msgToSend)                    return                uid = pwent[2]                gid = pwent[3]                os.setgroups(mpd_get_groups_for_username(username))                os.setregid(gid,gid)                try:                    os.setreuid(uid,uid)                except OSError, errmsg1:                    try:                        os.setuid(uid)                    except OSError, errmsg2:                        mpd_print(1,"unable to perform setreuid or setuid")                        sys.exit(-1)            import atexit    # need to use full name of _exithandlers            atexit._exithandlers = []    # un-register handlers in atexit module            # import profile            # print 'profiling the manager'            # profile.run('mpdman()')            mpdman = MPDMan()            mpdman.run()            sys.exit(0)  # do NOT do cleanup (eliminated atexit handlers above)        manListenSock.close()        toMpdSock.close()        return (manPid,toManSock)    def launch_mpdman_via_subprocess(self,msg,man_env):        man_env['MPDMAN_HOW_LAUNCHED'] = 'SUBPROCESS'        currRank = int(man_env['MPDMAN_RANK'])        if msg['nstarted'] == 0:            manEntryIfhn = ''            manEntryPort = 0        else:            manEntryIfhn = msg['entry_ifhn']            manEntryPort = msg['entry_port']            man_env['MPDMAN_POS0_IFHN'] = msg['pos0_ifhn']            man_env['MPDMAN_POS0_PORT'] = msg['pos0_port']        man_env['MPDMAN_LHS_IFHN']  = manEntryIfhn        man_env['MPDMAN_LHS_PORT'] = str(manEntryPort)        tempListenSock = MPDListenSock()        man_env['MPDMAN_MPD_PORT'] = str(tempListenSock.getsockname()[1])        # python_executable = '\Python24\python.exe'        python_executable = 'python2.4'        fullDirName = man_env['MPDMAN_FULLPATHDIR']        manCmd = os.path.join(fullDirName,'mpdman.py')        runner = subprocess.Popen([python_executable,'-u',manCmd],  # only one 'python' arg                                  bufsize=0,                                  env=man_env,                                  close_fds=False)                                  ### stdin=subprocess.PIPE,stdout=subprocess.PIPE,                                  ### stderr=subprocess.PIPE)        manPid = runner.pid        oldTimeout = socket.getdefaulttimeout()        socket.setdefaulttimeout(8)        try:            (toManSock,toManAddr) = tempListenSock.accept()        except Exception, errmsg:            toManSock = 0        socket.setdefaulttimeout(oldTimeout)        tempListenSock.close()        if not toManSock:            mpd_print(1,'failed to recv msg from launched man')            return (0,0)        msgFromMan = toManSock.recv_dict_msg()        if not msgFromMan  or  not msgFromMan.has_key('man_listen_port'):            toManSock.close()            mpd_print(1,'invalid msg from launched man')            return (0,0)        manListenPort = msgFromMan['man_listen_port']        if currRank == 0:            msg['pos0_host'] = self.myHost            msg['pos0_ifhn'] = self.myIfhn            msg['pos0_port'] = str(manListenPort)        msg['entry_host'] = self.myHost        msg['entry_ifhn'] = self.myIfhn        msg['entry_port'] = manListenPort        return (manPid,toManSock)# code for testingif __name__ == '__main__':    mpd = MPD()    mpd.run()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -