📄 mpdman.py
字号:
# NOTE(review): this excerpt was flattened onto a few long lines, so the line
# breaks and indentation below are reconstructed from syntax -- confirm the
# nesting against the original file.  The excerpt opens partway through a
# method whose `def` line is not visible and ends partway through
# handle_pmi_input.
        # Tail of a method that begins before this excerpt; the nesting of
        # these three statements relative to each other is not recoverable
        # from here -- TODO confirm.
        line = line + '\n'
        self.parentStderrSock.send_char_msg(line,errprint=0)
        return line

    def handle_child_stdout_tree_input(self,sock):
        """Forward data read from `sock` to self.parentStdoutSock.

        Reads one line when self.lineLabelFmt is set, otherwise up to 1024
        bytes.  An empty read is treated as end-of-stream: the socket is
        unregistered from self.streamHandler and closed, and self.numDone is
        incremented.  Once self.numDone reaches self.numWithIO, both parent
        sockets (stdout and stderr) are closed and reset to 0.
        """
        if self.lineLabelFmt:
            line = sock.recv_one_line()
        else:
            line = sock.recv(1024)
        if not line:
            # nothing more to read from this socket
            self.streamHandler.del_handler(sock)
            sock.close()
            self.numDone += 1
            if self.numDone >= self.numWithIO:
                # the counter is shared with the stderr handler, so both
                # parent sockets are torn down together
                if self.parentStdoutSock:
                    self.parentStdoutSock.close()
                    self.parentStdoutSock = 0
                if self.parentStderrSock:
                    self.parentStderrSock.close()
                    self.parentStderrSock = 0
        else:
            if self.parentStdoutSock:
                self.parentStdoutSock.send_char_msg(line,errprint=0)
                # parentStdoutSock.sendall('FWD by %d: |%s|' % (self.myRank,line) )

    def handle_child_stderr_tree_input(self,sock):
        """Forward data read from `sock` to self.parentStderrSock.

        Same logic as handle_child_stdout_tree_input, except that the data
        is sent to self.parentStderrSock.  End-of-stream handling uses the
        same self.numDone / self.numWithIO counters and closes both parent
        sockets when the count is reached.
        """
        if self.lineLabelFmt:
            line = sock.recv_one_line()
        else:
            line = sock.recv(1024)
        if not line:
            # nothing more to read from this socket
            self.streamHandler.del_handler(sock)
            sock.close()
            self.numDone += 1
            if self.numDone >= self.numWithIO:
                if self.parentStdoutSock:
                    self.parentStdoutSock.close()
                    self.parentStdoutSock = 0
                if self.parentStderrSock:
                    self.parentStderrSock.close()
                    self.parentStderrSock = 0
        else:
            if self.parentStderrSock:
                self.parentStderrSock.send_char_msg(line,errprint=0)
                # parentStdoutSock.sendall('FWD by %d: |%s|' % (self.myRank,line) )

    def handle_spawned_child_input(self,sock):
        """Process one dict message received from `sock`.

        An empty message unregisters the socket, removes it from
        self.spawnedChildSocks and closes it.  Otherwise the message is
        dispatched on msg['cmd']:

          job_started, job_terminated -- ignored
          client_exit_status  -- rank 0 sends it to self.conSock (if set);
                                 other ranks send it to self.ring.rhsSock
                                 (if set)
          job_aborted_early   -- rebuilt from the jobid/rank/exit_status
                                 fields and sent to self.conSock (if set)
          execution_problem   -- re-issued with this object's myId, jobid
                                 and myRank, sent to self.ring.rhsSock and
                                 to self.conSock (if set)
          anything else       -- reported through mpd_print
        """
        msg = sock.recv_dict_msg()
        if not msg:
            self.streamHandler.del_handler(sock)
            self.spawnedChildSocks.remove(sock)
            sock.close()
        elif msg['cmd'] == 'job_started' or msg['cmd'] == 'job_terminated':
            pass
        elif msg['cmd'] == 'client_exit_status':
            if self.myRank == 0:
                if self.conSock:
                    self.conSock.send_dict_msg(msg,errprint=0)
            else:
                if self.ring.rhsSock:
                    self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'job_aborted_early':
            if self.conSock:
                msgToSend = { 'cmd' : 'job_aborted_early', 'jobid' : msg['jobid'],
                              'rank' : msg['rank'],
                              'exit_status' : msg['exit_status'] }
                self.conSock.send_dict_msg(msgToSend,errprint=0)
        elif msg['cmd'] == 'execution_problem':
            msgToSend = { 'cmd' : 'execution_problem', 'src' : self.myId,
                          'jobid' : self.jobid, 'rank' : self.myRank,
                          'exec' : msg['exec'], 'reason' : msg['reason'] }
            # NOTE(review): rhsSock is used here without the truthiness check
            # that the client_exit_status branch applies -- confirm it cannot
            # be unset at this point.
            self.ring.rhsSock.send_dict_msg(msgToSend)
            if self.conSock:
                self.conSock.send_dict_msg(msgToSend,errprint=0)
        else:
            mpd_print(1, "unrecognized msg from spawned child :%s:" % msg )

    def handle_pmi_connection(self,sock):
        """Handle activity on the pmi listening socket.

        If self.pmiSock is already set, the existing connection is told
        'cmd=you_already_have_an_open_pmi_conn_to_me', unregistered, closed
        and reset to 0; an error is printed, a 'collective_abort' message
        with exit status 137 is sent to self.ring.rhsSock, and the method
        returns without calling accept().

        Otherwise a connection is accepted from self.pmiListenSock, named
        'pmi' and registered with self.handle_pmi_input as its handler.  If
        self.tvReady is set, 'cmd=tv_ready' is sent on the new connection.

        The `sock` argument is not used; self.pmiListenSock is used instead.
        """
        if self.pmiSock: # already have one
            pmiMsgToSend = 'cmd=you_already_have_an_open_pmi_conn_to_me\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)
            self.streamHandler.del_handler(self.pmiSock)
            self.pmiSock.close()
            self.pmiSock = 0
            errmsg = "mpdman: invalid attempt by client (%s) " % (self.clientPgm) + \
                     "to open 2 simultaneous pmi connections"
            print errmsg ; sys.stdout.flush()
            # NOTE(review): this method has no `global` statement, so the
            # assignment below binds a local name only (handle_pmi_input
            # declares clientExitStatus global) -- confirm this is intended.
            clientExitStatus = 137 # assume kill -9 below
            msgToSend = { 'cmd' : 'collective_abort',
                          'src' : self.myId, 'rank' : self.myRank,
                          'exit_status' : clientExitStatus }
            self.ring.rhsSock.send_dict_msg(msgToSend)
            return
        # tempConnAddr is not used afterwards
        (self.pmiSock,tempConnAddr) = self.pmiListenSock.accept()
        # the following lines are commented out so that we can support a process
        # that runs 2 MPI pgms in tandem (e.g. mpish at ANL)
        ##### del socksToSelect[pmiListenSock]
        ##### pmiListenSock.close()
        if not self.pmiSock:
            mpd_print(1,"failed accept for pmi connection from client")
            sys.exit(-1)
        self.pmiSock.name = 'pmi'
        self.streamHandler.set_handler(self.pmiSock,self.handle_pmi_input)
        if self.tvReady:
            pmiMsgToSend = 'cmd=tv_ready\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)

    def handle_pmi_input(self,sock):
        """Read one message from self.pmiSock and act on its 'cmd' field.

        An empty read closes and clears self.pmiSock; for a collective job
        (self.pmiCollectiveJob set) it may also send 'collective_abort' on
        the ring and then sends SIGKILL to the process group of clientPid.

        A message starting with 'mcmd=' is read as a multi-line command:
        further lines are read and split on the first '=' until a line
        starting with 'endcmd' arrives.  Any other message is parsed with
        parse_pmi_msg.  The parsed message is then dispatched on
        parsedMsg['cmd'].

        In the portion of this method visible here the `sock` argument is
        not used; self.pmiSock is used instead.
        """
        global clientPid, clientExited, clientExitStatus, clientExitStatusSent
        line = self.pmiSock.recv_char_msg()
        if not line:
            self.streamHandler.del_handler(self.pmiSock)
            self.pmiSock.close()
            self.pmiSock = 0
            if self.pmiCollectiveJob:
                if self.ring.rhsSock: # still alive ?
                    if not self.jobEndingEarly: # if I did not already know this
                        if not clientExited:
                            clientExitStatus = 137 # assume kill -9 below
                        msgToSend = { 'cmd' : 'collective_abort',
                                      'src' : self.myId, 'rank' : self.myRank,
                                      'exit_status' : clientExitStatus }
                        self.ring.rhsSock.send_dict_msg(msgToSend)
                # NOTE(review): the bare except below hides every error from
                # os.kill, not only the case where the process is already gone.
                try:
                    pgrp = clientPid * (-1) # neg Pid -> group
                    os.kill(pgrp,signal.SIGKILL) # may be reaped by sighandler
                except:
                    pass
            return
        if line.startswith('mcmd='):
            parsedMsg = {}
            line = line.rstrip()
            splitLine = line.split('=',1)
            parsedMsg['cmd'] = splitLine[1]
            line = ''
            # NOTE(review): if recv_char_msg() returns an empty value inside
            # this loop (the check at the top of the method suggests it can),
            # split() yields a single element and splitLine[1] raises
            # IndexError -- confirm whether that can happen here.
            while not line.startswith('endcmd'):
                line = self.pmiSock.recv_char_msg()
                if not line.startswith('endcmd'):
                    line = line.rstrip()
                    splitLine = line.split('=',1)
                    parsedMsg[splitLine[0]] = splitLine[1]
        else:
            parsedMsg = parse_pmi_msg(line)
        if not parsedMsg.has_key('cmd'):
            mpd_print(1, "unrecognized pmi msg (no cmd) :%s:" % line )
            return
        # execution_problem is sent BEFORE client actually starts
        if parsedMsg['cmd'] == 'execution_problem':
            msgToSend = { 'cmd' : 'execution_problem', 'src' : self.myId,
                          'jobid' : self.jobid, 'rank' : self.myRank,
                          'exec' : parsedMsg['exec'], 'reason' : parsedMsg['reason'] }
            self.ring.rhsSock.send_dict_msg(msgToSend)
            if self.conSock:
                self.conSock.send_dict_msg(msgToSend,errprint=0)
        elif parsedMsg['cmd'] == 'init':
            self.pmiCollectiveJob = 1
            version = int(parsedMsg['pmi_version'])
            subversion = int(parsedMsg['pmi_subversion'])
            # rc=0 only when the versions are equal and the requested
            # subversion does not exceed self.pmiSubversion
            if self.pmiVersion == version and self.pmiSubversion >= subversion:
                rc = 0
            else:
                rc = -1
            pmiMsgToSend = 'cmd=response_to_init pmi_version=%d pmi_subversion=%d rc=%d\n' % \
                           (self.pmiVersion,self.pmiSubversion,rc)
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'get_my_kvsname':
            pmiMsgToSend = 'cmd=my_kvsname kvsname=%s\n' % (self.default_kvsname)
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'get_maxes':
            pmiMsgToSend = 'cmd=maxes kvsname_max=4096 ' + \
                           'keylen_max=4096 vallen_max=4096\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'get_universe_size':
            pmiMsgToSend = 'cmd=universe_size size=%s\n' % (self.universeSize)
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'get_appnum':
            pmiMsgToSend = 'cmd=appnum appnum=%s\n' % (self.appnum)
            self.pmiSock.send_char_msg(pmiMsgToSend)
        # the three name-service commands below are passed on over
        # self.mpdSock; no reply is sent on self.pmiSock from these branches
        elif parsedMsg['cmd'] == 'publish_name':
            msgToSend = { 'cmd' : 'publish_name',
                          'service' : parsedMsg['service'],
                          'port' : parsedMsg['port'],
                          'jobid' : self.jobid,
                          'manpid' : os.getpid() }
            self.mpdSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'unpublish_name':
            msgToSend = { 'cmd' : 'unpublish_name',
                          'service' : parsedMsg['service'],
                          'jobid' : self.jobid,
                          'manpid' : os.getpid() }
            self.mpdSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'lookup_name':
            msgToSend = { 'cmd' : 'lookup_name',
                          'service' : parsedMsg['service'],
                          'jobid' : self.jobid,
                          'manpid' : os.getpid() }
            self.mpdSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'create_kvs':
            # new name = template + running counter; starts out empty
            new_kvsname = self.kvsname_template + str(self.kvs_next_id)
            self.KVSs[new_kvsname] = {}
            self.kvs_next_id += 1
            pmiMsgToSend = 'cmd=newkvs kvsname=%s\n' % (new_kvsname)
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'destroy_kvs':
            kvsname = parsedMsg['kvsname']
            # NOTE(review): bare except -- any failure of the delete, not
            # only a missing kvsname, is reported as rc=-1.
            try:
                del self.KVSs[kvsname]
                pmiMsgToSend = 'cmd=kvs_destroyed rc=0\n'
            except:
                pmiMsgToSend = 'cmd=kvs_destroyed rc=-1\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'put':
            kvsname = parsedMsg['kvsname']
            key = parsedMsg['key']
            value = parsedMsg['value']
            # NOTE(review): the success-path send is inside the try, so an
            # exception raised by send_char_msg is also answered with rc=-1.
            try:
                self.KVSs[kvsname][key] = value
                pmiMsgToSend = 'cmd=put_result rc=0\n'
                self.pmiSock.send_char_msg(pmiMsgToSend)
            except Exception, errmsg:
                pmiMsgToSend = 'cmd=put_result rc=-1 msg="%s"\n' % errmsg
                self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'barrier_in':
            self.pmiBarrierInRecvd = 1
            if self.myRank == 0 or self.holdingPMIBarrierLoop1:
                msgToSend = { 'cmd' : 'pmi_barrier_loop_1' }
                self.ring.rhsSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'get':
            key = parsedMsg['key']
            kvsname = parsedMsg['kvsname']
            if self.KVSs.has_key(kvsname) and self.KVSs[kvsname].has_key(key):
                # found locally: answer the client directly
                value = self.KVSs[kvsname][key]
                pmiMsgToSend = 'cmd=get_result rc=0 value=%s\n' % (value)
                self.pmiSock.send_char_msg(pmiMsgToSend)
            else:
                # not found locally: send a pmi_get request on the ring; no
                # reply is sent to the client from this branch
                msgToSend = { 'cmd' : 'pmi_get', 'key' : key,
                              'kvsname' : kvsname, 'from_rank' : self.myRank }
                self.ring.rhsSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'getbyidx':
            # NOTE(review): the requested kvsname is only copied into the
            # forwarded message; every lookup in this branch reads
            # self.KVSs[self.default_kvsname] -- confirm this is intended.
            kvsname = parsedMsg['kvsname']
            idx = int(parsedMsg['idx'])
            if idx == 0:
                msgToSend = { 'cmd' : 'pmi_getbyidx', 'kvsname' : kvsname,
                              'from_rank' : self.myRank,
                              'kvs' : self.KVSs[self.default_kvsname] }
                self.ring.rhsSock.send_dict_msg(msgToSend)
            else:
                if len(self.KVSs[self.default_kvsname].keys()) > idx:
                    key = self.KVSs[self.default_kvsname].keys()[idx]
                    val = self.KVSs[self.default_kvsname][key]
                    nextidx = idx + 1
                    pmiMsgToSend = 'cmd=getbyidx_results rc=0 nextidx=%d key=%s val=%s\n' % \
                                   (nextidx,key,val)
                else:
                    pmiMsgToSend = 'cmd=getbyidx_results rc=-2 reason=no_more_keyvals\n'
                self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'spawn':
            ## This code really is handling PMI_Spawn_multiple. It translates a
            ## sequence of separate spawn messages into a single message to send
            ## to the mpd. It keeps track by the "totspawns" and "spawnssofar"
            ## parameters in the incoming message. The first message has
            ## "spawnssofar" set to 1.
            ##
            ## This proc may produce stdout and stderr; do this early so I
            ## won't exit before child sets up its conns with me.
            # NOTE(review): the excerpt ends here; the body of the 'spawn'
            # branch and the rest of this method are not visible.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -