📄 mpiexec.py
字号:
msgToSend = { 'cmd' : 'stdin_from_user', 'line' : line } # default if gdbFlag and line.startswith('z'): line = line.rstrip() if len(line) < 3: # just a 'z' line += ' 0-%d' % (parmdb['nprocs']-1) s1 = line[2:].rstrip().split(',') for s in s1: s2 = s.split('-') for i in s2: if not i.isdigit(): print 'invalid arg to z :%s:' % i continue msgToSend = { 'cmd' : 'stdin_dest', 'stdin_procs' : line[2:] } sys.stdout.softspace = 0 print '%s: (gdb) ' % (line[2:]), elif gdbFlag and line.startswith('q'): msgToSend = { 'cmd' : 'stdin_dest', 'stdin_procs' : '0-%d' % (parmdb['nprocs']-1) } if manSock: manSock.send_dict_msg(msgToSend) msgToSend = { 'cmd' : 'stdin_from_user','line' : 'q\n' } elif gdbFlag and line.startswith('^'): msgToSend = { 'cmd' : 'stdin_dest', 'stdin_procs' : '0-%d' % (parmdb['nprocs']-1) } if manSock: manSock.send_dict_msg(msgToSend) msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGINT' } if manSock: manSock.send_dict_msg(msgToSend) else: streamHandler.del_handler(sys.stdin) sys.stdin.close() if manSock: msgToSend = { 'cmd' : 'stdin_from_user', 'eof' : '' } manSock.send_dict_msg(msgToSend) return linedef handle_sig_occurred(manSock): global sigOccurred if sigOccurred == signal.SIGINT: if manSock: msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGINT' } manSock.send_dict_msg(msgToSend) manSock.close() sys.exit(-1) elif sigOccurred == signal.SIGALRM: if manSock: msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGKILL' } manSock.send_dict_msg(msgToSend) manSock.close() mpd_print(1,'job ending due to env var MPIEXEC_TIMEOUT=%s' % \ os.environ['MPIEXEC_TIMEOUT']) sys.exit(-1) elif sigOccurred == signal.SIGTSTP: sigOccurred = 0 # do this before kill below if manSock: msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGTSTP' } manSock.send_dict_msg(msgToSend) signal.signal(signal.SIGTSTP,signal.SIG_DFL) # stop myself os.kill(os.getpid(),signal.SIGTSTP) signal.signal(signal.SIGTSTP,sig_handler) # restore this handler elif sigOccurred == signal.SIGCONT: sigOccurred = 0 # do it before handling if manSock: msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGCONT' } manSock.send_dict_msg(msgToSend)def sig_handler(signum,frame): global sigOccurred sigOccurred = signum mpd_handle_signal(signum,frame)def format_sorted_ranks(ranks): all = [] one = [] prevRank = -999 for i in range(len(ranks)): if i != 0 and ranks[i] != (prevRank+1): all.append(one) one = [] one.append(ranks[i]) if i == (len(ranks)-1): all.append(one) prevRank = ranks[i] pline = '' for i in range(len(all)): if len(all[i]) > 1: pline += '%d-%d' % (all[i][0],all[i][-1]) else: pline += '%d' % (all[i][0]) if i != (len(all)-1): pline += ',' return plinedef print_ready_merged_lines(minRanks,parmdb,linesPerRank): printFlag = 1 # default to get started while printFlag: printFlag = 0 for r1 in range(parmdb['nprocs']): if not linesPerRank[r1]: continue sortedRanks = [] lineToPrint = linesPerRank[r1][0] for r2 in range(parmdb['nprocs']): if linesPerRank[r2] and linesPerRank[r2][0] == lineToPrint: # myself also sortedRanks.append(r2) if len(sortedRanks) >= minRanks: fsr = format_sorted_ranks(sortedRanks) sys.stdout.softspace = 0 print '%s: %s' % (fsr,lineToPrint), for r2 in sortedRanks: linesPerRank[r2] = linesPerRank[r2][1:] printFlag = 1 sys.stdout.flush()def get_parms_from_xml_file(msgToMPD): global parmdb try: import xml.dom.minidom except: print 'you requested to parse an xml file, but' print ' I was unable to import the xml.dom.minidom module' sys.exit(-1) known_rlimit_types = ['core','cpu','fsize','data','stack','rss', 'nproc','nofile','ofile','memlock','as','vmem'] try: inXmlFilename = parmdb['inXmlFilename'] parmsXMLFile = open(inXmlFilename,'r') except: print 'could not open job xml specification file %s' % (inXmlFilename) sys.exit(-1) fileContents = parmsXMLFile.read() try: parsedXML = xml.dom.minidom.parseString(fileContents) except: print "mpiexec failed parsing xml file (perhaps from mpiexec); here is the content:" print fileContents sys.exit(-1) if parsedXML.documentElement.tagName != 'create-process-group': print 'expecting create-process-group; got unrecognized doctype: %s' % \ (parsedXML.documentElement.tagName) sys.exit(-1) cpg = parsedXML.getElementsByTagName('create-process-group')[0] if cpg.hasAttribute('totalprocs'): parmdb[('xml','nprocs')] = int(cpg.getAttribute('totalprocs')) else: print '** totalprocs not specified in %s' % inXmlFilename sys.exit(-1) if cpg.hasAttribute('try_1st_locally'): parmdb[('xml','MPIEXEC_TRY_1ST_LOCALLY')] = int(cpg.getAttribute('try_1st_locally')) if cpg.hasAttribute('output') and cpg.getAttribute('output') == 'label': parmdb[('xml','MPIEXEC_SHOW_LINE_LABELS')] = 1 if cpg.hasAttribute('pgid'): # our jobalias parmdb[('xml','MPIEXEC_JOB_ALIAS')] = cpg.getAttribute('pgid') if cpg.hasAttribute('stdin_dest'): parmdb[('xml','MPIEXEC_STDIN_DEST')] = cpg.getAttribute('stdin_dest') if cpg.hasAttribute('doing_bnr'): parmdb[('xml','MPIEXEC_BNR')] = int(cpg.getAttribute('doing_bnr')) if cpg.hasAttribute('ifhn'): parmdb[('xml','MPIEXEC_IFHN')] = cpg.getAttribute('ifhn') if cpg.hasAttribute('exit_codes_filename'): parmdb[('xml','MPIEXEC_EXITCODES_FILENAME')] = cpg.getAttribute('exit_codes_filename') parmdb[('xml','ecfn_format')] = 'xml' if cpg.hasAttribute('gdb'): gdbFlag = int(cpg.getAttribute('gdb')) if gdbFlag: parmdb[('xml','MPIEXEC_GDB')] = 1 parmdb[('xml','MPIEXEC_MERGE_OUTPUT')] = 1 # implied parmdb[('xml','MPIEXEC_SHOW_LINE_LABELS')] = 1 # implied parmdb[('xml','MPIEXEC_STDIN_DEST')] = 'all' # implied if cpg.hasAttribute('use_root_pm'): parmdb[('xml','MPD_USE_ROOT_MPD')] = int(cpg.getAttribute('use_root_pm')) if cpg.hasAttribute('tv'): parmdb[('xml','MPIEXEC_TOTALVIEW')] = int(cpg.getAttribute('tv')) hostSpec = cpg.getElementsByTagName('host-spec') if hostSpec: hostList = [] for node in hostSpec[0].childNodes: node = node.data.strip() hostnames = re.findall(r'\S+',node) for hostname in hostnames: if hostname: # some may be the empty string try: ipaddr = socket.gethostbyname_ex(hostname)[2][0] except: print 'unable to determine IP info for host %s' % (hostname) sys.exit(-1) hostList.append(ipaddr) parmdb[('xml','MPIEXEC_HOST_LIST')] = hostList if hostSpec and hostSpec[0].hasAttribute('check'): hostSpecMode = hostSpec[0].getAttribute('check') if hostSpecMode == 'yes': parmdb[('xml','MPIEXEC_HOST_CHECK')] = 1 covered = [0] * parmdb['nprocs'] procSpec = cpg.getElementsByTagName('process-spec') if not procSpec: print 'No process-spec specified' usage() for p in procSpec: if p.hasAttribute('range'): therange = p.getAttribute('range') splitRange = therange.split('-') if len(splitRange) == 1: loRange = int(splitRange[0]) hiRange = loRange else: (loRange,hiRange) = (int(splitRange[0]),int(splitRange[1])) else: (loRange,hiRange) = (0,parmdb['nprocs']-1) for i in xrange(loRange,hiRange+1): nprocs = parmdb['nprocs'] if i >= nprocs: print '*** exiting; rank %d is greater than nprocs' % (nprocs) sys.exit(-1) if covered[i]: print '*** exiting; rank %d is doubly used in proc specs' % (nprocs) sys.exit(-1) covered[i] = 1 if p.hasAttribute('exec'): msgToMPD['execs'][(loRange,hiRange)] = p.getAttribute('exec') else: print '*** exiting; range %d-%d has no exec' % (loRange,hiRange) sys.exit(-1) if p.hasAttribute('user'): username = p.getAttribute('user') if pwd_module_available: try: pwent = pwd.getpwnam(username) except: print username, 'is an invalid username' sys.exit(-1) if username == mpd_get_my_username() \ or (hasattr(os,'getuid') and os.getuid() == 0): msgToMPD['users'][(loRange,hiRange)] = p.getAttribute('user') else: print username, 'username does not match yours and you are not root' sys.exit(-1) else: msgToMPD['users'][(loRange,hiRange)] = mpd_get_my_username() if p.hasAttribute('cwd'): msgToMPD['cwds'][(loRange,hiRange)] = p.getAttribute('cwd') else: msgToMPD['cwds'][(loRange,hiRange)] = os.path.abspath(os.getcwd()) if p.hasAttribute('umask'): msgToMPD['umasks'][(loRange,hiRange)] = p.getAttribute('umask') else: currumask = os.umask(0) ; os.umask(currumask) msgToMPD['umasks'][(loRange,hiRange)] = str(currumask) if p.hasAttribute('path'): msgToMPD['paths'][(loRange,hiRange)] = p.getAttribute('path') else: msgToMPD['paths'][(loRange,hiRange)] = os.environ['PATH'] if p.hasAttribute('host'): host = p.getAttribute('host') if host.startswith('_any_'): msgToMPD['hosts'][(loRange,hiRange)] = host else: try: msgToMPD['hosts'][(loRange,hiRange)] = socket.gethostbyname_ex(host)[2][0] except: print 'unable to do find info for host %s' % (host) sys.exit(-1) else: if hostSpec and hostList: msgToMPD['hosts'][(loRange,hiRange)] = '_any_from_pool_' else: msgToMPD['hosts'][(loRange,hiRange)] = '_any_' argDict = {} argList = p.getElementsByTagName('arg') for argElem in argList: argDict[int(argElem.getAttribute('idx'))] = argElem.getAttribute('value') argVals = [0] * len(argList) for i in argDict.keys(): argVals[i-1] = unquote(argDict[i]) msgToMPD['args'][(loRange,hiRange)] = argVals limitDict = {} limitList = p.getElementsByTagName('limit') for limitElem in limitList: typ = limitElem.getAttribute('type') if typ in known_rlimit_types: limitDict[typ] = limitElem.getAttribute('value') else: print 'mpiexec: invalid type in limit: %s' % (typ) sys.exit(-1) msgToMPD['limits'][(loRange,hiRange)] = limitDict envVals = {} envVarList = p.getElementsByTagName('env') for envVarElem in envVarList: envkey = envVarElem.getAttribute('name') envval = unquote(envVarElem.getAttribute('value')) envVals[envkey] = envval msgToMPD['envvars'][(loRange,hiRange)] = envVals for i in range(len(covered)): if not covered[i]: print '*** exiting; %d procs are requested, but proc %d is not described' % \ (parmdb['nprocs'],i) sys.exit(-1) def get_vals_for_attach(parmdb,conSock,msgToMPD): sjobid = parmdb['-gdba'].split('@') # jobnum and originating host msgToSend = { 'cmd' : 'mpdlistjobs' } conSock.send_dict_msg(msgToSend) msg = conSock.recv_dict_msg(timeout=recvTimeout) if not msg: mpd_print(1,'no msg recvd from mpd before timeout') sys.exit(-1) if msg['cmd'] != 'local_mpdid': # get full id of local mpd for filters later mpd_print(1,'did not recv local_mpdid msg from local mpd; recvd: %s' % msg) sys.exit(-1) else: if len(sjobid) == 1: sjobid.append(msg['id']) got_info = 0 while 1: msg = conSock.recv_dict_msg() if not msg.has_key('cmd'): mpd_print(1,'invalid message from mpd :%s:' % (msg)) sys.exit(-1) if msg['cmd'] == 'mpdlistjobs_info': got_info = 1 smjobid = msg['jobid'].split(' ') # jobnum, mpdid, and alias (if present) if sjobid[0] == smjobid[0] and sjobid[1] == smjobid[1]: # jobnum and mpdid rank = int(msg['rank']) msgToMPD['users'][(rank,rank)] = msg['username'] msgToMPD['hosts'][(rank,rank)] = msg['ifhn'] msgToMPD['execs'][(rank,rank)] = msg['pgm'] msgToMPD['cwds'][(rank,rank)] = os.path.abspath(os.getcwd()) msgToMPD['paths'][(rank,rank)] = os.environ['PATH'] msgToMPD['args'][(rank,rank)] = [msg['clipid']] msgToMPD['envvars'][(rank,rank)] = {} msgToMPD['limits'][(rank,rank)] = {} currumask = os.umask(0) ; os.umask(currumask) # grab it and set it back msgToMPD['umasks'][(rank,rank)] = str(currumask) elif msg['cmd'] == 'mpdlistjobs_trailer': if not got_info: print 'no info on this jobid; probably invalid' sys.exit(-1) break else: print 'invaild msg from mpd :%s:' % (msg) sys.exit(-1) parmdb[('thispgm','nprocs')] = len(msgToMPD['execs'].keys()) # all dicts are same len def usage(): print __doc__ sys.exit(-1)if __name__ == '__main__': try: mpiexec() except SystemExit, errExitStatus: # bounced to here by sys.exit inside mpiexec() myExitStatus = errExitStatus sys.exit(myExitStatus)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -