⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpiexec.py

📁 fortran并行计算包
💻 PY
📖 第 1 页 / 共 4 页
字号:
            msgToSend = { 'cmd' : 'stdin_from_user', 'line' : line } # default            if gdbFlag and line.startswith('z'):                line = line.rstrip()                if len(line) < 3:    # just a 'z'                    line += ' 0-%d' % (parmdb['nprocs']-1)                s1 = line[2:].rstrip().split(',')                for s in s1:                    s2 = s.split('-')                    for i in s2:                        if not i.isdigit():                            print 'invalid arg to z :%s:' % i                            continue                msgToSend = { 'cmd' : 'stdin_dest', 'stdin_procs' : line[2:] }                sys.stdout.softspace = 0                print '%s:  (gdb) ' % (line[2:]),            elif gdbFlag and line.startswith('q'):                msgToSend = { 'cmd' : 'stdin_dest',                              'stdin_procs' : '0-%d' % (parmdb['nprocs']-1) }                if manSock:                    manSock.send_dict_msg(msgToSend)                msgToSend = { 'cmd' : 'stdin_from_user','line' : 'q\n' }            elif gdbFlag and line.startswith('^'):                msgToSend = { 'cmd' : 'stdin_dest',                              'stdin_procs' : '0-%d' % (parmdb['nprocs']-1) }                if manSock:                    manSock.send_dict_msg(msgToSend)                msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGINT' }            if manSock:                manSock.send_dict_msg(msgToSend)        else:            streamHandler.del_handler(sys.stdin)            sys.stdin.close()            if manSock:                msgToSend = { 'cmd' : 'stdin_from_user', 'eof' : '' }                manSock.send_dict_msg(msgToSend)    return linedef handle_sig_occurred(manSock):    global sigOccurred    if sigOccurred == signal.SIGINT:        if manSock:            msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGINT' }            manSock.send_dict_msg(msgToSend)            manSock.close()        sys.exit(-1)    elif sigOccurred == signal.SIGALRM:        if manSock:            msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGKILL' }            manSock.send_dict_msg(msgToSend)            manSock.close()        mpd_print(1,'job ending due to env var MPIEXEC_TIMEOUT=%s' % \                  os.environ['MPIEXEC_TIMEOUT'])        sys.exit(-1)    elif sigOccurred == signal.SIGTSTP:        sigOccurred = 0  # do this before kill below        if manSock:            msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGTSTP' }            manSock.send_dict_msg(msgToSend)        signal.signal(signal.SIGTSTP,signal.SIG_DFL)      # stop myself        os.kill(os.getpid(),signal.SIGTSTP)        signal.signal(signal.SIGTSTP,sig_handler)  # restore this handler    elif sigOccurred == signal.SIGCONT:        sigOccurred = 0  # do it before handling        if manSock:            msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGCONT' }            manSock.send_dict_msg(msgToSend)def sig_handler(signum,frame):    global sigOccurred    sigOccurred = signum    mpd_handle_signal(signum,frame)def format_sorted_ranks(ranks):    all = []    one = []    prevRank = -999    for i in range(len(ranks)):        if i != 0  and  ranks[i] != (prevRank+1):            all.append(one)            one = []        one.append(ranks[i])        if i == (len(ranks)-1):            all.append(one)        prevRank = ranks[i]    pline = ''    for i in range(len(all)):        if len(all[i]) > 1:            pline += '%d-%d' % (all[i][0],all[i][-1])        else:            pline += '%d' % (all[i][0])        if i != (len(all)-1):            pline += ','    return plinedef print_ready_merged_lines(minRanks,parmdb,linesPerRank):    printFlag = 1  # default to get started    while printFlag:        printFlag = 0        for r1 in range(parmdb['nprocs']):            if not linesPerRank[r1]:                continue            sortedRanks = []            lineToPrint = linesPerRank[r1][0]            for r2 in range(parmdb['nprocs']):                if linesPerRank[r2] and linesPerRank[r2][0] == lineToPrint: # myself also                    sortedRanks.append(r2)            if len(sortedRanks) >= minRanks:                fsr = format_sorted_ranks(sortedRanks)                sys.stdout.softspace = 0                print '%s: %s' % (fsr,lineToPrint),                for r2 in sortedRanks:                    linesPerRank[r2] = linesPerRank[r2][1:]                printFlag = 1    sys.stdout.flush()def get_parms_from_xml_file(msgToMPD):    global parmdb    try:        import xml.dom.minidom    except:        print 'you requested to parse an xml file, but'        print '  I was unable to import the xml.dom.minidom module'        sys.exit(-1)    known_rlimit_types = ['core','cpu','fsize','data','stack','rss',                          'nproc','nofile','ofile','memlock','as','vmem']    try:        inXmlFilename = parmdb['inXmlFilename']         parmsXMLFile = open(inXmlFilename,'r')    except:        print 'could not open job xml specification file %s' % (inXmlFilename)        sys.exit(-1)    fileContents = parmsXMLFile.read()    try:        parsedXML = xml.dom.minidom.parseString(fileContents)    except:        print "mpiexec failed parsing xml file (perhaps from mpiexec); here is the content:"        print fileContents        sys.exit(-1)    if parsedXML.documentElement.tagName != 'create-process-group':        print 'expecting create-process-group; got unrecognized doctype: %s' % \              (parsedXML.documentElement.tagName)        sys.exit(-1)    cpg = parsedXML.getElementsByTagName('create-process-group')[0]    if cpg.hasAttribute('totalprocs'):        parmdb[('xml','nprocs')] = int(cpg.getAttribute('totalprocs'))    else:        print '** totalprocs not specified in %s' % inXmlFilename        sys.exit(-1)    if cpg.hasAttribute('try_1st_locally'):        parmdb[('xml','MPIEXEC_TRY_1ST_LOCALLY')] = int(cpg.getAttribute('try_1st_locally'))    if cpg.hasAttribute('output')  and  cpg.getAttribute('output') == 'label':        parmdb[('xml','MPIEXEC_SHOW_LINE_LABELS')] = 1    if cpg.hasAttribute('pgid'):    # our jobalias        parmdb[('xml','MPIEXEC_JOB_ALIAS')] = cpg.getAttribute('pgid')    if cpg.hasAttribute('stdin_dest'):        parmdb[('xml','MPIEXEC_STDIN_DEST')] = cpg.getAttribute('stdin_dest')    if cpg.hasAttribute('doing_bnr'):        parmdb[('xml','MPIEXEC_BNR')] = int(cpg.getAttribute('doing_bnr'))    if cpg.hasAttribute('ifhn'):        parmdb[('xml','MPIEXEC_IFHN')] = cpg.getAttribute('ifhn')    if cpg.hasAttribute('exit_codes_filename'):        parmdb[('xml','MPIEXEC_EXITCODES_FILENAME')] = cpg.getAttribute('exit_codes_filename')        parmdb[('xml','ecfn_format')] = 'xml'    if cpg.hasAttribute('gdb'):        gdbFlag = int(cpg.getAttribute('gdb'))        if gdbFlag:            parmdb[('xml','MPIEXEC_GDB')]     = 1            parmdb[('xml','MPIEXEC_MERGE_OUTPUT')] = 1       # implied            parmdb[('xml','MPIEXEC_SHOW_LINE_LABELS')] = 1   # implied            parmdb[('xml','MPIEXEC_STDIN_DEST')]   = 'all'   # implied    if cpg.hasAttribute('use_root_pm'):        parmdb[('xml','MPD_USE_ROOT_MPD')] = int(cpg.getAttribute('use_root_pm'))    if cpg.hasAttribute('tv'):        parmdb[('xml','MPIEXEC_TOTALVIEW')] = int(cpg.getAttribute('tv'))    hostSpec = cpg.getElementsByTagName('host-spec')    if hostSpec:        hostList = []        for node in hostSpec[0].childNodes:            node = node.data.strip()            hostnames = re.findall(r'\S+',node)            for hostname in hostnames:                if hostname:    # some may be the empty string                    try:                        ipaddr = socket.gethostbyname_ex(hostname)[2][0]                    except:                        print 'unable to determine IP info for host %s' % (hostname)                        sys.exit(-1)                    hostList.append(ipaddr)        parmdb[('xml','MPIEXEC_HOST_LIST')] = hostList    if hostSpec and hostSpec[0].hasAttribute('check'):        hostSpecMode = hostSpec[0].getAttribute('check')        if hostSpecMode == 'yes':            parmdb[('xml','MPIEXEC_HOST_CHECK')] = 1    covered = [0] * parmdb['nprocs']     procSpec = cpg.getElementsByTagName('process-spec')    if not procSpec:        print 'No process-spec specified'        usage()    for p in procSpec:        if p.hasAttribute('range'):            therange = p.getAttribute('range')            splitRange = therange.split('-')            if len(splitRange) == 1:                loRange = int(splitRange[0])                hiRange = loRange            else:                (loRange,hiRange) = (int(splitRange[0]),int(splitRange[1]))        else:            (loRange,hiRange) = (0,parmdb['nprocs']-1)        for i in xrange(loRange,hiRange+1):            nprocs = parmdb['nprocs']            if i >= nprocs:                print '*** exiting; rank %d is greater than nprocs' % (nprocs)                sys.exit(-1)            if covered[i]:                print '*** exiting; rank %d is doubly used in proc specs' % (nprocs)                sys.exit(-1)            covered[i] = 1        if p.hasAttribute('exec'):            msgToMPD['execs'][(loRange,hiRange)] = p.getAttribute('exec')        else:            print '*** exiting; range %d-%d has no exec' % (loRange,hiRange)            sys.exit(-1)        if p.hasAttribute('user'):            username = p.getAttribute('user')            if pwd_module_available:                try:                    pwent = pwd.getpwnam(username)                except:                    print username, 'is an invalid username'                    sys.exit(-1)            if username == mpd_get_my_username()  \            or (hasattr(os,'getuid') and os.getuid() == 0):                msgToMPD['users'][(loRange,hiRange)] = p.getAttribute('user')            else:                print username, 'username does not match yours and you are not root'                sys.exit(-1)        else:            msgToMPD['users'][(loRange,hiRange)] = mpd_get_my_username()        if p.hasAttribute('cwd'):            msgToMPD['cwds'][(loRange,hiRange)] = p.getAttribute('cwd')        else:            msgToMPD['cwds'][(loRange,hiRange)] = os.path.abspath(os.getcwd())        if p.hasAttribute('umask'):            msgToMPD['umasks'][(loRange,hiRange)] = p.getAttribute('umask')        else:            currumask = os.umask(0) ; os.umask(currumask)            msgToMPD['umasks'][(loRange,hiRange)] = str(currumask)        if p.hasAttribute('path'):            msgToMPD['paths'][(loRange,hiRange)] = p.getAttribute('path')        else:            msgToMPD['paths'][(loRange,hiRange)] = os.environ['PATH']        if p.hasAttribute('host'):            host = p.getAttribute('host')            if host.startswith('_any_'):                msgToMPD['hosts'][(loRange,hiRange)] = host            else:                try:                    msgToMPD['hosts'][(loRange,hiRange)] = socket.gethostbyname_ex(host)[2][0]                except:                    print 'unable to do find info for host %s' % (host)                    sys.exit(-1)        else:            if hostSpec  and  hostList:                msgToMPD['hosts'][(loRange,hiRange)] = '_any_from_pool_'            else:                msgToMPD['hosts'][(loRange,hiRange)] = '_any_'        argDict = {}        argList = p.getElementsByTagName('arg')        for argElem in argList:            argDict[int(argElem.getAttribute('idx'))] = argElem.getAttribute('value')        argVals = [0] * len(argList)        for i in argDict.keys():            argVals[i-1] = unquote(argDict[i])        msgToMPD['args'][(loRange,hiRange)] = argVals        limitDict = {}        limitList = p.getElementsByTagName('limit')        for limitElem in limitList:            typ = limitElem.getAttribute('type')            if typ in known_rlimit_types:                limitDict[typ] = limitElem.getAttribute('value')            else:                print 'mpiexec: invalid type in limit: %s' % (typ)                sys.exit(-1)        msgToMPD['limits'][(loRange,hiRange)] = limitDict        envVals = {}        envVarList = p.getElementsByTagName('env')        for envVarElem in envVarList:            envkey = envVarElem.getAttribute('name')            envval = unquote(envVarElem.getAttribute('value'))            envVals[envkey] = envval        msgToMPD['envvars'][(loRange,hiRange)] = envVals    for i in range(len(covered)):        if not covered[i]:            print '*** exiting; %d procs are requested, but proc %d is not described' % \                  (parmdb['nprocs'],i)            sys.exit(-1)        def get_vals_for_attach(parmdb,conSock,msgToMPD):    sjobid = parmdb['-gdba'].split('@')    # jobnum and originating host    msgToSend = { 'cmd' : 'mpdlistjobs' }    conSock.send_dict_msg(msgToSend)    msg = conSock.recv_dict_msg(timeout=recvTimeout)    if not msg:        mpd_print(1,'no msg recvd from mpd before timeout')        sys.exit(-1)    if msg['cmd'] != 'local_mpdid':     # get full id of local mpd for filters later        mpd_print(1,'did not recv local_mpdid msg from local mpd; recvd: %s' % msg)        sys.exit(-1)    else:        if len(sjobid) == 1:            sjobid.append(msg['id'])    got_info = 0    while 1:        msg = conSock.recv_dict_msg()        if not msg.has_key('cmd'):            mpd_print(1,'invalid message from mpd :%s:' % (msg))            sys.exit(-1)        if msg['cmd'] == 'mpdlistjobs_info':            got_info = 1            smjobid = msg['jobid'].split('  ')  # jobnum, mpdid, and alias (if present)            if sjobid[0] == smjobid[0]  and  sjobid[1] == smjobid[1]:  # jobnum and mpdid                rank = int(msg['rank'])                msgToMPD['users'][(rank,rank)]   = msg['username']                msgToMPD['hosts'][(rank,rank)]   = msg['ifhn']                msgToMPD['execs'][(rank,rank)]   = msg['pgm']                msgToMPD['cwds'][(rank,rank)]    = os.path.abspath(os.getcwd())                msgToMPD['paths'][(rank,rank)]   = os.environ['PATH']                msgToMPD['args'][(rank,rank)]    = [msg['clipid']]                msgToMPD['envvars'][(rank,rank)] = {}                msgToMPD['limits'][(rank,rank)]  = {}                currumask = os.umask(0) ; os.umask(currumask)  # grab it and set it back                msgToMPD['umasks'][(rank,rank)]  = str(currumask)        elif  msg['cmd'] == 'mpdlistjobs_trailer':            if not got_info:                print 'no info on this jobid; probably invalid'                sys.exit(-1)            break        else:            print 'invaild msg from mpd :%s:' % (msg)            sys.exit(-1)    parmdb[('thispgm','nprocs')] = len(msgToMPD['execs'].keys())  # all dicts are same len    def usage():    print __doc__    sys.exit(-1)if __name__ == '__main__':    try:        mpiexec()    except SystemExit, errExitStatus:  # bounced to here by sys.exit inside mpiexec()        myExitStatus = errExitStatus    sys.exit(myExitStatus)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -