📄 mpd.py
字号:
msg['done'] = 1 # do this first if msg['src'] == self.myId: # may be src and dest if self.conSock: if msg['done']: self.conSock.send_dict_msg({'cmd' : 'mpdexit_ack'}) else: self.conSock.send_dict_msg({'cmd' : 'mpdexit_failed'}) else: self.ring.rhsSock.send_dict_msg(msg) if msg['dest'] == self.myId: self.exiting = 1 self.ring.lhsSock.send_dict_msg( { 'cmd' : 'mpdexiting', 'rhsifhn' : self.ring.rhsIfhn, 'rhsport' : self.ring.rhsPort }) elif msg['cmd'] == 'mpdringtest': if msg['src'] != self.myId: self.ring.rhsSock.send_dict_msg(msg) else: numLoops = msg['numloops'] - 1 if numLoops > 0: msg['numloops'] = numLoops self.ring.rhsSock.send_dict_msg(msg) else: if self.conSock: # may have closed it if user did ^C at console self.conSock.send_dict_msg({'cmd' : 'mpdringtest_done' }) elif msg['cmd'] == 'mpdsigjob': forwarded = 0 if msg['handled'] and msg['src'] != self.myId: self.ring.rhsSock.send_dict_msg(msg) forwarded = 1 handledHere = 0 for jobid in self.activeJobs.keys(): sjobid = jobid.split(' ') # jobnum and mpdid if (sjobid[0] == msg['jobnum'] and sjobid[1] == msg['mpdid']) \ or (msg['jobalias'] and sjobid[2] == msg['jobalias']): for manPid in self.activeJobs[jobid].keys(): if self.activeJobs[jobid][manPid]['username'] == msg['username'] \ or msg['username'] == 'root': manSock = self.activeJobs[jobid][manPid]['socktoman'] manSock.send_dict_msg( { 'cmd' : 'signal_to_handle', 's_or_g' : msg['s_or_g'], 'sigtype' : msg['sigtype'] } ) handledHere = 1 if handledHere: msg['handled'] = 1 if not forwarded and msg['src'] != self.myId: self.ring.rhsSock.send_dict_msg(msg) if msg['src'] == self.myId: if self.conSock: self.conSock.send_dict_msg( {'cmd' : 'mpdsigjob_ack', 'handled' : msg['handled'] } ) elif msg['cmd'] == 'mpdkilljob': forwarded = 0 if msg['handled'] and msg['src'] != self.myId: self.ring.rhsSock.send_dict_msg(msg) forwarded = 1 handledHere = 0 for jobid in self.activeJobs.keys(): sjobid = jobid.split(' ') # jobnum and mpdid if (sjobid[0] == msg['jobnum'] and 
sjobid[1] == msg['mpdid']) \ or (msg['jobalias'] and sjobid[2] == msg['jobalias']): for manPid in self.activeJobs[jobid].keys(): if self.activeJobs[jobid][manPid]['username'] == msg['username'] \ or msg['username'] == 'root': try: pgrp = manPid * (-1) # neg manPid -> group os.kill(pgrp,signal.SIGKILL) cliPid = self.activeJobs[jobid][manPid]['clipid'] pgrp = cliPid * (-1) # neg Pid -> group os.kill(pgrp,signal.SIGKILL) # neg Pid -> group handledHere = 1 except: pass # del self.activeJobs[jobid] ## handled when child goes away if handledHere: msg['handled'] = 1 if not forwarded and msg['src'] != self.myId: self.ring.rhsSock.send_dict_msg(msg) if msg['src'] == self.myId: if self.conSock: self.conSock.send_dict_msg( {'cmd' : 'mpdkilljob_ack', 'handled' : msg['handled'] } ) elif msg['cmd'] == 'abortjob': if msg['src'] != self.myId: self.ring.rhsSock.send_dict_msg(msg) for jobid in self.activeJobs.keys(): if jobid == msg['jobid']: for manPid in self.activeJobs[jobid].keys(): manSocket = self.activeJobs[jobid][manPid]['socktoman'] if manSocket: manSocket.send_dict_msg(msg) sleep(0.5) # give man a brief chance to deal with this try: pgrp = manPid * (-1) # neg manPid -> group os.kill(pgrp,signal.SIGKILL) cliPid = self.activeJobs[jobid][manPid]['clipid'] pgrp = cliPid * (-1) # neg Pid -> group os.kill(pgrp,signal.SIGKILL) # neg Pid -> group except: pass # del self.activeJobs[jobid] ## handled when child goes away elif msg['cmd'] == 'pulse': self.ring.lhsSock.send_dict_msg({'cmd':'pulse_ack'}) elif msg['cmd'] == 'verify_hosts_in_ring': while self.myIfhn in msg['host_list'] or self.myHost in msg['host_list']: if self.myIfhn in msg['host_list']: msg['host_list'].remove(self.myIfhn) elif self.myHost in msg['host_list']: msg['host_list'].remove(self.myHost) if msg['dest'] == self.myId: msgToSend = { 'cmd' : 'verify_hosts_in_ring_response', 'host_list' : msg['host_list'] } self.conSock.send_dict_msg(msgToSend) else: self.ring.rhsSock.send_dict_msg(msg) elif msg['cmd'] == 
def handle_rhs_input(self,sock):
    """Process one message, or a lost connection, on a right-hand-side socket.

    Behavior, by case:
      * self.allExiting is set: nothing is read and the call returns.
      * empty message (connection lost): if `sock` is the current ring rhs
        socket, both ring sockets are unregistered from self.streamHandler,
        closed and reset to 0, and self.ring.reenter_ring() is called; the
        process exits if re-entry fails.
      * 'pulse_ack': resets self.pulse_ctr to 0.
      * 'mpdexiting': drops the current rhs socket and connects to the
        replacement rhs named in the message ('rhsifhn'/'rhsport'),
        performing the request / challenge / challenge_response exchange.
      * anything else: logged as unexpected.

    Parameters:
      sock -- object providing recv_dict_msg(); compared against
              self.ring.rhsSock to decide whether the ring must be re-knit.

    Returns None in every case.

    Raises SystemExit (via sys.exit(-1)) when ring re-entry fails or when
    connect_rhs() reports failure.
    """
    if self.allExiting:
        return
    msg = sock.recv_dict_msg()
    if not msg:    # lost rhs; re-knit the ring
        # Decide this before closing anything: the cleanup below resets
        # self.ring.rhsSock to 0, after which the comparison would fail.
        needToReenter = (sock == self.ring.rhsSock)
        if needToReenter and self.ring.lhsSock:
            self.streamHandler.del_handler(self.ring.lhsSock)
            self.ring.lhsSock.close()
            self.ring.lhsSock = 0
        if needToReenter and self.ring.rhsSock:
            self.streamHandler.del_handler(self.ring.rhsSock)
            self.ring.rhsSock.close()
            self.ring.rhsSock = 0
        if needToReenter:
            mpd_print(1,'lost rhs; re-entering ring')
            rc = self.ring.reenter_ring(lhsHandler=self.handle_lhs_input,
                                        rhsHandler=self.handle_rhs_input,
                                        ntries=16)
            if rc == 0:
                mpd_print(1,'back in ring')
            else:
                mpd_print(1,'failed to reenter ring')
                sys.exit(-1)
        return

    cmd = msg['cmd']
    if cmd == 'pulse_ack':
        self.pulse_ctr = 0
    elif cmd == 'mpdexiting':    # for mpdexit
        if self.ring.rhsSock:
            self.streamHandler.del_handler(self.ring.rhsSock)
            self.ring.rhsSock.close()
            self.ring.rhsSock = 0
        # connect to new rhs
        self.ring.rhsIfhn = msg['rhsifhn']
        self.ring.rhsPort = int(msg['rhsport'])
        mpd_print(0000,"TRYING TO CONN TO %s %s" % (self.ring.rhsIfhn,self.ring.rhsPort))
        if self.ring.rhsIfhn == self.myIfhn  and  \
           self.ring.rhsPort == self.parmdb['MPD_LISTEN_PORT']:
            # The replacement rhs has our own interface name and listen
            # port, so the connection is delegated to connect_rhs().
            rv = self.ring.connect_rhs(rhsHost=self.ring.rhsIfhn,
                                       rhsPort=self.ring.rhsPort,
                                       rhsHandler=self.handle_rhs_input,
                                       numTries=3)
            if rv[0] <= 0:  # connect did not succeed; may try again
                mpd_print(1,"rhs connect failed")
                sys.exit(-1)
            return
        self.ring.rhsSock = MPDSock(name='rhs')
        self.ring.rhsSock.connect((self.ring.rhsIfhn,self.ring.rhsPort))
        if not self.ring.rhsSock:
            mpd_print(1,'handle_rhs_input failed to obtain rhs socket')
            return
        msgToSend = { 'cmd' : 'request_to_enter_as_lhs', 'host' : self.myHost,
                      'ifhn' : self.myIfhn, 'port' : self.parmdb['MPD_LISTEN_PORT'] }
        self.ring.rhsSock.send_dict_msg(msgToSend)
        msg = self.ring.rhsSock.recv_dict_msg()
        if (not msg) or  \
           ('cmd' not in msg) or  \
           (msg['cmd'] != 'challenge') or ('randnum' not in msg):
            mpd_print(1, 'failed to recv challenge from rhs; msg=:%s:' % (msg) )
            # Stop here: the response below needs msg['randnum'], which is
            # exactly what this check found to be missing.
            return
        response = md5new(''.join([self.parmdb['MPD_SECRETWORD'],
                                   msg['randnum']])).digest()
        msgToSend = { 'cmd' : 'challenge_response', 'response' : response,
                      'host' : self.myHost, 'ifhn' : self.myIfhn,
                      'port' : self.parmdb['MPD_LISTEN_PORT'] }
        self.ring.rhsSock.send_dict_msg(msgToSend)
        msg = self.ring.rhsSock.recv_dict_msg()
        if (not msg) or  \
           ('cmd' not in msg) or  \
           (msg['cmd'] != 'OK_to_enter_as_lhs'):
            mpd_print(1, 'NOT OK to enter ring; msg=:%s:' % (msg) )
            # Do not fall through to the success trace after a rejection.
            return
        # NOTE(review): unlike the connect_rhs() path above, which is given
        # rhsHandler, nothing in this method registers the new rhs socket
        # with self.streamHandler -- confirm that happens elsewhere.
        mpd_print(0000,"GOT CONN TO %s %s" % (self.ring.rhsIfhn,self.ring.rhsPort))
    else:
        mpd_print(1, 'unexpected from rhs; msg=:%s:' % (msg) )
    return
self.parmdb['MPD_LOGFILE_TRUNC_SZ']: self.logFile.truncate(self.parmdb['MPD_LOGFILE_TRUNC_SZ']) except: pass if msg.has_key('jobid'): jobid = msg['jobid'] else: jobid = str(self.nextJobInt) + ' ' + self.myId + ' ' + msg['jobalias'] self.nextJobInt += 1 msg['jobid'] = jobid if msg['nstarted'] >= msg['nprocs']: self.ring.rhsSock.send_dict_msg(msg) # forward it on around return hosts = msg['hosts'] if self.myIfhn in hosts.values(): hostsKeys = hosts.keys() hostsKeys.sort() for ranks in hostsKeys: if hosts[ranks] == self.myIfhn: (lorank,hirank) = ranks for rank in range(lorank,hirank+1): self.run_one_cli(rank,msg) msg['nstarted'] += 1 msg['nstarted_on_this_loop'] += 1 del msg['hosts'][ranks] elif '_any_from_pool_' in hosts.values(): hostsKeys = hosts.keys() hostsKeys.sort() for ranks in hostsKeys: if hosts[ranks] == '_any_from_pool_': (lorank,hirank) = ranks hostSpecPool = msg['host_spec_pool'] if self.myIfhn in hostSpecPool or self.myHost in hostSpecPool: self.run_one_cli(lorank,msg) msg['nstarted'] += 1 msg['nstarted_on_this_loop'] += 1 del msg['hosts'][ranks]
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -