📄 mpdman.py
字号:
#!/usr/bin/env python## (C) 2001 by Argonne National Laboratory.# See COPYRIGHT in top-level directory.#"""mpdman does NOT run as a standalone console program; it is only exec'd (or imported) by mpd"""from time import ctime__author__ = "Ralph Butler and Rusty Lusk"__date__ = ctime()__version__ = "$Revision: 1.160 $"__credits__ = ""import sys, os, signal, socketfrom types import ClassTypefrom re import findall, subfrom cPickle import loadsfrom time import sleepfrom urllib import quotefrom mpdlib import mpd_set_my_id, mpd_print, mpd_read_nbytes, \ mpd_sockpair, mpd_get_ranks_in_binary_tree, \ mpd_get_my_username, mpd_set_cli_app, \ mpd_dbg_level, mpd_handle_signal, \ MPDSock, MPDListenSock, MPDStreamHandler, MPDRingtry: import syslog syslog_module_available = 1except: syslog_module_available = 0try: import subprocess subprocess_module_available = 1except: subprocess_module_available = 0global clientPid, clientExited, clientExitStatus, clientExitStatusSentclass MPDMan(object): def __init__(self): pass def run(self): global clientPid, clientExited, clientExitStatus, clientExitStatusSent clientExited = 0 clientExitStatusSent = 0 if hasattr(signal,'SIGCHLD'): signal.signal(signal.SIGCHLD,sigchld_handler) self.myHost = os.environ['MPDMAN_MYHOST'] self.myIfhn = os.environ['MPDMAN_MYIFHN'] self.myRank = int(os.environ['MPDMAN_RANK']) self.posInRing = int(os.environ['MPDMAN_POS_IN_RING']) self.myId = self.myHost + '_mpdman_' + str(self.myRank) self.spawned = int(os.environ['MPDMAN_SPAWNED']) self.spawnInProgress = 0 if self.spawned: self.myId = self.myId + '_s' # Note that in the spawned process case, this id for the mpdman # will not be unique (it needs something like the world number # or the pid of the mpdman process itself) mpd_set_my_id(myid=self.myId) self.clientPgm = os.environ['MPDMAN_CLI_PGM'] mpd_set_cli_app(self.clientPgm) try: os.chdir(os.environ['MPDMAN_CWD']) except Exception, errmsg: errmsg = '%s: invalid dir: %s' % (self.myId,os.environ['MPDMAN_CWD']) # print errmsg ## may syslog it in some cases ? if os.environ['MPDMAN_HOW_LAUNCHED'] == 'FORK': self.listenRingPort = int(os.environ['MPDMAN_MY_LISTEN_PORT']) listenRingFD = int(os.environ['MPDMAN_MY_LISTEN_FD']) # closed in loop below self.listenRingSock = socket.fromfd(listenRingFD,socket.AF_INET,socket.SOCK_STREAM) self.listenRingSock = MPDSock(sock=self.listenRingSock) mpdFD = int(os.environ['MPDMAN_TO_MPD_FD']) # closed in loop below self.mpdSock = socket.fromfd(mpdFD,socket.AF_INET,socket.SOCK_STREAM) self.mpdSock = MPDSock(sock=self.mpdSock) elif os.environ['MPDMAN_HOW_LAUNCHED'] == 'SUBPROCESS': self.listenRingSock = MPDListenSock() self.listenRingPort = self.listenRingSock.getsockname()[1] mpdPort = int(os.environ['MPDMAN_MPD_PORT']) self.mpdSock = MPDSock() self.mpdSock.connect((self.myIfhn,mpdPort)) self.mpdSock.send_dict_msg( {'man_listen_port' : self.listenRingPort} ) else: mpd_print(1,'I cannot figure out how I was launched') sys.exit(-1) self.pos0Ifhn = os.environ['MPDMAN_POS0_IFHN'] self.pos0Port = int(os.environ['MPDMAN_POS0_PORT']) # close unused fds before I grab any more # NOTE: this will also close syslog's fd inherited from mpd; re-opened below try: max_fds = os.sysconf('SC_OPEN_MAX') except: max_fds = 1024 for fd in range(3,max_fds): if fd == self.mpdSock.fileno() or fd == self.listenRingSock.fileno(): continue try: os.close(fd) except: pass if syslog_module_available: syslog.openlog("mpdman",0,syslog.LOG_DAEMON) syslog.syslog(syslog.LOG_INFO,"mpdman starting new log; %s" % (self.myId) ) self.umask = os.environ['MPDMAN_UMASK'] if self.umask.startswith('0x'): self.umask = int(self.umask,16) elif self.umask.startswith('0'): self.umask = int(self.umask,8) else: self.umask = int(self.umask) self.oldumask = os.umask(self.umask) self.clientPgmArgs = loads(os.environ['MPDMAN_PGM_ARGS']) self.clientPgmEnv = loads(os.environ['MPDMAN_PGM_ENVVARS']) self.clientPgmLimits = loads(os.environ['MPDMAN_PGM_LIMITS']) self.jobid = os.environ['MPDMAN_JOBID'] self.nprocs = int(os.environ['MPDMAN_NPROCS']) self.mpdPort = int(os.environ['MPDMAN_MPD_LISTEN_PORT']) self.mpdConfPasswd = os.environ['MPDMAN_MPD_CONF_SECRETWORD'] os.environ['MPDMAN_MPD_CONF_SECRETWORD'] = '' ## do NOT pass it on to clients self.kvs_template_from_env = os.environ['MPDMAN_KVS_TEMPLATE'] self.conIfhn = os.environ['MPDMAN_CONIFHN'] self.conPort = int(os.environ['MPDMAN_CONPORT']) self.lhsIfhn = os.environ['MPDMAN_LHS_IFHN'] self.lhsPort = int(os.environ['MPDMAN_LHS_PORT']) self.stdinDest = os.environ['MPDMAN_STDIN_DEST'] self.totalview = int(os.environ['MPDMAN_TOTALVIEW']) self.gdb = int(os.environ['MPDMAN_GDB']) self.gdba = os.environ['MPDMAN_GDBA'] self.lineLabelFmt = os.environ['MPDMAN_LINE_LABELS_FMT'] self.startStdoutLineLabel = 1 self.startStderrLineLabel = 1 self.singinitPID = int(os.environ['MPDMAN_SINGINIT_PID']) self.singinitPORT = int(os.environ['MPDMAN_SINGINIT_PORT']) self.doingBNR = int(os.environ['MPDMAN_DOING_BNR']) self.listenNonRingSock = MPDListenSock('',0,name='nonring_listen_sock') self.listenNonRingPort = self.listenNonRingSock.getsockname()[1] self.streamHandler = MPDStreamHandler() self.streamHandler.set_handler(self.mpdSock,self.handle_mpd_input) self.streamHandler.set_handler(self.listenNonRingSock, self.handle_nonring_connection) # set up pmi stuff early in case I was spawned self.universeSize = -1 self.appnum = -1 self.pmiVersion = 1 self.pmiSubversion = 1 self.KVSs = {} if self.singinitPID: # self.kvsname_template = 'singinit_kvs_' self.kvsname_template = 'singinit_kvs_' + str(os.getpid()) else: self.kvsname_template = 'kvs_' + self.kvs_template_from_env + '_' self.default_kvsname = self.kvsname_template + '0' self.default_kvsname = sub('\.','_',self.default_kvsname) # magpie.cs to magpie_cs self.default_kvsname = sub('\-','_',self.default_kvsname) # chg node-0 to node_0 self.KVSs[self.default_kvsname] = {} cli_env = {} cli_env['MPICH_INTERFACE_HOSTNAME'] = os.environ['MPICH_INTERFACE_HOSTNAME'] cli_env['MPICH_INTERFACE_HOSTNAME_R%d' % self.myRank] = os.environ['MPICH_INTERFACE_HOSTNAME'] for k in self.clientPgmEnv.keys(): if k.startswith('MPI_APPNUM'): self.appnum = self.clientPgmEnv[k] # don't put in application env elif k.startswith('MPICH_INTERFACE_HOSTNAME'): continue ## already put it in above else: cli_env[k] = self.clientPgmEnv[k] self.kvs_next_id = 1 self.jobEndingEarly = 0 self.pmiCollectiveJob = 0 self.spawnedCnt = 0 self.pmiSock = 0 # obtained later self.ring = MPDRing(listenSock=self.listenRingSock, streamHandler=self.streamHandler, myIfhn=self.myIfhn) if self.nprocs == 1: self.ring.create_single_mem_ring(ifhn=self.myIfhn, port=self.listenRingPort, lhsHandler=self.handle_lhs_input, rhsHandler=self.handle_rhs_input) else: if self.posInRing == 0: # one 'end' self.ring.accept_rhs(rhsHandler=self.handle_rhs_input) self.ring.accept_lhs(lhsHandler=self.handle_lhs_input) elif self.posInRing == (self.nprocs-1): # the other 'end' rv = self.ring.connect_lhs(lhsIfhn=self.lhsIfhn, lhsPort=self.lhsPort, lhsHandler=self.handle_lhs_input, numTries=8) if rv[0] <= 0: mpd_print(1,"lhs connect failed") sys.exit(-1) self.rhsIfhn = self.pos0Ifhn self.rhsPort = self.pos0Port rv = self.ring.connect_rhs(rhsIfhn=self.rhsIfhn, rhsPort=self.rhsPort, rhsHandler=self.handle_rhs_input, numTries=8) if rv[0] <= 0: # connect did not succeed; may try again mpd_print(1,"rhs connect failed") sys.exit(-1) else: # ring members 'in the middle' rv = self.ring.connect_lhs(lhsIfhn=self.lhsIfhn, lhsPort=self.lhsPort, lhsHandler=self.handle_lhs_input, numTries=8) if rv[0] <= 0: mpd_print(1,"lhs connect failed") sys.exit(-1) self.ring.accept_rhs(rhsHandler=self.handle_rhs_input) if self.myRank == 0: self.conSock = MPDSock(name='to_console') self.conSock.connect((self.conIfhn,self.conPort)) self.streamHandler.set_handler(self.conSock,self.handle_console_input) if self.spawned: msgToSend = { 'cmd' : 'spawned_man0_is_up', 'spawned_id' : os.environ['MPDMAN_SPAWNED'] } self.conSock.send_dict_msg(msgToSend) msg = self.conSock.recv_dict_msg() # If there is a failure in the connection, this # receive will fail and if not handled, cause mpdman # to fail. For now, we just check on a empty or unexpected # message if not msg or msg['cmd'] != 'preput_info_for_child': mpd_print(1,'invalid msg from parent :%s:' % msg) sys.exit(-1) try: for k in msg['kvs'].keys(): self.KVSs[self.default_kvsname][k] = msg['kvs'][k] except: mpd_print(1,'failed to insert preput_info') sys.exit(-1) msg = self.conSock.recv_dict_msg() if not msg or not msg.has_key('cmd') or msg['cmd'] != 'ringsize': mpd_print(1,'spawned: bad msg from con; got: %s' % (msg) ) sys.exit(-1) self.universeSize = msg['ring_ncpus'] # if the rshSock is closed, we'll get an AttributeError # exception about 'int' has no attribute 'send_dict_msg' # FIXME: Does every use of a sock on which send_dict_msg # is used need an "if xxxx.rhsSock:" test first? # Is there an else for those cases? self.ring.rhsSock.send_dict_msg(msg) # forward it on else: msgToSend = { 'cmd' : 'man_checking_in' } self.conSock.send_dict_msg(msgToSend) msg = self.conSock.recv_dict_msg() if not msg or not msg.has_key('cmd') or msg['cmd'] != 'ringsize': mpd_print(1,'invalid msg from con; expected ringsize got: %s' % (msg) ) sys.exit(-1) if self.clientPgmEnv.has_key('MPI_UNIVERSE_SIZE'): self.universeSize = int(self.clientPgmEnv['MPI_UNIVERSE_SIZE']) else: self.universeSize = msg['ring_ncpus'] self.ring.rhsSock.send_dict_msg(msg)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -