📄 mpiexec.py
字号:
usage() nProcs = argset[argidx+1] if not nProcs.isdigit(): print '** non-numeric arg to -n: %s' % nProcs usage() nProcs = int(nProcs) argidx += 2 elif argset[argidx] == '-host': if len(argset) < (argidx+2): print '** missing arg to -host' usage() try: host = socket.gethostbyname_ex(argset[argidx+1])[2][0] except: print 'unable to do find info for host %s' % (argset[argidx+1]) sys.exit(-1) argidx += 2 elif argset[argidx] == '-path': if len(argset) < (argidx+2): print '** missing arg to -path' usage() wpath = argset[argidx+1] argidx += 2 elif argset[argidx] == '-wdir': if len(argset) < (argidx+2): print '** missing arg to -wdir' usage() wdir = argset[argidx+1] argidx += 2 elif argset[argidx] == '-umask': if len(argset) < (argidx+2): print '** missing arg to -umask' usage() wumask = argset[argidx+1] argidx += 2 elif argset[argidx] == '-soft': if len(argset) < (argidx+2): print '** missing arg to -soft' usage() softness = argset[argidx+1] argidx += 2 elif argset[argidx] == '-arch': if len(argset) < (argidx+2): print '** missing arg to -arch' usage() print '** -arch is accepted but not used' argidx += 2 elif argset[argidx] == '-envall': envall = 1 argidx += 1 elif argset[argidx] == '-envnone': envall = 0 argidx += 1 elif argset[argidx] == '-envlist': localEnvlist = argset[argidx+1].split(',') argidx += 2 elif argset[argidx] == '-env': if len(argset) < (argidx+3): print '** missing arg to -env' usage() var = argset[argidx+1] val = argset[argidx+2] localEnv[var] = val argidx += 3 else: print 'unknown "local" option: %s' % argset[argidx] usage() if softness: nProcs = adjust_nprocs(nProcs,softness) cmdAndArgs = [] if argidx < len(argset): while argidx < len(argset): cmdAndArgs.append(argset[argidx]) argidx += 1 else: if gexec: cmdAndArgs = [gexec] if not cmdAndArgs: print 'no cmd specified' usage() argsetLoRange = nextRange argsetHiRange = nextRange + nProcs - 1 loRange = argsetLoRange hiRange = argsetHiRange defaultHostForArgset = host while loRange <= argsetHiRange: host 
= defaultHostForArgset if machineFileInfo: if len(machineFileInfo) <= hiRange: print 'too few entries in machinefile' sys.exit(-1) host = machineFileInfo[loRange]['host'] ifhn = machineFileInfo[loRange]['ifhn'] if ifhn: msgToMPD['ifhns'][loRange] = ifhn for i in range(loRange+1,hiRange+1): if machineFileInfo[i]['host'] != host or machineFileInfo[i]['ifhn'] != ifhn: hiRange = i - 1 break asRange = (loRange,hiRange) # this argset range as a tuple msgToMPD['users'][asRange] = mpd_get_my_username() msgToMPD['execs'][asRange] = cmdAndArgs[0] msgToMPD['paths'][asRange] = wpath msgToMPD['cwds'][asRange] = wdir msgToMPD['umasks'][asRange] = wumask msgToMPD['args'][asRange] = cmdAndArgs[1:] if host.startswith('_any_'): msgToMPD['hosts'][(loRange,hiRange)] = host else: try: msgToMPD['hosts'][asRange] = socket.gethostbyname_ex(host)[2][0] except: print 'unable to do find info for host %s' % (host) sys.exit(-1) envToSend = {} if envall: for envvar in os.environ.keys(): envToSend[envvar] = os.environ[envvar] for envvar in parmdb['-genvlist']: if not os.environ.has_key(envvar): print '%s in envlist does not exist in your env' % (envvar) sys.exit(-1) envToSend[envvar] = os.environ[envvar] for envvar in localEnvlist: if not os.environ.has_key(envvar): print '%s in envlist does not exist in your env' % (envvar) sys.exit(-1) envToSend[envvar] = os.environ[envvar] for envvar in parmdb['-genv'].keys(): envToSend[envvar] = parmdb['-genv'][envvar] for envvar in localEnv.keys(): envToSend[envvar] = localEnv[envvar] if usize: envToSend['MPI_UNIVERSE_SIZE'] = str(usize) envToSend['MPI_APPNUM'] = str(appnum) msgToMPD['envvars'][(loRange,hiRange)] = envToSend loRange = hiRange + 1 hiRange = argsetHiRange # again appnum += 1 nextRange += nProcs parmdb[('cmdline','nprocs')] = parmdb['nprocs'] + nProcs # Adjust nProcs (called maxprocs in the Standard) according to soft:# Our interpretation is that we need the largest number <= nProcs that is# consistent with the list of possible values 
# described by soft. I.e., if the user says
#
#     mpiexec -n 10 -soft 5 a.out
#
# we adjust the 10 down to 5. This may not be what was intended in the
# Standard, but it seems to be what it says.
def adjust_nprocs(nProcs, softness):
    """Return the largest process count <= nProcs that the -soft spec allows.

    ``softness`` is a comma-separated list of items of the form ``a``,
    ``a:b`` or ``a:b:c``, expanded as ``[a]``, ``range(a, b+1)`` and
    ``range(a, b+1, c)`` respectively.  Only values in ``0..nProcs`` are
    candidates.  A malformed item (wrong number of fields, a non-integer
    field, or a zero step) is reported and ``usage()`` is called; the same
    happens when no candidate remains.  Returns None only if ``usage()``
    returns in that last case.
    """
    allowed = []
    for triple in softness.split(','):
        # triple is a or a:b or a:b:c
        try:
            nums = [int(field) for field in triple.split(':')]
        except ValueError:
            nums = []  # non-numeric field: treat like a malformed item
        # range() rejects a zero step, so catch it here with the same message.
        if not 1 <= len(nums) <= 3 or (len(nums) == 3 and nums[2] == 0):
            print('invalid subargument to -soft: %s' % (softness))
            print('should be a or a:b or a:b:c')
            usage()
            continue
        if len(nums) == 1:
            values = nums
        elif len(nums) == 2:
            values = range(nums[0], nums[1] + 1)
        else:
            values = range(nums[0], nums[1] + 1, nums[2])
        allowed.extend([v for v in values if 0 <= v <= nProcs])
    if not allowed:
        print('-soft argument %s allows no valid number of processes'
              % (softness))
        usage()
    else:
        return max(allowed)


def read_machinefile(machineFilename):
    """Parse a machinefile into a per-process host table.

    Blank lines and lines starting with '#' are ignored.  Every other line
    is ``host[:nprocs]`` optionally followed by ``ifhn=name`` tokens
    (whitespace-separated).  Returns None when no filename is given;
    otherwise a dict mapping process IDs 0, 1, 2, ... (assigned in file
    order, ``nprocs`` consecutive IDs per line) to
    ``{'host': host, 'nprocs': nprocs, 'ifhn': ifhn_or_empty_string}``.

    Calls ``sys.exit(-1)`` when the file cannot be opened, a process count
    is not an integer, a token is not ``key=value``, or a key other than
    ``ifhn`` appears.
    """
    if not machineFilename:
        return None
    try:
        machineFile = open(machineFilename, 'r')
    except IOError:
        print('** unable to open machinefile')
        sys.exit(-1)
    procID = 0
    machineFileInfo = {}
    try:
        for line in machineFile:
            line = line.strip()
            if not line or line[0] == '#':
                continue
            splitLine = re.split(r'\s+', line)
            host = splitLine[0]
            if ':' in host:
                (host, nprocs) = host.split(':', 1)
                try:
                    nprocs = int(nprocs)
                except ValueError:
                    print('** invalid process count in machinefile: %s'
                          % splitLine[0])
                    sys.exit(-1)
            else:
                nprocs = 1
            kvps = {'ifhn': ''}
            for kv in splitLine[1:]:
                if '=' not in kv:
                    print('** malformed entry in machinefile: %s' % kv)
                    sys.exit(-1)
                (k, v) = kv.split('=', 1)
                if k == 'ifhn':  # interface hostname
                    kvps[k] = v
                else:  # may be other kv pairs later
                    print('unrecognized key in machinefile: %s' % k)
                    sys.exit(-1)
            for i in range(procID, procID + nprocs):
                machineFileInfo[i] = {'host': host, 'nprocs': nprocs}
                machineFileInfo[i].update(kvps)
            procID += nprocs
    finally:
        # Runs on normal completion and on sys.exit() alike.
        machineFile.close()
    return machineFileInfo


def _record_exit_status(status):
    """Fold one wait()-style ``status`` into the global ``myExitStatus``.

    Returns ``('signal', signum)`` when ``status`` reports death by a signal
    (the raw status, not the signal number, is what is compared against and
    stored in ``myExitStatus``), ``('code', exit_code)`` for a normal exit,
    or None when the ``os`` module lacks WIFSIGNALED/WEXITSTATUS, in which
    case ``myExitStatus`` is reset to 0.
    """
    global myExitStatus
    if hasattr(os, 'WIFSIGNALED') and os.WIFSIGNALED(status):
        if status > myExitStatus:
            myExitStatus = status
        return ('signal', status & 0x007f)  # AND off core flag
    if hasattr(os, 'WEXITSTATUS'):
        exit_status = os.WEXITSTATUS(status)
        if exit_status > myExitStatus:
            myExitStatus = exit_status
        return ('code', exit_status)
    myExitStatus = 0
    return None


def handle_man_input(sock, streamHandler):
    """Handle one message arriving from a manager on ``sock``.

    An empty message (EOF) unregisters ``sock`` from ``streamHandler`` and
    increments ``numDoneWithIO``.  A message without a ``'cmd'`` key is
    fatal.  Otherwise the message is dispatched on ``msg['cmd']``: startup
    failures and aborts are reported on stdout, and exit statuses are folded
    into ``myExitStatus``; for ``client_exit_status`` the rank's result is
    also appended to ``outECs`` (as an XML element or a text line, per
    ``parmdb['ecfn_format']``) when ``outECs`` is set.
    """
    global numDoneWithIO
    global outXmlDoc, outECs
    msg = sock.recv_dict_msg()
    if not msg:
        streamHandler.del_handler(sock)
        numDoneWithIO += 1
    elif 'cmd' not in msg:
        mpd_print(1, 'mpiexec: from man, invalid msg=:%s:' % (msg))
        sys.exit(-1)
    elif msg['cmd'] == 'startup_status':
        if msg['rc'] != 0:
            host = msg['src'].split('_')[0]
            reason = unquote(msg['reason'])
            print('problem with execution of %s on %s: %s ' %
                  (msg['exec'], host, reason))
            # don't stop; keep going until all top-level mans finish
    elif msg['cmd'] == 'job_aborted_early':
        print('rank %d in job %s caused collective abort of all ranks' %
              (msg['rank'], msg['jobid']))
        outcome = _record_exit_status(msg['exit_status'])
        if outcome is not None:
            (how, value) = outcome
            if how == 'signal':
                print(' exit status of rank %d: killed by signal %d ' %
                      (msg['rank'], value))
            else:
                print(' exit status of rank %d: return code %d ' %
                      (msg['rank'], value))
    elif msg['cmd'] == 'job_aborted':
        print('job aborted; reason = %s' % (msg['reason']))
    elif msg['cmd'] == 'client_exit_status':
        if outECs:
            if parmdb['ecfn_format'] == 'xml':
                outXmlProc = outXmlDoc.createElement('exit-code')
                outECs.appendChild(outXmlProc)
                outXmlProc.setAttribute('rank', str(msg['cli_rank']))
                outXmlProc.setAttribute('status', str(msg['cli_status']))
                outXmlProc.setAttribute('pid', str(msg['cli_pid']))
                # cli_ifhn is also avail
                outXmlProc.setAttribute('host', msg['cli_host'])
            else:
                outECs += 'rank=%d status=%d pid=%d host=%s\n' % \
                    (msg['cli_rank'], msg['cli_status'],
                     msg['cli_pid'], msg['cli_host'])
        # Per-client statuses are folded in silently (no per-rank message).
        _record_exit_status(msg['cli_status'])
    else:
        print('unrecognized msg from manager :%s:' % msg)


def handle_cli_stdout_input(sock, parmdb, streamHandler, linesPerRank):
    """Relay client stdout arriving on ``sock``.

    With ``parmdb['MPIEXEC_MERGE_OUTPUT']`` set, one line is read; if it has
    a ``rank:`` prefix, the remainder is appended to
    ``linesPerRank[rank]``, otherwise the line is printed as-is; then
    ``print_ready_merged_lines()`` is called.  Without merging, up to 1024
    bytes are copied straight to our stdout.  EOF unregisters ``sock`` from
    ``streamHandler`` and increments ``numDoneWithIO``.
    """
    global numDoneWithIO
    if parmdb['MPIEXEC_MERGE_OUTPUT']:
        line = sock.recv_one_line()
        if not line:
            streamHandler.del_handler(sock)
            numDoneWithIO += 1
        else:
            if parmdb['MPIEXEC_GDB']:
                # Keep a '(gdb)' prompt on the same line as what follows.
                line = line.replace('(gdb)\n', '(gdb) ')
            try:
                (rank, rest) = line.split(':', 1)
                rank = int(rank)
                linesPerRank[rank].append(rest)
            except (ValueError, KeyError, IndexError):
                # No colon, non-integer prefix, or unknown rank.
                print(line)
            print_ready_merged_lines(parmdb['nprocs'], parmdb, linesPerRank)
    else:
        msg = sock.recv(1024)
        if not msg:
            streamHandler.del_handler(sock)
            numDoneWithIO += 1
        else:
            sys.stdout.write(msg)
            sys.stdout.flush()


def handle_cli_stderr_input(sock, streamHandler):
    """Copy up to 1024 bytes of client stderr from ``sock`` to our stderr.

    EOF unregisters ``sock`` from ``streamHandler`` and increments
    ``numDoneWithIO``.
    """
    global numDoneWithIO
    msg = sock.recv(1024)
    if not msg:
        streamHandler.del_handler(sock)
        numDoneWithIO += 1
    else:
        sys.stderr.write(msg)
        sys.stderr.flush()


# NOTE: stdin is supposed to be slow, low-volume. We read it all here (as it
# appears on the fd) and send it immediately to the receivers. If the user
# redirects a "large" file (perhaps as small as 5k) into us, we will send it
# all out right away.
This can cause things to hang on the remote (recvr) side.# We do not wait to read here until the recvrs read because there may be several# recvrs and they may read at different speeds/times.def handle_stdin_input(stdin_stream,parmdb,streamHandler,manSock): line = '' try: line = stdin_stream.readline() except IOError, errinfo: sys.stdin.flush() # probably does nothing # print "I/O err on stdin:", errinfo mpd_print(1,'stdin problem; if pgm is run in background, redirect from /dev/null') mpd_print(1,' e.g.: mpiexec -n 4 a.out < /dev/null &') else: gdbFlag = parmdb['MPIEXEC_GDB'] if line: # not EOF
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -