⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpdman.py

📁 fortran并行计算包
💻 PY
📖 第 1 页 / 共 5 页
字号:
#!/usr/bin/env python##   (C) 2001 by Argonne National Laboratory.#       See COPYRIGHT in top-level directory.#"""mpdman does NOT run as a standalone console program;    it is only exec'd (or imported) by mpd"""from time import ctime__author__ = "Ralph Butler and Rusty Lusk"__date__ = ctime()__version__ = "$Revision: 1.160 $"__credits__ = ""import sys, os, signal, socketfrom types    import ClassTypefrom re       import findall, subfrom cPickle  import loadsfrom time     import sleepfrom urllib   import quotefrom mpdlib   import mpd_set_my_id, mpd_print, mpd_read_nbytes,  \                     mpd_sockpair, mpd_get_ranks_in_binary_tree, \                     mpd_get_my_username, mpd_set_cli_app,       \                     mpd_dbg_level, mpd_handle_signal,           \                     MPDSock, MPDListenSock, MPDStreamHandler, MPDRingtry:    import  syslog      syslog_module_available = 1except:    syslog_module_available = 0try:    import  subprocess    subprocess_module_available = 1except:    subprocess_module_available = 0global clientPid, clientExited, clientExitStatus, clientExitStatusSentclass MPDMan(object):    def __init__(self):        pass    def run(self):        global clientPid, clientExited, clientExitStatus, clientExitStatusSent        clientExited = 0        clientExitStatusSent = 0        if hasattr(signal,'SIGCHLD'):            signal.signal(signal.SIGCHLD,sigchld_handler)        self.myHost    = os.environ['MPDMAN_MYHOST']        self.myIfhn    = os.environ['MPDMAN_MYIFHN']        self.myRank    = int(os.environ['MPDMAN_RANK'])        self.posInRing = int(os.environ['MPDMAN_POS_IN_RING'])        self.myId      = self.myHost + '_mpdman_' + str(self.myRank)        self.spawned   = int(os.environ['MPDMAN_SPAWNED'])        self.spawnInProgress = 0        if self.spawned:            self.myId = self.myId + '_s'        # Note that in the spawned process case, this id for the mpdman        # will not be unique (it needs something like the world number        # or the pid of the mpdman process itself)        mpd_set_my_id(myid=self.myId)        self.clientPgm = os.environ['MPDMAN_CLI_PGM']        mpd_set_cli_app(self.clientPgm)        try:            os.chdir(os.environ['MPDMAN_CWD'])        except Exception, errmsg:            errmsg =  '%s: invalid dir: %s' % (self.myId,os.environ['MPDMAN_CWD'])            # print errmsg    ## may syslog it in some cases ?        if os.environ['MPDMAN_HOW_LAUNCHED'] == 'FORK':            self.listenRingPort = int(os.environ['MPDMAN_MY_LISTEN_PORT'])            listenRingFD = int(os.environ['MPDMAN_MY_LISTEN_FD'])  # closed in loop below            self.listenRingSock = socket.fromfd(listenRingFD,socket.AF_INET,socket.SOCK_STREAM)            self.listenRingSock = MPDSock(sock=self.listenRingSock)            mpdFD = int(os.environ['MPDMAN_TO_MPD_FD'])  # closed in loop below            self.mpdSock = socket.fromfd(mpdFD,socket.AF_INET,socket.SOCK_STREAM)            self.mpdSock = MPDSock(sock=self.mpdSock)        elif os.environ['MPDMAN_HOW_LAUNCHED'] == 'SUBPROCESS':            self.listenRingSock = MPDListenSock()            self.listenRingPort = self.listenRingSock.getsockname()[1]            mpdPort = int(os.environ['MPDMAN_MPD_PORT'])            self.mpdSock = MPDSock()            self.mpdSock.connect((self.myIfhn,mpdPort))            self.mpdSock.send_dict_msg( {'man_listen_port' : self.listenRingPort} )        else:            mpd_print(1,'I cannot figure out how I was launched')            sys.exit(-1)        self.pos0Ifhn = os.environ['MPDMAN_POS0_IFHN']        self.pos0Port = int(os.environ['MPDMAN_POS0_PORT'])        # close unused fds before I grab any more        # NOTE: this will also close syslog's fd inherited from mpd; re-opened below        try:     max_fds = os.sysconf('SC_OPEN_MAX')        except:  max_fds = 1024        for fd in range(3,max_fds):            if fd == self.mpdSock.fileno()  or  fd == self.listenRingSock.fileno():                continue            try:    os.close(fd)            except: pass        if syslog_module_available:            syslog.openlog("mpdman",0,syslog.LOG_DAEMON)            syslog.syslog(syslog.LOG_INFO,"mpdman starting new log; %s" % (self.myId) )        self.umask = os.environ['MPDMAN_UMASK']        if self.umask.startswith('0x'):            self.umask = int(self.umask,16)        elif self.umask.startswith('0'):            self.umask = int(self.umask,8)        else:            self.umask = int(self.umask)        self.oldumask = os.umask(self.umask)        self.clientPgmArgs = loads(os.environ['MPDMAN_PGM_ARGS'])        self.clientPgmEnv = loads(os.environ['MPDMAN_PGM_ENVVARS'])        self.clientPgmLimits = loads(os.environ['MPDMAN_PGM_LIMITS'])        self.jobid = os.environ['MPDMAN_JOBID']        self.nprocs = int(os.environ['MPDMAN_NPROCS'])        self.mpdPort = int(os.environ['MPDMAN_MPD_LISTEN_PORT'])        self.mpdConfPasswd = os.environ['MPDMAN_MPD_CONF_SECRETWORD']        os.environ['MPDMAN_MPD_CONF_SECRETWORD'] = ''  ## do NOT pass it on to clients        self.kvs_template_from_env = os.environ['MPDMAN_KVS_TEMPLATE']        self.conIfhn  = os.environ['MPDMAN_CONIFHN']        self.conPort  = int(os.environ['MPDMAN_CONPORT'])        self.lhsIfhn  = os.environ['MPDMAN_LHS_IFHN']        self.lhsPort  = int(os.environ['MPDMAN_LHS_PORT'])        self.stdinDest = os.environ['MPDMAN_STDIN_DEST']        self.totalview = int(os.environ['MPDMAN_TOTALVIEW'])        self.gdb = int(os.environ['MPDMAN_GDB'])        self.gdba = os.environ['MPDMAN_GDBA']        self.lineLabelFmt = os.environ['MPDMAN_LINE_LABELS_FMT']        self.startStdoutLineLabel = 1        self.startStderrLineLabel = 1        self.singinitPID  = int(os.environ['MPDMAN_SINGINIT_PID'])        self.singinitPORT = int(os.environ['MPDMAN_SINGINIT_PORT'])        self.doingBNR = int(os.environ['MPDMAN_DOING_BNR'])        self.listenNonRingSock = MPDListenSock('',0,name='nonring_listen_sock')        self.listenNonRingPort = self.listenNonRingSock.getsockname()[1]        self.streamHandler = MPDStreamHandler()        self.streamHandler.set_handler(self.mpdSock,self.handle_mpd_input)        self.streamHandler.set_handler(self.listenNonRingSock,                                       self.handle_nonring_connection)        # set up pmi stuff early in case I was spawned        self.universeSize = -1        self.appnum = -1        self.pmiVersion = 1        self.pmiSubversion = 1        self.KVSs = {}        if self.singinitPID:            # self.kvsname_template = 'singinit_kvs_'            self.kvsname_template = 'singinit_kvs_' + str(os.getpid())        else:            self.kvsname_template = 'kvs_' + self.kvs_template_from_env + '_'        self.default_kvsname = self.kvsname_template + '0'        self.default_kvsname = sub('\.','_',self.default_kvsname)  # magpie.cs to magpie_cs        self.default_kvsname = sub('\-','_',self.default_kvsname)  # chg node-0 to node_0        self.KVSs[self.default_kvsname] = {}        cli_env = {}        cli_env['MPICH_INTERFACE_HOSTNAME'] = os.environ['MPICH_INTERFACE_HOSTNAME']        cli_env['MPICH_INTERFACE_HOSTNAME_R%d' % self.myRank] = os.environ['MPICH_INTERFACE_HOSTNAME']        for k in self.clientPgmEnv.keys():            if k.startswith('MPI_APPNUM'):                self.appnum = self.clientPgmEnv[k]    # don't put in application env            elif k.startswith('MPICH_INTERFACE_HOSTNAME'):                continue    ## already put it in above            else:                cli_env[k] = self.clientPgmEnv[k]        self.kvs_next_id = 1        self.jobEndingEarly = 0        self.pmiCollectiveJob = 0        self.spawnedCnt = 0        self.pmiSock = 0   # obtained later        self.ring = MPDRing(listenSock=self.listenRingSock,                            streamHandler=self.streamHandler,                            myIfhn=self.myIfhn)        if self.nprocs == 1:            self.ring.create_single_mem_ring(ifhn=self.myIfhn,                                             port=self.listenRingPort,                                             lhsHandler=self.handle_lhs_input,                                             rhsHandler=self.handle_rhs_input)        else:            if self.posInRing == 0:    # one 'end'                self.ring.accept_rhs(rhsHandler=self.handle_rhs_input)                self.ring.accept_lhs(lhsHandler=self.handle_lhs_input)            elif self.posInRing == (self.nprocs-1):  # the other 'end'                rv = self.ring.connect_lhs(lhsIfhn=self.lhsIfhn,                                           lhsPort=self.lhsPort,                                           lhsHandler=self.handle_lhs_input,                                           numTries=8)                if rv[0] <= 0:                    mpd_print(1,"lhs connect failed")                    sys.exit(-1)                self.rhsIfhn = self.pos0Ifhn                self.rhsPort = self.pos0Port                rv = self.ring.connect_rhs(rhsIfhn=self.rhsIfhn,                                           rhsPort=self.rhsPort,                                           rhsHandler=self.handle_rhs_input,                                           numTries=8)                if rv[0] <=  0:  # connect did not succeed; may try again                    mpd_print(1,"rhs connect failed")                    sys.exit(-1)            else:  # ring members 'in the middle'                rv = self.ring.connect_lhs(lhsIfhn=self.lhsIfhn,                                           lhsPort=self.lhsPort,                                           lhsHandler=self.handle_lhs_input,                                           numTries=8)                if rv[0] <= 0:                    mpd_print(1,"lhs connect failed")                    sys.exit(-1)                self.ring.accept_rhs(rhsHandler=self.handle_rhs_input)        if self.myRank == 0:            self.conSock = MPDSock(name='to_console')            self.conSock.connect((self.conIfhn,self.conPort))            self.streamHandler.set_handler(self.conSock,self.handle_console_input)            if self.spawned:                msgToSend = { 'cmd' : 'spawned_man0_is_up',                              'spawned_id' : os.environ['MPDMAN_SPAWNED'] }                self.conSock.send_dict_msg(msgToSend)                msg = self.conSock.recv_dict_msg()                # If there is a failure in the connection, this                # receive will fail and if not handled, cause mpdman                 # to fail.  For now, we just check on a empty or unexpected                # message                if not msg or msg['cmd'] != 'preput_info_for_child':                    mpd_print(1,'invalid msg from parent :%s:' % msg)                    sys.exit(-1)                try:                    for k in msg['kvs'].keys():                        self.KVSs[self.default_kvsname][k] = msg['kvs'][k]                except:                    mpd_print(1,'failed to insert preput_info')                    sys.exit(-1)                msg = self.conSock.recv_dict_msg()                if not msg  or  not msg.has_key('cmd')  or  msg['cmd'] != 'ringsize':                    mpd_print(1,'spawned: bad msg from con; got: %s' % (msg) )                    sys.exit(-1)                self.universeSize = msg['ring_ncpus']                # if the rshSock is closed, we'll get an AttributeError                 # exception about 'int' has no attribute 'send_dict_msg'                # FIXME: Does every use of a sock on which send_dict_msg                # is used need an "if xxxx.rhsSock:" test first?                # Is there an else for those cases?                self.ring.rhsSock.send_dict_msg(msg)  # forward it on            else:                msgToSend = { 'cmd' : 'man_checking_in' }                self.conSock.send_dict_msg(msgToSend)                msg = self.conSock.recv_dict_msg()                if not msg  or  not msg.has_key('cmd')  or  msg['cmd'] != 'ringsize':                    mpd_print(1,'invalid msg from con; expected ringsize got: %s' % (msg) )                    sys.exit(-1)                if self.clientPgmEnv.has_key('MPI_UNIVERSE_SIZE'):                    self.universeSize = int(self.clientPgmEnv['MPI_UNIVERSE_SIZE'])                else:                    self.universeSize = msg['ring_ncpus']                self.ring.rhsSock.send_dict_msg(msg)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -