📄 mpdman.py
字号:
self.kvs_next_id += 1 pmiMsgToSend = 'cmd=newkvs kvsname=%s\n' % (new_kvsname) self.pmiSock.send_char_msg(pmiMsgToSend) elif parsedMsg['cmd'] == 'destroy_kvs': kvsname = parsedMsg['kvsname'] try: del self.KVSs[kvsname] pmiMsgToSend = 'cmd=kvs_destroyed rc=0\n' except: pmiMsgToSend = 'cmd=kvs_destroyed rc=-1\n' self.pmiSock.send_char_msg(pmiMsgToSend) elif parsedMsg['cmd'] == 'put': kvsname = parsedMsg['kvsname'] key = parsedMsg['key'] value = parsedMsg['value'] try: self.KVSs[kvsname][key] = value pmiMsgToSend = 'cmd=put_result rc=0\n' self.pmiSock.send_char_msg(pmiMsgToSend) except Exception, errmsg: pmiMsgToSend = 'cmd=put_result rc=-1 msg="%s"\n' % errmsg self.pmiSock.send_char_msg(pmiMsgToSend) elif parsedMsg['cmd'] == 'barrier_in': self.pmiBarrierInRecvd = 1 if self.myRank == 0 or self.holdingPMIBarrierLoop1: msgToSend = { 'cmd' : 'pmi_barrier_loop_1' } self.ring.rhsSock.send_dict_msg(msgToSend) elif parsedMsg['cmd'] == 'get': key = parsedMsg['key'] kvsname = parsedMsg['kvsname'] if self.KVSs.has_key(kvsname) and self.KVSs[kvsname].has_key(key): value = self.KVSs[kvsname][key] pmiMsgToSend = 'cmd=get_result rc=0 value=%s\n' % (value) self.pmiSock.send_char_msg(pmiMsgToSend) else: msgToSend = { 'cmd' : 'pmi_get', 'key' : key, 'kvsname' : kvsname, 'from_rank' : self.myRank } self.ring.rhsSock.send_dict_msg(msgToSend) elif parsedMsg['cmd'] == 'getbyidx': kvsname = parsedMsg['kvsname'] idx = int(parsedMsg['idx']) if idx == 0: msgToSend = { 'cmd' : 'pmi_getbyidx', 'kvsname' : kvsname, 'from_rank' : self.myRank, 'kvs' : self.KVSs[self.default_kvsname] } self.ring.rhsSock.send_dict_msg(msgToSend) else: if len(self.KVSs[self.default_kvsname].keys()) > idx: key = self.KVSs[self.default_kvsname].keys()[idx] val = self.KVSs[self.default_kvsname][key] nextidx = idx + 1 pmiMsgToSend = 'cmd=getbyidx_results rc=0 nextidx=%d key=%s val=%s\n' % \ (nextidx,key,val) else: pmiMsgToSend = 'cmd=getbyidx_results rc=-2 reason=no_more_keyvals\n' self.pmiSock.send_char_msg(pmiMsgToSend) elif parsedMsg['cmd'] == 'spawn': ## This code really is handling PMI_Spawn_multiple. It translates a ## sequence of separate spawn messages into a single message to send ## to the mpd. It keeps track by the "totspawns" and "spawnssofar" ## parameters in the incoming message. The first message has ## "spawnssofar" set to 1. ## ## This proc may produce stdout and stderr; do this early so I ## won't exit before child sets up its conns with me. ## NOTE: if you spawn a non-MPI job, it may not send these msgs ## in which case adding 2 to numWithIO will cause the pgm to hang. totspawns = int(parsedMsg['totspawns']) spawnssofar = int(parsedMsg['spawnssofar']) if spawnssofar == 1: # this is the first of possibly several spawn msgs self.numWithIO += 2 self.tpsf = 0 # total processes spawned so far self.spawnExecs = {} # part of MPI_Spawn_multiple args self.spawnHosts = {} # comes from info self.spawnUsers = {} # always the current user self.spawnCwds = {} # could come from info, but doesn't yet self.spawnUmasks = {} # could come from info, but doesn't yet self.spawnPaths = {} # could come from info, but doesn't yet self.spawnEnvvars = {} # whole environment from mpiexec, plus appnum self.spawnLimits = {} self.spawnArgs = {} self.spawnNprocs = int(parsedMsg['nprocs']) # num procs in this spawn pmiInfo = {} for i in range(0,int(parsedMsg['info_num'])): info_key = parsedMsg['info_key_%d' % i] info_val = parsedMsg['info_val_%d' % i] pmiInfo[info_key] = info_val if pmiInfo.has_key('host'): try: toIfhn = socket.gethostbyname_ex(pmiInfo['host'])[2][0] self.spawnHosts[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = toIfhn except: mpd_print(1, "unable to obtain host info for :%s:" % (pmiInfo['host'])) pmiMsgToSend = 'cmd=spawn_result rc=-2 status=unknown_host\n' self.pmiSock.send_char_msg(pmiMsgToSend) return else: self.spawnHosts[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = '_any_' if pmiInfo.has_key('path'): self.spawnPaths[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = pmiInfo['path'] else: self.spawnPaths[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = os.environ['MPDMAN_CLI_PATH'] if pmiInfo.has_key('wdir'): self.spawnCwds[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = pmiInfo['wdir'] else: self.spawnCwds[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = os.environ['MPDMAN_CWD'] if pmiInfo.has_key('umask'): self.spawnUmasks[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = pmiInfo['umask'] else: self.spawnUmasks[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = os.environ['MPDMAN_UMASK'] self.spawnExecs[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = parsedMsg['execname'] self.spawnUsers[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = mpd_get_my_username() self.spawnEnv = {} self.spawnEnv.update(os.environ) self.spawnEnv['MPI_APPNUM'] = str(spawnssofar-1) self.spawnEnvvars[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = self.spawnEnv self.spawnLimits[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = {} # not implemented yet ##### args[(tpsf,tpsf+spawnNprocs-1) = [ parsedMsg['args'] ] ##### args[(tpsf,tpsf+spawnNprocs-1) = [ 'AA', 'BB', 'CC' ] cliArgs = [] cliArgcnt = int(parsedMsg['argcnt']) for i in range(1,cliArgcnt+1): # start at 1 cliArgs.append(parsedMsg['arg%d' % i]) self.spawnArgs[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = cliArgs self.tpsf += self.spawnNprocs if totspawns == spawnssofar: # This is the last in the spawn sequence self.spawnedCnt += 1 # non-zero to use for creating kvsname in msg below msgToSend = { 'cmd' : 'spawn', 'conhost' : self.myHost, 'conifhn' : self.myIfhn, 'conport' : self.listenNonRingPort, 'spawned' : self.spawnedCnt, 'jobid' : self.jobid, 'nstarted' : 0, 'nprocs' : self.tpsf, 'hosts' : self.spawnHosts, 'execs' : self.spawnExecs, 'users' : self.spawnUsers, 'cwds' : self.spawnCwds, 'umasks' : self.spawnUmasks, 'paths' : self.spawnPaths, 'args' : self.spawnArgs, 'envvars' : self.spawnEnvvars, 'limits' : self.spawnLimits, 'singinitpid' : 0, 'singinitport' : 0, } msgToSend['line_labels'] = self.lineLabelFmt msgToSend['spawner_manpid'] = os.getpid() self.mpdSock.send_dict_msg(msgToSend) self.spawnInProgress = parsedMsg self.spawnInProgress['errcodes'] = [None] * self.tpsf # one for each spawn # I could send the preput_info along but will keep it here # and let the spawnee call me up and ask for it; he will # call me anyway since I am his parent in the tree. So, I # will create a KVS to hold the info until he calls self.spawnedKVSname = 'mpdman_kvs_for_spawned_' + str(self.spawnedCnt) self.KVSs[self.spawnedKVSname] = {} preput_num = int(parsedMsg['preput_num']) for i in range(0,preput_num): preput_key = parsedMsg['preput_key_%d' % i] preput_val = parsedMsg['preput_val_%d' % i] self.KVSs[self.spawnedKVSname][preput_key] = preput_val elif parsedMsg['cmd'] == 'finalize': # the following lines are present to support a process that runs # 2 MPI pgms in tandem (e.g. mpish at ANL) self.KVSs = {} self.KVSs[self.default_kvsname] = {} self.kvs_next_id = 1 self.jobEndingEarly = 0 self.pmiCollectiveJob = 0 self.spawnedCnt = 0 pmiMsgToSend = 'cmd=finalize_ack\n' self.pmiSock.send_char_msg(pmiMsgToSend) elif parsedMsg['cmd'] == 'client_bnr_fence_in': ## BNR self.pmiBarrierInRecvd = 1 if self.myRank == 0 or self.holdingPMIBarrierLoop1: msgToSend = { 'cmd' : 'pmi_barrier_loop_1' } self.ring.rhsSock.send_dict_msg(msgToSend) elif parsedMsg['cmd'] == 'client_bnr_put': ## BNR key = parsedMsg['attr'] value = parsedMsg['val'] try: self.KVSs[self.default_kvsname][key] = value pmiMsgToSend = 'cmd=put_result rc=0\n' self.pmiSock.send_char_msg(pmiMsgToSend) except Exception, errmsg: pmiMsgToSend = 'cmd=put_result rc=-1 msg="%s"\n' % errmsg self.pmiSock.send_char_msg(pmiMsgToSend) elif parsedMsg['cmd'] == 'client_bnr_get': ## BNR key = parsedMsg['attr'] if self.KVSs[self.default_kvsname].has_key(key): value = self.KVSs[self.default_kvsname][key] pmiMsgToSend = 'cmd=client_bnr_get_output rc=0 val=%s\n' % (value) self.pmiSock.send_char_msg(pmiMsgToSend) else: msgToSend = { 'cmd' : 'bnr_get', 'key' : key, 'kvsname' : kvsname, 'from_rank' : self.myRank } self.ring.rhsSock.send_dict_msg(msgToSend) elif parsedMsg['cmd'] == 'client_ready': ## BNR ## continue to wait for accepting_signals pass elif parsedMsg['cmd'] == 'accepting_signals': ## BNR ## handle it like a barrier_in ?? self.pmiBarrierInRecvd = 1 self.doingBNR = 1 ## BNR # set again is OK elif parsedMsg['cmd'] == 'interrupt_peer_with_msg': ## BNR self.ring.rhsSock.send_dict_msg(parsedMsg) else: mpd_print(1, "unrecognized pmi msg :%s:" % line ) def handle_console_input(self,sock): msg = self.conSock.recv_dict_msg() if not msg: if self.conSock: self.streamHandler.del_handler(self.conSock) self.conSock.close() self.conSock = 0 if self.parentStdoutSock: self.streamHandler.del_handler(self.parentStdoutSock) self.parentStdoutSock.close() self.parentStdoutSock = 0 if self.parentStderrSock: self.streamHandler.del_handler(self.parentStderrSock) self.parentStderrSock.close() self.parentStderrSock = 0 if self.ring.rhsSock: msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGKILL' } self.ring.rhsSock.send_dict_msg(msgToSend) try: pgrp = clientPid * (-1) # neg Pid -> group os.kill(pgrp,signal.SIGKILL) # may be reaped by sighandler except: pass elif msg['cmd'] == 'signal': if msg['signo'] == 'SIGINT': self.ring.rhsSock.send_dict_msg(msg) for s in self.spawnedChildSocks: s.send_dict_msg(msg) if self.gdb: os.kill(clientPid,signal.SIGINT) else: try: pgrp = clientPid * (-1) # neg Pid -> group os.kill(pgrp,signal.SIGKILL) # may be reaped by sighandler except: pass elif msg['signo'] == 'SIGKILL': try: self.ring.rhsSock.send_dict_msg(msg) except: pass for s in self.spawnedChildSocks: try:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -