📄 mpiexec.py
字号:
msgToMPD['gdba'] = parmdb['-gdba'] msgToMPD['totalview'] = parmdb['MPIEXEC_TOTALVIEW'] msgToMPD['singinitpid'] = parmdb['singinitpid'] msgToMPD['singinitport'] = parmdb['singinitport'] msgToMPD['host_spec_pool'] = parmdb['MPIEXEC_HOST_LIST'] # set sig handlers up right before we send mpdrun msg to mpd if hasattr(signal,'SIGINT'): signal.signal(signal.SIGINT, sig_handler) if hasattr(signal,'SIGTSTP'): signal.signal(signal.SIGTSTP,sig_handler) if hasattr(signal,'SIGCONT'): signal.signal(signal.SIGCONT,sig_handler) if hasattr(signal,'SIGALRM'): signal.signal(signal.SIGALRM,sig_handler) conSock.send_dict_msg(msgToMPD) msg = conSock.recv_dict_msg(timeout=recvTimeout) if not msg: mpd_print(1, 'no msg recvd from mpd when expecting ack of request') sys.exit(-1) elif msg['cmd'] == 'mpdrun_ack': currRingSize = msg['ringsize'] currRingNCPUs = msg['ring_ncpus'] else: if msg['cmd'] == 'already_have_a_console': print 'mpd already has a console (e.g. for long ringtest); try later' sys.exit(-1) elif msg['cmd'] == 'job_failed': if msg['reason'] == 'some_procs_not_started': print 'mpiexec: unable to start all procs; may have invalid machine names' print ' remaining specified hosts:' for host in msg['remaining_hosts'].values(): if host != '_any_': try: print ' %s (%s)' % (host,socket.gethostbyaddr(host)[0]) except: print ' %s' % (host) elif msg['reason'] == 'invalid_username': print 'mpiexec: invalid username %s at host %s' % \ (msg['username'],msg['host']) else: print 'mpiexec: job failed; reason=:%s:' % (msg['reason']) sys.exit(-1) else: mpd_print(1, 'unexpected message from mpd: %s' % (msg) ) sys.exit(-1) conSock.close() jobTimeout = int(parmdb['MPIEXEC_TIMEOUT']) if jobTimeout: if hasattr(signal,'alarm'): signal.alarm(jobTimeout) else: def timeout_function(): mpd_print(1,'job ending due to env var MPIEXEC_TIMEOUT=%d' % jobTimeout) thread.interrupt_main() try: import thread, threading timer = threading.Timer(jobTimeout,timeout_function) timer.start() except: print 'unable to establish timeout for MPIEXEC_TIMEOUT' streamHandler = MPDStreamHandler() (manSock,addr) = listenSock.accept() if not manSock: mpd_print(1, 'mpiexec: failed to obtain sock from manager') sys.exit(-1) streamHandler.set_handler(manSock,handle_man_input,args=(streamHandler,)) if hasattr(os,'fork'): streamHandler.set_handler(sys.stdin,handle_stdin_input, args=(parmdb,streamHandler,manSock)) else: # not using select on fd's when using subprocess module (probably M$) import threading def read_fd_with_func(fd,func): line = 'x' while line: line = func(fd) stdin_thread = threading.Thread(target=read_fd_with_func, args=(sys.stdin.fileno(),handle_stdin_input)) # first, do handshaking with man msg = manSock.recv_dict_msg() if (not msg or not msg.has_key('cmd') or msg['cmd'] != 'man_checking_in'): mpd_print(1, 'mpiexec: from man, invalid msg=:%s:' % (msg) ) sys.exit(-1) msgToSend = { 'cmd' : 'ringsize', 'ring_ncpus' : currRingNCPUs, 'ringsize' : currRingSize } manSock.send_dict_msg(msgToSend) msg = manSock.recv_dict_msg() if (not msg or not msg.has_key('cmd')): mpd_print(1, 'mpiexec: from man, invalid msg=:%s:' % (msg) ) sys.exit(-1) if (msg['cmd'] == 'job_started'): jobid = msg['jobid'] if outECs: if parmdb['ecfn_format'] == 'xml': outECs.setAttribute('jobid',jobid.strip()) else: outECs += 'jobid=%s\n' % (jobid.strip()) # print 'mpiexec: job %s started' % (jobid) if parmdb['MPIEXEC_TVSU']: import mtv mtv.allocate_proctable(parmdb['nprocs']) # extract procinfo (rank,hostname,exec,pid) tuples from msg for i in range(parmdb['nprocs']): tvhost = msg['procinfo'][i][0] tvpgm = msg['procinfo'][i][1] tvpid = msg['procinfo'][i][2] # print "%d %s %s %d" % (i,host,pgm,pid) mtv.append_proctable_entry(tvhost,tvpgm,tvpid) mtv.complete_spawn() msgToSend = { 'cmd' : 'tv_ready' } manSock.send_dict_msg(msgToSend) elif parmdb['MPIEXEC_TOTALVIEW']: tvname = 'totalview' if os.environ.has_key('TOTALVIEW'): tvname = os.environ['TOTALVIEW'] if not mpd_which(((tvname.strip()).split()[0])): print 'cannot find "%s" in your $PATH:' % (tvname) print ' ', os.environ['PATH'] sys.exit(-1) import mtv tv_cmd = 'dattach python ' + `os.getpid()` + '; dgo; dassign MPIR_being_debugged 1' os.system(tvname + ' -e "%s" &' % (tv_cmd) ) mtv.wait_for_debugger() mtv.allocate_proctable(parmdb['nprocs']) # extract procinfo (rank,hostname,exec,pid) tuples from msg for i in range(parmdb['nprocs']): tvhost = msg['procinfo'][i][0] tvpgm = msg['procinfo'][i][1] tvpid = msg['procinfo'][i][2] # print "%d %s %s %d" % (i,host,pgm,pid) mtv.append_proctable_entry(tvhost,tvpgm,tvpid) mtv.complete_spawn() msgToSend = { 'cmd' : 'tv_ready' } manSock.send_dict_msg(msgToSend) else: mpd_print(1, 'mpiexec: from man, unknown msg=:%s:' % (msg) ) sys.exit(-1) (manCliStdoutSock,addr) = listenSock.accept() streamHandler.set_handler(manCliStdoutSock, handle_cli_stdout_input, args=(parmdb,streamHandler,linesPerRank,)) (manCliStderrSock,addr) = listenSock.accept() streamHandler.set_handler(manCliStderrSock, handle_cli_stderr_input, args=(streamHandler,)) # Main Loop timeDelayForPrints = 2 # seconds timeForPrint = time() + timeDelayForPrints # to get started numDoneWithIO = 0 while numDoneWithIO < 3: # man, client stdout, and client stderr if sigOccurred: handle_sig_occurred(manSock) rv = streamHandler.handle_active_streams(timeout=1.0) if rv[0] < 0: # will handle some sigs at top of next loop pass # may have to handle some err conditions here if parmdb['MPIEXEC_MERGE_OUTPUT']: if timeForPrint < time(): print_ready_merged_lines(1,parmdb,linesPerRank) timeForPrint = time() + timeDelayForPrints else: print_ready_merged_lines(parmdb['nprocs'],parmdb,linesPerRank) if parmdb['MPIEXEC_MERGE_OUTPUT']: print_ready_merged_lines(1,parmdb,linesPerRank) if mshipPid: (donePid,status) = os.wait() # os.waitpid(mshipPid,0) if parmdb['MPIEXEC_EXITCODES_FILENAME']: outECFile = open(parmdb['MPIEXEC_EXITCODES_FILENAME'],'w') if parmdb['ecfn_format'] == 'xml': print >>outECFile, outXmlDoc.toprettyxml(indent=' ') else: print >>outECFile, outECs, outECFile.close() return myExitStatusdef collect_args(args,localArgSets): validGlobalArgs = { '-l' : 0, '-usize' : 1, '-gdb' : 0, '-bnr' : 0, '-tv' : 0, '-tvsu' : 0, '-ifhn' : 1, '-machinefile' : 1, '-s' : 1, '-1' : 0, '-a' : 1, '-m' : 0, '-ecfn' : 1, '-gn' : 1, '-gnp' : 1, '-ghost' : 1, '-gpath' : 1, '-gwdir' : 1, '-gsoft' : 1, '-garch' : 1, '-gexec' : 1, '-gumask' : 1, '-genvall' : 0, '-genv' : 2, '-genvnone' : 0, '-genvlist' : 1 } currumask = os.umask(0) ; os.umask(currumask) # grab it and set it back parmdb[('cmdline','-gn')] = 1 parmdb[('cmdline','-ghost')] = '_any_' if os.environ.has_key('PATH'): parmdb[('cmdline','-gpath')] = os.environ['PATH'] else: parmdb[('cmdline','-gpath')] = '' parmdb[('cmdline','-gwdir')] = os.path.abspath(os.getcwd()) parmdb[('cmdline','-gumask')] = str(currumask) parmdb[('cmdline','-gsoft')] = 0 parmdb[('cmdline','-garch')] = '' parmdb[('cmdline','-gexec')] = '' parmdb[('cmdline','-genv')] = {} parmdb[('cmdline','-genvlist')] = [] parmdb[('cmdline','-genvnone')] = 0 argidx = 1 while argidx < len(args) and args[argidx] in validGlobalArgs.keys(): garg = args[argidx] if len(args) <= (argidx+validGlobalArgs[garg]): print "missing sub-arg to %s" % (garg) usage() if garg == '-genv': parmdb['-genv'][args[argidx+1]] = args[argidx+2] argidx += 3 elif garg == '-gn' or garg == '-gnp': if args[argidx+1].isdigit(): parmdb[('cmdline','-gn')] = int(args[argidx+1]) else: print 'argument to %s must be numeric' % (garg) usage() argidx += 2 elif garg == '-ghost': try: parmdb[('cmdline',garg)] = socket.gethostbyname_ex(args[argidx+1])[2][0] except: print 'unable to do find info for host %s' % (args[argidx+1]) sys.exit(-1) argidx += 2 elif garg == '-gpath': parmdb[('cmdline','-gpath')] = args[argidx+1] argidx += 2 elif garg == '-gwdir': parmdb[('cmdline','-gwdir')] = args[argidx+1] argidx += 2 elif garg == '-gumask': parmdb[('cmdline','-gumask')] = args[argidx+1] argidx += 2 elif garg == '-gsoft': parmdb[('cmdline','-gsoft')] = args[argidx+1] argidx += 2 elif garg == '-garch': parmdb[('cmdline','-garch')] = args[argidx+1] argidx += 2 print '** -garch is accepted but not used' elif garg == '-gexec': parmdb[('cmdline','-gexec')] = args[argidx+1] argidx += 2 elif garg == '-genv': parmdb[('cmdline','-genv')] = args[argidx+1] argidx += 2 elif garg == '-genvlist': parmdb[('cmdline','-genvlist')] = args[argidx+1].split(',') argidx += 2 elif garg == '-genvnone': parmdb[('cmdline','-genvnone')] = args[argidx+1] argidx += 1 elif garg == '-l': parmdb[('cmdline','MPIEXEC_SHOW_LINE_LABELS')] = 1 argidx += 1 elif garg == '-a': parmdb[('cmdline','MPIEXEC_JOB_ALIAS')] = args[argidx+1] argidx += 2 elif garg == '-usize': if args[argidx+1].isdigit(): parmdb[('cmdline','MPIEXEC_USIZE')] = int(args[argidx+1]) else: print 'argument to %s must be numeric' % (garg) usage() argidx += 2 elif garg == '-gdb': parmdb[('cmdline','MPIEXEC_GDB')] = 1 argidx += 1 parmdb[('cmdline','MPIEXEC_MERGE_OUTPUT')] = 1 # implied parmdb[('cmdline','MPIEXEC_SHOW_LINE_LABELS')] = 1 # implied parmdb[('cmdline','MPIEXEC_STDIN_DEST')] = 'all' # implied elif garg == '-ifhn': parmdb[('cmdline','MPIEXEC_IFHN')] = args[argidx+1] argidx += 2 try: hostinfo = socket.gethostbyname_ex(parmdb['MPIEXEC_IFHN']) except: print 'mpiexec: gethostbyname_ex failed for ifhn %s' % (parmdb['MPIEXEC_IFHN']) sys.exit(-1) elif garg == '-m': parmdb[('cmdline','MPIEXEC_MERGE_OUTPUT')] = 1 argidx += 1 elif garg == '-s': parmdb[('cmdline','MPIEXEC_STDIN_DEST')] = args[argidx+1] argidx += 2 elif garg == '-machinefile': parmdb[('cmdline','MPIEXEC_MACHINEFILE')] = args[argidx+1] argidx += 2 elif garg == '-bnr': parmdb[('cmdline','MPIEXEC_BNR')] = 1 argidx += 1 elif garg == '-tv': parmdb[('cmdline','MPIEXEC_TOTALVIEW')] = 1 argidx += 1 elif garg == '-tvsu': parmdb[('cmdline','MPIEXEC_TOTALVIEW')] = 1 parmdb[('cmdline','MPIEXEC_TVSU')] = 1 argidx += 1 elif garg == '-ecfn': parmdb[('cmdline','MPIEXEC_EXITCODES_FILENAME')] = args[argidx+1] argidx += 2 elif garg == '-1': parmdb[('cmdline','MPIEXEC_TRY_1ST_LOCALLY')] = 0 # reverses meaning argidx += 1 if len(args) <= argidx: print "mpiexec: missing arguments after global args" usage() if args[argidx] == ':': argidx += 1 localArgsKey = 0 # collect local arg sets but do not validate them until handled below while argidx < len(args): if args[argidx] == ':': localArgsKey += 1 localArgSets[localArgsKey] = [] else: localArgSets[localArgsKey].append(args[argidx]) argidx += 1def handle_local_argset(argset,machineFileInfo,msgToMPD): global parmdb, nextRange, appnum validLocalArgs = { '-n' : 1, '-np' : 1, '-host' : 1, '-path' : 1, '-wdir' : 1, '-soft' : 1, '-arch' : 1, '-umask' : 1, '-envall' : 0, '-env' : 2, '-envnone' : 0, '-envlist' : 1 } host = parmdb['-ghost'] wdir = parmdb['-gwdir'] wumask = parmdb['-gumask'] wpath = parmdb['-gpath'] nProcs = parmdb['-gn'] usize = parmdb['MPIEXEC_USIZE'] gexec = parmdb['-gexec'] softness = parmdb['-gsoft'] if parmdb['-genvnone']: envall = 0 else: envall = 1 localEnvlist = [] localEnv = {} argidx = 0 while argidx < len(argset): if argset[argidx] not in validLocalArgs: if argset[argidx][0] == '-': print 'invalid "local" arg: %s' % argset[argidx] usage() break # since now at executable if parmdb['MPIEXEC_MACHINEFILE']: if argset[argidx] == '-host' or argset[argidx] == ['-ghost']: print '-host (or -ghost) and -machinefile are incompatible' sys.exit(-1) if argset[argidx] == '-n' or argset[argidx] == '-np': if len(argset) < (argidx+2): print '** missing arg to -n'
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -