📄 mpd.py
字号:
if lorank < hirank: msg['hosts'][(lorank+1,hirank)] = '_any_from_pool_' break elif '_any_' in hosts.values(): done = 0 while not done: hostsKeys = hosts.keys() hostsKeys.sort() for ranks in hostsKeys: if hosts[ranks] == '_any_': (lorank,hirank) = ranks self.run_one_cli(lorank,msg) msg['nstarted'] += 1 msg['nstarted_on_this_loop'] += 1 del msg['hosts'][ranks] if lorank < hirank: msg['hosts'][(lorank+1,hirank)] = '_any_' procsHereForJob = len(self.activeJobs[jobid].keys()) if procsHereForJob >= self.parmdb['MPD_NCPUS']: break # out of for loop # if no more to start via any or enough started here if '_any_' not in hosts.values() \ or procsHereForJob >= self.parmdb['MPD_NCPUS']: done = 1 if msg['first_loop']: msg['ringsize'] += 1 msg['ring_ncpus'] += self.parmdb['MPD_NCPUS'] self.ring.rhsSock.send_dict_msg(msg) # forward it on around def run_one_cli(self,currRank,msg): users = msg['users'] for ranks in users.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: username = users[ranks] break execs = msg['execs'] for ranks in execs.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pgm = execs[ranks] break paths = msg['paths'] for ranks in paths.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pathForExec = paths[ranks] break args = msg['args'] for ranks in args.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pgmArgs = dumps(args[ranks]) break envvars = msg['envvars'] for ranks in envvars.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: if msg.has_key('MPICH_ifhn'): envvars[ranks]['MPICH_INTERFACE_HOSTNAME'] = msg['MPICH_ifhn'] else: envvars[ranks]['MPICH_INTERFACE_HOSTNAME'] = self.myIfhn pgmEnvVars = dumps(envvars[ranks]) break limits = msg['limits'] for ranks in limits.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pgmLimits = dumps(limits[ranks]) break cwds = msg['cwds'] for ranks in cwds.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: cwd = cwds[ranks] break umasks = msg['umasks'] for ranks in umasks.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pgmUmask = umasks[ranks] break man_env = {} man_env.update(os.environ) # may only want to mov non-MPD_ stuff man_env['MPDMAN_MYHOST'] = self.myHost man_env['MPDMAN_MYIFHN'] = self.myIfhn man_env['MPDMAN_JOBID'] = msg['jobid'] man_env['MPDMAN_CLI_PGM'] = pgm man_env['MPDMAN_CLI_PATH'] = pathForExec man_env['MPDMAN_PGM_ARGS'] = pgmArgs man_env['MPDMAN_PGM_ENVVARS'] = pgmEnvVars man_env['MPDMAN_PGM_LIMITS'] = pgmLimits man_env['MPDMAN_CWD'] = cwd man_env['MPDMAN_UMASK'] = pgmUmask man_env['MPDMAN_SPAWNED'] = str(msg['spawned']) man_env['MPDMAN_NPROCS'] = str(msg['nprocs']) man_env['MPDMAN_MPD_LISTEN_PORT'] = str(self.parmdb['MPD_LISTEN_PORT']) man_env['MPDMAN_MPD_CONF_SECRETWORD'] = self.parmdb['MPD_SECRETWORD'] man_env['MPDMAN_CONHOST'] = msg['conhost'] man_env['MPDMAN_CONIFHN'] = msg['conifhn'] man_env['MPDMAN_CONPORT'] = str(msg['conport']) man_env['MPDMAN_RANK'] = str(currRank) man_env['MPDMAN_POS_IN_RING'] = str(msg['nstarted']) man_env['MPDMAN_STDIN_DEST'] = msg['stdin_dest'] man_env['MPDMAN_TOTALVIEW'] = str(msg['totalview']) man_env['MPDMAN_GDB'] = str(msg['gdb']) fullDirName = os.path.abspath(os.path.split(sys.argv[0])[0]) # normalize man_env['MPDMAN_FULLPATHDIR'] = fullDirName # used to find gdbdrv man_env['MPDMAN_SINGINIT_PID'] = str(msg['singinitpid']) man_env['MPDMAN_SINGINIT_PORT'] = str(msg['singinitport']) man_env['MPDMAN_LINE_LABELS_FMT'] = msg['line_labels'] if msg.has_key('rship'): man_env['MPDMAN_RSHIP'] = msg['rship'] man_env['MPDMAN_MSHIP_HOST'] = msg['mship_host'] man_env['MPDMAN_MSHIP_PORT'] = str(msg['mship_port']) if msg.has_key('doing_bnr'): man_env['MPDMAN_DOING_BNR'] = '1' else: man_env['MPDMAN_DOING_BNR'] = '0' if msg['nstarted'] == 0: manKVSTemplate = '%s_%s_%d' % \ (self.myHost,self.parmdb['MPD_LISTEN_PORT'],self.kvs_cntr) manKVSTemplate = sub('\.','_',manKVSTemplate) # chg magpie.cs to magpie_cs manKVSTemplate = sub('\-','_',manKVSTemplate) # chg node-0 to node_0 self.kvs_cntr += 1 msg['kvs_template'] = manKVSTemplate man_env['MPDMAN_KVS_TEMPLATE'] = msg['kvs_template'] msg['username'] = username if hasattr(os,'fork'): (manPid,toManSock) = self.launch_mpdman_via_fork(msg,man_env) elif subprocess_module_available: (manPid,toManSock) = self.launch_mpdman_via_subprocess(msg,man_env) else: mpd_print(1,'neither fork nor subprocess is available') sys.exit(-1) jobid = msg['jobid'] if not self.activeJobs.has_key(jobid): self.activeJobs[jobid] = {} self.activeJobs[jobid][manPid] = { 'pgm' : pgm, 'rank' : currRank, 'username' : username, 'clipid' : -1, # until report by man 'socktoman' : toManSock } def launch_mpdman_via_fork(self,msg,man_env): man_env['MPDMAN_HOW_LAUNCHED'] = 'FORK' currRank = int(man_env['MPDMAN_RANK']) manListenSock = MPDListenSock('',0,name='tempsock') manListenPort = manListenSock.getsockname()[1] if msg['nstarted'] == 0: manEntryIfhn = '' manEntryPort = 0 msg['pos0_host'] = self.myHost msg['pos0_ifhn'] = self.myIfhn msg['pos0_port'] = str(manListenPort) man_env['MPDMAN_POS0_IFHN'] = self.myIfhn man_env['MPDMAN_POS0_PORT'] = str(manListenPort) else: manEntryIfhn = msg['entry_ifhn'] manEntryPort = msg['entry_port'] man_env['MPDMAN_POS0_IFHN'] = msg['pos0_ifhn'] man_env['MPDMAN_POS0_PORT'] = msg['pos0_port'] man_env['MPDMAN_LHS_IFHN'] = manEntryIfhn man_env['MPDMAN_LHS_PORT'] = str(manEntryPort) man_env['MPDMAN_MY_LISTEN_FD'] = str(manListenSock.fileno()) man_env['MPDMAN_MY_LISTEN_PORT'] = str(manListenPort) (toManSock,toMpdSock) = mpd_sockpair() toManSock.name = 'to_man' toMpdSock.name = 'to_mpd' ## to be used by mpdman below man_env['MPDMAN_TO_MPD_FD'] = str(toMpdSock.fileno()) self.streamHandler.set_handler(toManSock,self.handle_man_input) msg['entry_host'] = self.myHost msg['entry_ifhn'] = self.myIfhn msg['entry_port'] = manListenPort manPid = os.fork() if manPid == 0: self.conListenSock = 0 # don't want to clean up console if I am manager self.myId = '%s_man_%d' % (self.myHost,self.myPid) mpd_set_my_id(self.myId) self.streamHandler.close_all_active_streams() os.setpgrp() os.environ = man_env if hasattr(os,'getuid') and os.getuid() == 0 and pwd_module_available: username = msg['username'] try: pwent = pwd.getpwnam(username) except: mpd_print(1,'invalid username :%s: on %s' % (username,self.myHost)) msgToSend = {'cmd' : 'job_failed', 'reason' : 'invalid_username', 'username' : username, 'host' : self.myHost } self.conSock.send_dict_msg(msgToSend) return uid = pwent[2] gid = pwent[3] os.setgroups(mpd_get_groups_for_username(username)) os.setregid(gid,gid) try: os.setreuid(uid,uid) except OSError, errmsg1: try: os.setuid(uid) except OSError, errmsg2: mpd_print(1,"unable to perform setreuid or setuid") sys.exit(-1) import atexit # need to use full name of _exithandlers atexit._exithandlers = [] # un-register handlers in atexit module # import profile # print 'profiling the manager' # profile.run('mpdman()') mpdman = MPDMan() mpdman.run() sys.exit(0) # do NOT do cleanup (eliminated atexit handlers above) manListenSock.close() toMpdSock.close() return (manPid,toManSock) def launch_mpdman_via_subprocess(self,msg,man_env): man_env['MPDMAN_HOW_LAUNCHED'] = 'SUBPROCESS' currRank = int(man_env['MPDMAN_RANK']) if msg['nstarted'] == 0: manEntryIfhn = '' manEntryPort = 0 else: manEntryIfhn = msg['entry_ifhn'] manEntryPort = msg['entry_port'] man_env['MPDMAN_POS0_IFHN'] = msg['pos0_ifhn'] man_env['MPDMAN_POS0_PORT'] = msg['pos0_port'] man_env['MPDMAN_LHS_IFHN'] = manEntryIfhn man_env['MPDMAN_LHS_PORT'] = str(manEntryPort) tempListenSock = MPDListenSock() man_env['MPDMAN_MPD_PORT'] = str(tempListenSock.getsockname()[1]) # python_executable = '\Python24\python.exe' python_executable = 'python2.4' fullDirName = man_env['MPDMAN_FULLPATHDIR'] manCmd = os.path.join(fullDirName,'mpdman.py') runner = subprocess.Popen([python_executable,'-u',manCmd], # only one 'python' arg bufsize=0, env=man_env, close_fds=False) ### stdin=subprocess.PIPE,stdout=subprocess.PIPE, ### stderr=subprocess.PIPE) manPid = runner.pid oldTimeout = socket.getdefaulttimeout() socket.setdefaulttimeout(8) try: (toManSock,toManAddr) = tempListenSock.accept() except Exception, errmsg: toManSock = 0 socket.setdefaulttimeout(oldTimeout) tempListenSock.close() if not toManSock: mpd_print(1,'failed to recv msg from launched man') return (0,0) msgFromMan = toManSock.recv_dict_msg() if not msgFromMan or not msgFromMan.has_key('man_listen_port'): toManSock.close() mpd_print(1,'invalid msg from launched man') return (0,0) manListenPort = msgFromMan['man_listen_port'] if currRank == 0: msg['pos0_host'] = self.myHost msg['pos0_ifhn'] = self.myIfhn msg['pos0_port'] = str(manListenPort) msg['entry_host'] = self.myHost msg['entry_ifhn'] = self.myIfhn msg['entry_port'] = manListenPort return (manPid,toManSock)# code for testingif __name__ == '__main__': mpd = MPD() mpd.run()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -