📄 mpd.py
字号:
mpd_print(1, 'unrecognized cmd from lhs: %s' % (msg) ) def handle_rhs_input(self,sock): if self.allExiting: return msg = sock.recv_dict_msg() if not msg: # lost rhs; re-knit the ring if sock == self.ring.rhsSock: needToReenter = 1 else: needToReenter = 0 if sock == self.ring.rhsSock and self.ring.lhsSock: self.streamHandler.del_handler(self.ring.lhsSock) self.ring.lhsSock.close() self.ring.lhsSock = 0 if sock == self.ring.rhsSock and self.ring.rhsSock: self.streamHandler.del_handler(self.ring.rhsSock) self.ring.rhsSock.close() self.ring.rhsSock = 0 if needToReenter: mpd_print(1,'lost rhs; re-entering ring') rc = self.ring.reenter_ring(lhsHandler=self.handle_lhs_input, rhsHandler=self.handle_rhs_input, ntries=16) if rc == 0: mpd_print(1,'back in ring') else: mpd_print(1,'failed to reenter ring') sys.exit(-1) return if msg['cmd'] == 'pulse_ack': self.pulse_cntr = 0 elif msg['cmd'] == 'mpdexiting': # for mpdexit if self.ring.rhsSock: self.streamHandler.del_handler(self.ring.rhsSock) self.ring.rhsSock.close() self.ring.rhsSock = 0 # connect to new rhs self.ring.rhsIfhn = msg['rhsifhn'] self.ring.rhsPort = int(msg['rhsport']) if self.ring.rhsIfhn == self.myIfhn and self.ring.rhsPort == self.parmdb['MPD_LISTEN_PORT']: rv = self.ring.connect_rhs(rhsHost=self.ring.rhsIfhn, rhsPort=self.ring.rhsPort, rhsHandler=self.handle_rhs_input, numTries=3) if rv[0] <= 0: # connect did not succeed; may try again mpd_print(1,"rhs connect failed") sys.exit(-1) return self.ring.rhsSock = MPDSock(name='rhs') self.ring.rhsSock.connect((self.ring.rhsIfhn,self.ring.rhsPort)) self.pulse_cntr = 0 if not self.ring.rhsSock: mpd_print(1,'handle_rhs_input failed to obtain rhs socket') return msgToSend = { 'cmd' : 'request_to_enter_as_lhs', 'host' : self.myHost, 'ifhn' : self.myIfhn, 'port' : self.parmdb['MPD_LISTEN_PORT'] } self.ring.rhsSock.send_dict_msg(msgToSend) msg = self.ring.rhsSock.recv_dict_msg() if (not msg) or \ (not msg.has_key('cmd')) or \ (msg['cmd'] != 'challenge') or (not msg.has_key('randnum')): mpd_print(1, 'failed to recv challenge from rhs; msg=:%s:' % (msg) ) response = md5new(''.join([self.parmdb['MPD_SECRETWORD'], msg['randnum']])).digest() msgToSend = { 'cmd' : 'challenge_response', 'response' : response, 'host' : self.myHost, 'ifhn' : self.myIfhn, 'port' : self.parmdb['MPD_LISTEN_PORT'] } self.ring.rhsSock.send_dict_msg(msgToSend) msg = self.ring.rhsSock.recv_dict_msg() if (not msg) or \ (not msg.has_key('cmd')) or \ (msg['cmd'] != 'OK_to_enter_as_lhs'): mpd_print(1, 'NOT OK to enter ring; msg=:%s:' % (msg) ) self.streamHandler.set_handler(self.ring.rhsSock,self.handle_rhs_input) else: mpd_print(1, 'unexpected from rhs; msg=:%s:' % (msg) ) return def do_mpdrun(self,msg): if self.parmdb['MPD_LOGFILE_TRUNC_SZ'] >= 0: try: logSize = os.stat(self.logFilename)[stat.ST_SIZE] if logSize > self.parmdb['MPD_LOGFILE_TRUNC_SZ']: self.logFile.truncate(self.parmdb['MPD_LOGFILE_TRUNC_SZ']) except: pass if msg.has_key('jobid'): jobid = msg['jobid'] else: jobid = str(self.nextJobInt) + ' ' + self.myId + ' ' + msg['jobalias'] self.nextJobInt += 1 msg['jobid'] = jobid if msg['nstarted'] >= msg['nprocs']: self.ring.rhsSock.send_dict_msg(msg) # forward it on around return hosts = msg['hosts'] if self.myIfhn in hosts.values(): hostsKeys = hosts.keys() hostsKeys.sort() for ranks in hostsKeys: if hosts[ranks] == self.myIfhn: (lorank,hirank) = ranks for rank in range(lorank,hirank+1): self.run_one_cli(rank,msg) msg['nstarted'] += 1 msg['nstarted_on_this_loop'] += 1 del msg['hosts'][ranks] elif '_any_from_pool_' in hosts.values(): hostsKeys = hosts.keys() hostsKeys.sort() for ranks in hostsKeys: if hosts[ranks] == '_any_from_pool_': (lorank,hirank) = ranks hostSpecPool = msg['host_spec_pool'] if self.myIfhn in hostSpecPool or self.myHost in hostSpecPool: self.run_one_cli(lorank,msg) msg['nstarted'] += 1 msg['nstarted_on_this_loop'] += 1 del msg['hosts'][ranks] if lorank < hirank: msg['hosts'][(lorank+1,hirank)] = '_any_from_pool_' break elif '_any_' in hosts.values(): done = 0 while not done: hostsKeys = hosts.keys() hostsKeys.sort() for ranks in hostsKeys: if hosts[ranks] == '_any_': (lorank,hirank) = ranks self.run_one_cli(lorank,msg) msg['nstarted'] += 1 msg['nstarted_on_this_loop'] += 1 del msg['hosts'][ranks] if lorank < hirank: msg['hosts'][(lorank+1,hirank)] = '_any_' procsHereForJob = len(self.activeJobs[jobid].keys()) if procsHereForJob >= self.parmdb['MPD_NCPUS']: break # out of for loop # if no more to start via any or enough started here if '_any_' not in hosts.values() \ or procsHereForJob >= self.parmdb['MPD_NCPUS']: done = 1 if msg['first_loop']: msg['ringsize'] += 1 msg['ring_ncpus'] += self.parmdb['MPD_NCPUS'] self.ring.rhsSock.send_dict_msg(msg) # forward it on around def run_one_cli(self,currRank,msg): users = msg['users'] for ranks in users.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: username = users[ranks] break execs = msg['execs'] for ranks in execs.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pgm = execs[ranks] break paths = msg['paths'] for ranks in paths.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pathForExec = paths[ranks] break args = msg['args'] for ranks in args.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pgmArgs = dumps(args[ranks]) break envvars = msg['envvars'] for ranks in envvars.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pgmEnvVars = dumps(envvars[ranks]) break limits = msg['limits'] for ranks in limits.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pgmLimits = dumps(limits[ranks]) break cwds = msg['cwds'] for ranks in cwds.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: cwd = cwds[ranks] break umasks = msg['umasks'] for ranks in umasks.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pgmUmask = umasks[ranks] break man_env = {} if msg['ifhns'].has_key(currRank): man_env['MPICH_INTERFACE_HOSTNAME'] = msg['ifhns'][currRank] else: man_env['MPICH_INTERFACE_HOSTNAME'] = self.myIfhn man_env.update(os.environ) # may only want to mov non-MPD_ stuff man_env['MPDMAN_MYHOST'] = self.myHost man_env['MPDMAN_MYIFHN'] = self.myIfhn man_env['MPDMAN_JOBID'] = msg['jobid'] man_env['MPDMAN_CLI_PGM'] = pgm man_env['MPDMAN_CLI_PATH'] = pathForExec man_env['MPDMAN_PGM_ARGS'] = pgmArgs man_env['MPDMAN_PGM_ENVVARS'] = pgmEnvVars man_env['MPDMAN_PGM_LIMITS'] = pgmLimits man_env['MPDMAN_CWD'] = cwd man_env['MPDMAN_UMASK'] = pgmUmask man_env['MPDMAN_SPAWNED'] = str(msg['spawned']) if msg.has_key('spawner_manpid'): man_env['MPDMAN_SPAWNER_MANPID'] = str(msg['spawner_manpid']) else: man_env['MPDMAN_SPAWNER_MANPID'] = '0' if msg.has_key('spawner_mpd'): man_env['MPDMAN_SPAWNER_MPD'] = msg['spawner_mpd'] else: man_env['MPDMAN_SPAWNER_MPD'] = '' man_env['MPDMAN_NPROCS'] = str(msg['nprocs']) man_env['MPDMAN_MPD_LISTEN_PORT'] = str(self.parmdb['MPD_LISTEN_PORT']) man_env['MPDMAN_MPD_CONF_SECRETWORD'] = self.parmdb['MPD_SECRETWORD'] man_env['MPDMAN_CONHOST'] = msg['conhost'] man_env['MPDMAN_CONIFHN'] = msg['conifhn'] man_env['MPDMAN_CONPORT'] = str(msg['conport']) man_env['MPDMAN_RANK'] = str(currRank) man_env['MPDMAN_POS_IN_RING'] = str(msg['nstarted']) man_env['MPDMAN_STDIN_DEST'] = msg['stdin_dest'] man_env['MPDMAN_TOTALVIEW'] = str(msg['totalview']) man_env['MPDMAN_GDB'] = str(msg['gdb']) man_env['MPDMAN_GDBA'] = str(msg['gdba']) # for attach to running pgm fullDirName = os.path.abspath(os.path.split(sys.argv[0])[0]) # normalize man_env['MPDMAN_FULLPATHDIR'] = fullDirName # used to find gdbdrv man_env['MPDMAN_SINGINIT_PID'] = str(msg['singinitpid']) man_env['MPDMAN_SINGINIT_PORT'] = str(msg['singinitport']) man_env['MPDMAN_LINE_LABELS_FMT'] = msg['line_labels'] if msg.has_key('rship'): man_env['MPDMAN_RSHIP'] = msg['rship'] man_env['MPDMAN_MSHIP_HOST'] = msg['mship_host'] man_env['MPDMAN_MSHIP_PORT'] = str(msg['mship_port']) if msg.has_key('doing_bnr'): man_env['MPDMAN_DOING_BNR'] = '1' else: man_env['MPDMAN_DOING_BNR'] = '0' if msg['nstarted'] == 0: manKVSTemplate = '%s_%s_%d' % \ (self.myHost,self.parmdb['MPD_LISTEN_PORT'],self.kvs_cntr) manKVSTemplate = sub('\.','_',manKVSTemplate) # chg magpie.cs to magpie_cs manKVSTemplate = sub('\-','_',manKVSTemplate) # chg node-0 to node_0 self.kvs_cntr += 1 msg['kvs_template'] = manKVSTemplate man_env['MPDMAN_KVS_TEMPLATE'] = msg['kvs_template'] msg['username'] = username if hasattr(os,'fork'): (manPid,toManSock) = self.launch_mpdman_via_fork(msg,man_env) if not manPid: print '**** mpd: launch_client_via_fork_exec failed; exiting' elif subprocess_module_available: (manPid,toManSock) = self.launch_mpdman_via_subprocess(msg,man_env) else: mpd_print(1,'neither fork nor subpr
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -