📄 mpdman.py
字号:
# NOTE(review): this excerpt opens in the middle of a method whose `def` is
# not visible.  The statements down to the first `def` below are kept
# token-for-token; their nesting is inferred from the parallel stderr handler
# that follows -- confirm against the complete file.
                    else:
                        line = ''
                    if splitLine[-1] == '':
                        self.startStdoutLineLabel = 1
                        del splitLine[-1]
                    else:
                        self.startStdoutLineLabel = 0
                    for s in splitLine[0:-1]:
                        line = line + s + '\n' + lineLabel
                    line = line + splitLine[-1]
                    if self.startStdoutLineLabel:
                        line = line + '\n'
                self.parentStdoutSock.send_char_msg(line,errprint=0)
        return line

    # Handle readable data on the client's stderr descriptor.
    #   sock: the descriptor to read from (self.fd_read_cli_stderr).
    # Reads up to 1024 bytes.  On an empty read the stderr source is closed,
    # self.numDone is incremented and, once numDone reaches self.numWithIO,
    # both parent sockets are closed and reset to 0.  Otherwise the data is
    # forwarded to self.parentStderrSock (if set), with a line label inserted
    # at the start of each line when self.lineLabelFmt is set.
    # Returns the data that was read (label-decorated if labels were applied),
    # or the empty/false value returned by the read at end of input.
    def handle_cli_stderr_input(self,sock):
        line = mpd_read_nbytes(sock,1024)  # sock is self.fd_read_cli_stderr
        if not line:
            if self.subproc:    # must close subproc's file (not just the fd)
                self.subproc.stderr.close()
            else:
                # NOTE(review): the handler is only removed in this branch; no
                # del_handler call is visible for the subproc case -- confirm
                # that is intended.
                self.streamHandler.del_handler(self.fd_read_cli_stderr)
                os.close(self.fd_read_cli_stderr)
            self.numDone += 1
            if self.numDone >= self.numWithIO:
                # all counted I/O sources have finished: drop both parent links
                if self.parentStdoutSock:
                    self.parentStdoutSock.close()
                    self.parentStdoutSock = 0
                if self.parentStderrSock:
                    self.parentStderrSock.close()
                    self.parentStderrSock = 0
        else:
            if self.parentStderrSock:
                if self.lineLabelFmt:
                    lineLabel = self.create_line_label(self.lineLabelFmt,self.spawned)
                    splitLine = line.split('\n',1024)
                    # startStderrLineLabel records whether the previous chunk
                    # ended with a newline, i.e. whether this chunk begins a
                    # new line and therefore needs a leading label
                    if self.startStderrLineLabel:
                        line = lineLabel
                    else:
                        line = ''
                    # a trailing '' piece means this chunk ended with '\n'
                    if splitLine[-1] == '':
                        self.startStderrLineLabel = 1
                        del splitLine[-1]
                    else:
                        self.startStderrLineLabel = 0
                    # every piece but the last is followed by a newline and
                    # the label for the line that comes after it
                    for s in splitLine[0:-1]:
                        line = line + s + '\n' + lineLabel
                    line = line + splitLine[-1]
                    # restore the final newline removed above; the label for
                    # the next line is emitted at the start of the next chunk
                    if self.startStderrLineLabel:
                        line = line + '\n'
                self.parentStderrSock.send_char_msg(line,errprint=0)
        return line

    # Forward stdout data arriving from a child in the I/O tree.
    #   sock: the child's socket.
    # Reads one line when self.lineLabelFmt is set, otherwise up to 1024 bytes.
    # On an empty read the socket's handler is removed, the socket is closed
    # and self.numDone is incremented; once numDone reaches self.numWithIO
    # both parent sockets are closed and reset to 0.  Otherwise the data is
    # passed unchanged to self.parentStdoutSock (if set).  No value is returned.
    def handle_child_stdout_tree_input(self,sock):
        if self.lineLabelFmt:
            line = sock.recv_one_line()
        else:
            line = sock.recv(1024)
        if not line:
            self.streamHandler.del_handler(sock)
            sock.close()
            self.numDone += 1
            if self.numDone >= self.numWithIO:
                if self.parentStdoutSock:
                    self.parentStdoutSock.close()
                    self.parentStdoutSock = 0
                if self.parentStderrSock:
                    self.parentStderrSock.close()
                    self.parentStderrSock = 0
        else:
            if self.parentStdoutSock:
                self.parentStdoutSock.send_char_msg(line,errprint=0)
                # parentStdoutSock.sendall('FWD by %d: |%s|' % (self.myRank,line) )

    # Forward stderr data arriving from a child in the I/O tree.
    #   sock: the child's socket.
    # Same flow as handle_child_stdout_tree_input, except that the data is
    # passed to self.parentStderrSock (if set).  No value is returned.
    def handle_child_stderr_tree_input(self,sock):
        if self.lineLabelFmt:
            line = sock.recv_one_line()
        else:
            line = sock.recv(1024)
        if not line:
            self.streamHandler.del_handler(sock)
            sock.close()
            self.numDone += 1
            if self.numDone >= self.numWithIO:
                if self.parentStdoutSock:
                    self.parentStdoutSock.close()
                    self.parentStdoutSock = 0
                if self.parentStderrSock:
                    self.parentStderrSock.close()
                    self.parentStderrSock = 0
        else:
            if self.parentStderrSock:
                self.parentStderrSock.send_char_msg(line,errprint=0)
                # parentStdoutSock.sendall('FWD by %d: |%s|' % (self.myRank,line) )

    # Handle a dict message received from a spawned child.
    #   sock: the spawned child's socket.
    # Dispatches on msg['cmd']:
    #   (empty msg)         remove the handler, drop sock from
    #                       self.spawnedChildSocks and close it
    #   job_started         ignored
    #   client_exit_status  rank 0 passes it to self.conSock; other ranks pass
    #                       it to self.ring.rhsSock
    #   job_aborted_early   a copy of the relevant fields goes to self.conSock
    #   startup_status      record the rc; once every errcode is filled in,
    #                       send a spawn_result PMI message and clear
    #                       self.spawnInProgress
    # Any other cmd is reported through mpd_print.
    def handle_spawned_child_input(self,sock):
        msg = sock.recv_dict_msg()
        if not msg:
            self.streamHandler.del_handler(sock)
            self.spawnedChildSocks.remove(sock)
            sock.close()
        elif msg['cmd'] == 'job_started':
            pass
        elif msg['cmd'] == 'client_exit_status':
            if self.myRank == 0:
                if self.conSock:
                    self.conSock.send_dict_msg(msg,errprint=0)
            else:
                if self.ring.rhsSock:
                    self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'job_aborted_early':
            if self.conSock:
                msgToSend = { 'cmd' : 'job_aborted_early',
                              'jobid' : msg['jobid'],
                              'rank' : msg['rank'],
                              'exit_status' : msg['exit_status'] }
                self.conSock.send_dict_msg(msgToSend,errprint=0)
        elif msg['cmd'] == 'startup_status':
            # remember this rc to put in spawn_result
            # NOTE(review): self.spawnInProgress is indexed here without a
            # check, yet it is reset to 0 below -- confirm a startup_status
            # cannot arrive while it is 0.
            self.spawnInProgress['errcodes'][msg['rank']] = msg['rc']
            if None not in self.spawnInProgress['errcodes']:  # if all errcodes are now filled in
                # send pmi msg to spawner
                strerrcodes = ''  # put errcodes in str format for pmi msg
                for ec in self.spawnInProgress['errcodes']:
                    strerrcodes = strerrcodes + str(ec) + ','
                strerrcodes = strerrcodes[:-1]  # drop the trailing comma
                if self.pmiSock:  # may have disappeared in early shutdown
                    # may want to make rc < 0 if any errcode is < 0
                    pmiMsgToSend = 'cmd=spawn_result rc=0 errcodes=%s\n' % (strerrcodes)
                    self.pmiSock.send_char_msg(pmiMsgToSend)
                self.spawnInProgress = 0
        else:
            mpd_print(1, "unrecognized msg from spawned child :%s:" % msg )

    # Handle activity on the PMI listening socket.
    #   sock: not referenced in this method; the connection is accepted from
    #         self.pmiListenSock.
    # If a PMI connection is already open, that existing connection is told
    # so, closed and cleared, an error is printed, a collective_abort message
    # is sent to self.ring.rhsSock, and the method returns without accepting.
    # Otherwise the new connection is accepted, registered with
    # handle_pmi_input as its handler, and sent a tv_ready message when
    # self.tvReady is set.  Exits the process if the accept yields no socket.
    def handle_pmi_connection(self,sock):
        if self.pmiSock:  # already have one
            pmiMsgToSend = 'cmd=you_already_have_an_open_pmi_conn_to_me\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)
            self.streamHandler.del_handler(self.pmiSock)
            self.pmiSock.close()
            self.pmiSock = 0
            errmsg = "mpdman: invalid attempt to open 2 simultaneous pmi connections\n" + \
                     " client=%s cwd=%s" % (self.clientPgm,os.environ['MPDMAN_CWD'])
            print errmsg ; sys.stdout.flush()
            # clientExitStatus is a local name here (no global statement in
            # this method)
            clientExitStatus = 137  # assume kill -9 below
            msgToSend = { 'cmd' : 'collective_abort',
                          'src' : self.myId, 'rank' : self.myRank,
                          'exit_status' : clientExitStatus }
            # NOTE(review): rhsSock is used without the truthiness check that
            # other handlers in this excerpt apply -- confirm it cannot be
            # unset here.
            self.ring.rhsSock.send_dict_msg(msgToSend)
            return
        (self.pmiSock,tempConnAddr) = self.pmiListenSock.accept()
        # the following lines are commented out so that we can support a process
        # that runs 2 MPI pgms in tandem (e.g. mpish at ANL)
        ##### del socksToSelect[pmiListenSock]
        ##### pmiListenSock.close()
        if not self.pmiSock:
            mpd_print(1,"failed accept for pmi connection from client")
            sys.exit(-1)
        self.pmiSock.name = 'pmi'
        self.streamHandler.set_handler(self.pmiSock,self.handle_pmi_input)
        if self.tvReady:
            pmiMsgToSend = 'cmd=tv_ready\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)

    # Handle one message from the client on the PMI socket.
    #   sock: not referenced in the visible part; reads use self.pmiSock.
    # Input is ignored while self.spawnInProgress is set.  An empty read
    # closes the PMI socket and, for a collective job, sends collective_abort
    # to the ring (unless the job is already ending early) and kills the
    # client's process group.  Otherwise the message is parsed (multi-line
    # 'mcmd=' form or single line) and dispatched on its 'cmd' field.
    # NOTE(review): this method is cut off at the end of the excerpt; only the
    # visible branches are described.
    def handle_pmi_input(self,sock):
        global clientPid, clientExited, clientExitStatus, clientExitStatusSent
        if self.spawnInProgress:
            return
        line = self.pmiSock.recv_char_msg()
        if not line:
            self.streamHandler.del_handler(self.pmiSock)
            self.pmiSock.close()
            self.pmiSock = 0
            if self.pmiCollectiveJob:
                if self.ring.rhsSock:  # still alive ?
                    if not self.jobEndingEarly:  # if I did not already know this
                        if not clientExited:
                            clientExitStatus = 137  # assume kill -9 below
                        msgToSend = { 'cmd' : 'collective_abort',
                                      'src' : self.myId, 'rank' : self.myRank,
                                      'exit_status' : clientExitStatus }
                        self.ring.rhsSock.send_dict_msg(msgToSend)
                # NOTE(review): the bare except hides every failure of the
                # kill, not only "no such process" -- consider narrowing it.
                try:
                    pgrp = clientPid * (-1)  # neg Pid -> group
                    os.kill(pgrp,signal.SIGKILL)  # may be reaped by sighandler
                except:
                    pass
            return
        if line.startswith('mcmd='):
            # multi-line command: 'mcmd=<cmd>' followed by key=value lines,
            # terminated by a line starting with 'endcmd'
            parsedMsg = {}
            line = line.rstrip()
            splitLine = line.split('=',1)
            parsedMsg['cmd'] = splitLine[1]
            line = ''
            while not line.startswith('endcmd'):
                line = self.pmiSock.recv_char_msg()
                if not line.startswith('endcmd'):
                    line = line.rstrip()
                    splitLine = line.split('=',1)
                    # NOTE(review): a line with no '=' (including an empty
                    # read) makes splitLine[1] raise IndexError -- confirm
                    # recv_char_msg cannot return such a line here.
                    parsedMsg[splitLine[0]] = splitLine[1]
        else:
            parsedMsg = parse_pmi_msg(line)
        if not parsedMsg.has_key('cmd'):
            pmiMsgToSend = 'cmd=unparseable_msg rc=-1\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)
            return
        # startup_status may be sent here from new process BEFORE starting client
        if parsedMsg['cmd'] == 'startup_status':
            msgToSend = { 'cmd' : 'startup_status', 'src' : self.myId,
                          'rc' : parsedMsg['rc'],
                          'jobid' : self.jobid, 'rank' : self.myRank,
                          'exec' : parsedMsg['exec'],
                          'reason' : parsedMsg['reason'] }
            if self.ring.rhsSock:
                self.ring.rhsSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'init':
            self.pmiCollectiveJob = 1
            version = int(parsedMsg['pmi_version'])
            subversion = int(parsedMsg['pmi_subversion'])
            # accept only the same major version with a client subversion no
            # newer than ours
            if self.pmiVersion == version and self.pmiSubversion >= subversion:
                rc = 0
            else:
                rc = -1
            pmiMsgToSend = 'cmd=response_to_init pmi_version=%d pmi_subversion=%d rc=%d\n' % \
                           (self.pmiVersion,self.pmiSubversion,rc)
            self.pmiSock.send_char_msg(pmiMsgToSend)
            # a startup_status with rc 0 is sent regardless of the rc computed
            # above
            msgToSend = { 'cmd' : 'startup_status', 'src' : self.myId,
                          'rc' : 0,
                          'jobid' : self.jobid, 'rank' : self.myRank,
                          'exec' : '', 'reason' : '' }
            # NOTE(review): unlike the startup_status branch above, rhsSock is
            # not checked before use here -- confirm.
            self.ring.rhsSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'get_my_kvsname':
            pmiMsgToSend = 'cmd=my_kvsname kvsname=%s\n' % (self.default_kvsname)
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'get_maxes':
            pmiMsgToSend = 'cmd=maxes kvsname_max=4096 ' + \
                           'keylen_max=4096 vallen_max=4096\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'get_universe_size':
            pmiMsgToSend = 'cmd=universe_size size=%s\n' % (self.universeSize)
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'get_appnum':
            pmiMsgToSend = 'cmd=appnum appnum=%s\n' % (self.appnum)
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'publish_name':
            # the three name-service requests are sent on self.mpdSock, tagged
            # with this job id and this process's pid
            msgToSend = { 'cmd' : 'publish_name',
                          'service' : parsedMsg['service'],
                          'port' : parsedMsg['port'],
                          'jobid' : self.jobid,
                          'manpid' : os.getpid() }
            self.mpdSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'unpublish_name':
            msgToSend = { 'cmd' : 'unpublish_name',
                          'service' : parsedMsg['service'],
                          'jobid' : self.jobid,
                          'manpid' : os.getpid() }
            self.mpdSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'lookup_name':
            msgToSend = { 'cmd' : 'lookup_name',
                          'service' : parsedMsg['service'],
                          'jobid' : self.jobid,
                          'manpid' : os.getpid() }
            self.mpdSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'create_kvs':
            new_kvsname = self.kvsname_template + str(self.kvs_next_id)
            self.KVSs[new_kvsname] = {}
            # NOTE(review): the excerpt ends here; the rest of this branch and
            # any later branches are not visible.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -