📄 mpd.py
字号:
msg['cmd'] = 'pmi_unpublish_name' # add pmi_ msg['src'] = self.myId self.ring.rhsSock.send_dict_msg(msg) else: mpd_print(1, 'INVALID request from man msg=:%s:' % (msg) ) msgToSend = { 'cmd' : 'invalid_request' } sock.send_dict_msg(msgToSend) def handle_lhs_input(self,sock): msg = self.ring.lhsSock.recv_dict_msg() if not msg: # lost lhs; don't worry mpd_print(0, "CLOSING self.ring.lhsSock ", self.ring.lhsSock ) self.streamHandler.del_handler(self.ring.lhsSock) self.ring.lhsSock.close() self.ring.lhsSock = 0 return if msg['cmd'] == 'mpdrun' or msg['cmd'] == 'spawn': if msg.has_key('mpdid_mpdrun_start') \ and msg['mpdid_mpdrun_start'] == self.myId: if msg['first_loop']: self.currRingSize = msg['ringsize'] self.currRingNCPUs = msg['ring_ncpus'] if msg['nstarted'] == msg['nprocs']: if msg['cmd'] == 'spawn': self.spawnInProgress = 0 if self.conSock: msgToSend = { 'cmd' : 'mpdrun_ack', 'ringsize' : self.currRingSize, 'ring_ncpus' : self.currRingNCPUs} self.conSock.send_dict_msg(msgToSend) return if not msg['first_loop'] and msg['nstarted_on_this_loop'] == 0: if msg.has_key('jobid'): if msg['cmd'] == 'mpdrun': msgToSend = { 'cmd' : 'abortjob', 'src' : self.myId, 'jobid' : msg['jobid'], 'reason' : 'some_procs_not_started' } self.ring.rhsSock.send_dict_msg(msgToSend) else: # spawn msgToSend = { 'cmd' : 'startup_status', 'rc' : -1, 'reason' : 'some_procs_not_started' } jobid = msg['jobid'] manPid = msg['spawner_manpid'] manSock = self.activeJobs[jobid][manPid]['socktoman'] manSock.send_dict_msg(msgToSend) if self.conSock: msgToSend = { 'cmd' : 'job_failed', 'reason' : 'some_procs_not_started', 'remaining_hosts' : msg['hosts'] } self.conSock.send_dict_msg(msgToSend) return msg['first_loop'] = 0 msg['nstarted_on_this_loop'] = 0 self.do_mpdrun(msg) elif msg['cmd'] == 'mpdtrace_info': if msg['dest'] == self.myId: if self.conSock: self.conSock.send_dict_msg(msg) else: self.ring.rhsSock.send_dict_msg(msg) elif msg['cmd'] == 'mpdtrace_trailer': if msg['dest'] == self.myId: if self.conSock: self.conSock.send_dict_msg(msg) else: msgToSend = { 'cmd' : 'mpdtrace_info', 'dest' : msg['dest'], 'id' : self.myId, 'ifhn' : self.myIfhn, 'lhsport' : '%s' % (self.ring.lhsPort), 'lhsifhn' : '%s' % (self.ring.lhsIfhn), 'rhsport' : '%s' % (self.ring.rhsPort), 'rhsifhn' : '%s' % (self.ring.rhsIfhn) } self.ring.rhsSock.send_dict_msg(msgToSend) self.ring.rhsSock.send_dict_msg(msg) elif msg['cmd'] == 'mpdlistjobs_info': if msg['dest'] == self.myId: if self.conSock: self.conSock.send_dict_msg(msg) else: self.ring.rhsSock.send_dict_msg(msg) elif msg['cmd'] == 'mpdlistjobs_trailer': if msg['dest'] == self.myId: if self.conSock: self.conSock.send_dict_msg(msg) else: for jobid in self.activeJobs.keys(): for manPid in self.activeJobs[jobid]: msgToSend = { 'cmd' : 'mpdlistjobs_info', 'dest' : msg['dest'], 'jobid' : jobid, 'username' : self.activeJobs[jobid][manPid]['username'], 'host' : self.myHost, 'ifhn' : self.myIfhn, 'clipid' : str(self.activeJobs[jobid][manPid]['clipid']), 'sid' : str(manPid), # may chg to actual sid later 'pgm' : self.activeJobs[jobid][manPid]['pgm'], 'rank' : self.activeJobs[jobid][manPid]['rank'] } self.ring.rhsSock.send_dict_msg(msgToSend) self.ring.rhsSock.send_dict_msg(msg) elif msg['cmd'] == 'mpdallexit': if self.allExiting: # already seen this once self.exiting = 1 # set flag to exit main loop self.allExiting = 1 self.ring.rhsSock.send_dict_msg(msg) elif msg['cmd'] == 'mpdexit': if msg['dest'] == self.myId: msg['done'] = 1 # do this first if msg['src'] == self.myId: # may be src and dest if self.conSock: if msg['done']: self.conSock.send_dict_msg({'cmd' : 'mpdexit_ack'}) else: self.conSock.send_dict_msg({'cmd' : 'mpdexit_failed'}) else: self.ring.rhsSock.send_dict_msg(msg) if msg['dest'] == self.myId: self.exiting = 1 self.ring.lhsSock.send_dict_msg( { 'cmd' : 'mpdexiting', 'rhsifhn' : self.ring.rhsIfhn, 'rhsport' : self.ring.rhsPort }) elif msg['cmd'] == 'mpdringtest': if msg['src'] != self.myId: self.ring.rhsSock.send_dict_msg(msg) else: numLoops = msg['numloops'] - 1 if numLoops > 0: msg['numloops'] = numLoops self.ring.rhsSock.send_dict_msg(msg) else: if self.conSock: # may have closed it if user did ^C at console self.conSock.send_dict_msg({'cmd' : 'mpdringtest_done' }) elif msg['cmd'] == 'mpdsigjob': forwarded = 0 if msg['handled'] and msg['src'] != self.myId: self.ring.rhsSock.send_dict_msg(msg) forwarded = 1 handledHere = 0 for jobid in self.activeJobs.keys(): sjobid = jobid.split(' ') # jobnum and mpdid if (sjobid[0] == msg['jobnum'] and sjobid[1] == msg['mpdid']) \ or (msg['jobalias'] and sjobid[2] == msg['jobalias']): for manPid in self.activeJobs[jobid].keys(): if self.activeJobs[jobid][manPid]['username'] == msg['username'] \ or msg['username'] == 'root': manSock = self.activeJobs[jobid][manPid]['socktoman'] manSock.send_dict_msg( { 'cmd' : 'signal_to_handle', 's_or_g' : msg['s_or_g'], 'sigtype' : msg['sigtype'] } ) handledHere = 1 if handledHere: msg['handled'] = 1 if not forwarded and msg['src'] != self.myId: self.ring.rhsSock.send_dict_msg(msg) if msg['src'] == self.myId: if self.conSock: self.conSock.send_dict_msg( {'cmd' : 'mpdsigjob_ack', 'handled' : msg['handled'] } ) elif msg['cmd'] == 'mpdkilljob': forwarded = 0 if msg['handled'] and msg['src'] != self.myId: self.ring.rhsSock.send_dict_msg(msg) forwarded = 1 handledHere = 0 for jobid in self.activeJobs.keys(): sjobid = jobid.split(' ') # jobnum and mpdid if (sjobid[0] == msg['jobnum'] and sjobid[1] == msg['mpdid']) \ or (msg['jobalias'] and sjobid[2] == msg['jobalias']): for manPid in self.activeJobs[jobid].keys(): if self.activeJobs[jobid][manPid]['username'] == msg['username'] \ or msg['username'] == 'root': try: pgrp = manPid * (-1) # neg manPid -> group os.kill(pgrp,signal.SIGKILL) cliPid = self.activeJobs[jobid][manPid]['clipid'] pgrp = cliPid * (-1) # neg Pid -> group os.kill(pgrp,signal.SIGKILL) # neg Pid -> group handledHere = 1 except: pass # del self.activeJobs[jobid] ## handled when child goes away if handledHere: msg['handled'] = 1 if not forwarded and msg['src'] != self.myId: self.ring.rhsSock.send_dict_msg(msg) if msg['src'] == self.myId: if self.conSock: self.conSock.send_dict_msg( {'cmd' : 'mpdkilljob_ack', 'handled' : msg['handled'] } ) elif msg['cmd'] == 'abortjob': if msg['src'] != self.myId: self.ring.rhsSock.send_dict_msg(msg) for jobid in self.activeJobs.keys(): if jobid == msg['jobid']: for manPid in self.activeJobs[jobid].keys(): manSocket = self.activeJobs[jobid][manPid]['socktoman'] if manSocket: manSocket.send_dict_msg(msg) sleep(0.5) # give man a brief chance to deal with this try: pgrp = manPid * (-1) # neg manPid -> group os.kill(pgrp,signal.SIGKILL) cliPid = self.activeJobs[jobid][manPid]['clipid'] pgrp = cliPid * (-1) # neg Pid -> group os.kill(pgrp,signal.SIGKILL) # neg Pid -> group except: pass # del self.activeJobs[jobid] ## handled when child goes away elif msg['cmd'] == 'pulse': self.ring.lhsSock.send_dict_msg({'cmd':'pulse_ack'}) elif msg['cmd'] == 'verify_hosts_in_ring': while self.myIfhn in msg['host_list'] or self.myHost in msg['host_list']: if self.myIfhn in msg['host_list']: msg['host_list'].remove(self.myIfhn) elif self.myHost in msg['host_list']: msg['host_list'].remove(self.myHost) if msg['dest'] == self.myId: msgToSend = { 'cmd' : 'verify_hosts_in_ring_response', 'host_list' : msg['host_list'] } self.conSock.send_dict_msg(msgToSend) else: self.ring.rhsSock.send_dict_msg(msg) elif msg['cmd'] == 'pmi_lookup_name': if msg['src'] == self.myId: if msg.has_key('port') and msg['port'] != 0: msgToSend = msg msgToSend['cmd'] = 'lookup_result' msgToSend['info'] = 'ok' else: msgToSend = { 'cmd' : 'lookup_result', 'info' : 'unknown_service', 'port' : 0} jobid = msg['jobid'] manPid = msg['manpid'] manSock = self.activeJobs[jobid][manPid]['socktoman'] manSock.send_dict_msg(msgToSend) else: if self.pmi_published_names.has_key(msg['service']): msg['port'] = self.pmi_published_names[msg['service']] self.ring.rhsSock.send_dict_msg(msg) elif msg['cmd'] == 'pmi_unpublish_name': if msg['src'] == self.myId: if msg.has_key('done'): msgToSend = msg msgToSend['cmd'] = 'unpublish_result' msgToSend['info'] = 'ok' else: msgToSend = { 'cmd' : 'unpublish_result', 'info' : 'unknown_service' } jobid = msg['jobid'] manPid = msg['manpid'] manSock = self.activeJobs[jobid][manPid]['socktoman'] manSock.send_dict_msg(msgToSend) else: if self.pmi_published_names.has_key(msg['service']): del self.pmi_published_names[msg['service']] msg['done'] = 1 self.ring.rhsSock.send_dict_msg(msg) elif msg['cmd'] == 'client_info': if msg['spawner_manpid'] and msg['rank'] == 0: if msg['spawner_mpd'] == self.myId: jobid = msg['jobid'] spawnerManPid = msg['spawner_manpid'] if self.activeJobs[jobid].has_key(spawnerManPid): spawnerManSock = self.activeJobs[jobid][spawnerManPid]['socktoman'] msgToSend = { 'cmd' : 'spawn_done_by_mpd', 'rc' : 0, 'reason' : '' } spawnerManSock.send_dict_msg(msgToSend) else: self.ring.rhsSock.send_dict_msg(msg) else:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -