# mpdman.py
# (page chrome: "font size" control label -- translated from Chinese)
# NOTE(review): whitespace-mangled excerpt of MPICH's mpd manager (Python 2:
# has_key / print statement / `except Exception, errmsg`).  Indentation below
# is reconstructed from syntax; only comments were added -- code tokens are
# unchanged.  The excerpt starts inside MPDMan.run(), within a rank-0-only
# branch whose `if` header lies above this chunk, and it is truncated inside
# handle_lhs_input at the end of the chunk.
            ## NOTE: if you spawn a non-MPI job, it may not send this msg
            ## in which case the pgm will hang; the reason for this is that
            ## mpich2 does an Accept after the PMI_Spawn_multiple and a non-mpi
            ## pgm will never do the expected Connect.
            # Rank 0 opens two extra sockets to the console: one for the job's
            # merged stdout, one for its merged stderr.
            self.stdoutToConSock = MPDSock(name='stdout_to_console')
            self.stdoutToConSock.connect((self.conIfhn,self.conPort))
            if self.spawned:
                # register with the spawning console as a child in its stdout tree
                msgToSend = { 'cmd' : 'child_in_stdout_tree', 'from_rank' : self.myRank }
                self.stdoutToConSock.send_dict_msg(msgToSend)
            self.stderrToConSock = MPDSock(name='stderr_to_console')
            self.stderrToConSock.connect((self.conIfhn,self.conPort))
            if self.spawned:
                msgToSend = { 'cmd' : 'child_in_stderr_tree', 'from_rank' : self.myRank }
                self.stderrToConSock.send_dict_msg(msgToSend)
        else:
            self.conSock = 0    # non-rank-0 managers have no console connection
        # rank 0 sends its output straight to the console; other ranks receive
        # their tree-parent sockets later via 'info_from_parent_in_tree'
        if self.myRank == 0:
            self.parentStdoutSock = self.stdoutToConSock
            self.parentStderrSock = self.stderrToConSock
        else:
            self.parentStdoutSock = 0
            self.parentStderrSock = 0
        msg = self.ring.lhsSock.recv_dict_msg()    # recv msg containing ringsize
        if not msg or not msg.has_key('cmd') or msg['cmd'] != 'ringsize':
            mpd_print(1,'invalid msg from lhs; expecting ringsize got: %s' % (msg) )
            sys.exit(-1)
        if self.myRank != 0:
            # keep the ringsize msg circulating around the manager ring
            self.ring.rhsSock.send_dict_msg(msg)
        # MPI_UNIVERSE_SIZE from the client env wins; else use the ring's cpu count
        if self.clientPgmEnv.has_key('MPI_UNIVERSE_SIZE'):
            self.universeSize = int(self.clientPgmEnv['MPI_UNIVERSE_SIZE'])
        else:
            self.universeSize = msg['ring_ncpus']
        if self.doingBNR:
            # BNR (pre-PMI) support: talk to the client over a socketpair
            (self.pmiSock,self.cliBNRSock) = mpd_sockpair()
            self.streamHandler.set_handler(self.pmiSock,self.handle_pmi_input)
            cli_env['MAN_MSGS_FD'] = str(self.cliBNRSock.fileno())    ## BNR
        self.numDone = 0
        self.numWithIO = 2    # stdout and stderr so far
        self.numConndWithIO = 2
        # FIXME: This is the old singleton approach, which didn't allow
        # for more than one process to be a singleton
        if self.singinitPORT:
            # singleton-init client already exists; connect back to it for PMI
            # and for its stdin/stdout/stderr instead of creating pipes below
            self.pmiListenSock = 0
            self.pmiSock = MPDSock(name='pmi')
            self.pmiSock.connect((self.myIfhn,self.singinitPORT))
            self.streamHandler.set_handler(self.pmiSock,self.handle_pmi_input)
            self.pmiSock.send_char_msg('cmd=singinit authtype=none\n')
            line = self.pmiSock.recv_char_msg()    # NOTE(review): reply not validated
            charMsg = 'cmd=singinit_info rc=0 versionok=yes stdio=yes kvsname=%s\n' % (self.default_kvsname)
            self.pmiSock.send_char_msg(charMsg)
            sock_write_cli_stdin = MPDSock(name='write_cli_stdin')
            sock_write_cli_stdin.connect((self.myIfhn,self.singinitPORT))
            self.fd_write_cli_stdin = sock_write_cli_stdin.fileno()
            sock_read_cli_stdout = MPDSock(name='read_cli_stdout')
            sock_read_cli_stdout.connect((self.myIfhn,self.singinitPORT))
            self.fd_read_cli_stdout = sock_read_cli_stdout.fileno()
            sock_read_cli_stderr = MPDSock(name='read_cli_stderr')
            sock_read_cli_stderr.connect((self.myIfhn,self.singinitPORT))
            self.fd_read_cli_stderr = sock_read_cli_stderr.fileno()
        else:
            # normal case: listen for the client's PMI (and BNR) connections
            self.cliListenSock = MPDListenSock('',0,name='cli_listen_sock')    ## BNR
            self.cliListenPort = self.cliListenSock.getsockname()[1]    ## BNR
            self.pmiListenSock = MPDListenSock('',0,name='pmi_listen_sock')
            self.pmiListenPort = self.pmiListenSock.getsockname()[1]
        self.subproc = 0    # default; use fork instead of subprocess
        if self.singinitPID:
            clientPid = self.singinitPID    # client pre-exists (singleton init)
        else:
            # build the client's environment: PMI contact info and job layout
            cli_env['PATH'] = os.environ['MPDMAN_CLI_PATH']
            cli_env['PMI_PORT'] = '%s:%s' % (self.myIfhn,self.pmiListenPort)
            cli_env['PMI_SIZE'] = str(self.nprocs)
            cli_env['PMI_RANK'] = str(self.myRank)
            cli_env['PMI_DEBUG'] = str(0)
            cli_env['PMI_TOTALVIEW'] = str(self.totalview)
            if self.spawned:
                cli_env['PMI_SPAWNED'] = '1'
            else:
                cli_env['PMI_SPAWNED'] = '0'
            if self.doingBNR:
                cli_env['MPD_TVDEBUG'] = str(0)    ## BNR
                cli_env['MPD_JID'] = os.environ['MPDMAN_JOBID']    ## BNR
                cli_env['MPD_JSIZE'] = str(self.nprocs)    ## BNR
                cli_env['MPD_JRANK'] = str(self.myRank)    ## BNR
                cli_env['CLIENT_LISTENER_FD'] = str(self.cliListenSock.fileno())    ## BNR
            if hasattr(os,'fork'):
                # POSIX path: plumb stdin/stdout/stderr through pipes, then fork/exec
                (self.fd_read_cli_stdin, self.fd_write_cli_stdin ) = os.pipe()
                (self.fd_read_cli_stdout,self.fd_write_cli_stdout) = os.pipe()
                (self.fd_read_cli_stderr,self.fd_write_cli_stderr) = os.pipe()
                (self.handshake_sock_man_end,self.handshake_sock_cli_end) = mpd_sockpair()
                clientPid = self.launch_client_via_fork_exec(cli_env)
                if clientPid < 0:
                    print '**** mpdman: launch_client_via_fork_exec failed; exiting'
                    sys.exit(-1)
                elif clientPid > 0:
                    # parent keeps only the manager end of the handshake pair
                    self.handshake_sock_cli_end.close()
                else:  # 0
                    self.handshake_sock_man_end.close()
            elif subprocess_module_available:
                clientPid = self.launch_client_via_subprocess(cli_env)  # may chg self.subproc
            else:
                mpd_print(1,'neither fork nor subprocess is available')
                sys.exit(-1)
        # if not initially a recvr of stdin (e.g. gdb) then give immediate eof to client
        if not in_stdinRcvrs(self.myRank,self.stdinDest):
            if self.subproc:    # must close subproc's file (not just the fd)
                self.subproc.stdin.close()
            else:
                os.close(self.fd_write_cli_stdin)
        if self.doingBNR:
            self.cliBNRSock.close()    # client's end of the BNR pair; manager keeps pmiSock
        # report the client's pid (and ours) back to the local mpd daemon
        msgToSend = { 'cmd' : 'client_info', 'jobid' : self.jobid,
                      'clipid' : clientPid, 'manpid' : os.getpid(), 'rank' : self.myRank,
                      'spawner_manpid' : int(os.environ['MPDMAN_SPAWNER_MANPID']),
                      'spawner_mpd' : os.environ['MPDMAN_SPAWNER_MPD'] }
        self.mpdSock.send_dict_msg(msgToSend)
        if not self.subproc:
            self.streamHandler.set_handler(self.fd_read_cli_stdout, self.handle_cli_stdout_input)
            self.streamHandler.set_handler(self.fd_read_cli_stderr, self.handle_cli_stderr_input)
        self.waitPids = [clientPid]    # child pids to reap after the main loop
        if self.pmiListenSock:
            self.streamHandler.set_handler(self.pmiListenSock,self.handle_pmi_connection)
        # begin setup of stdio tree
        (parent,lchild,rchild) = mpd_get_ranks_in_binary_tree(self.myRank,self.nprocs)
        self.spawnedChildSocks = []
        self.childrenStdoutTreeSocks = []
        self.childrenStderrTreeSocks = []
        if lchild >= 0:
            self.numWithIO += 2    # stdout and stderr from child
            # tell the left child (routed via the ring) where to connect its stdio
            msgToSend = { 'cmd' : 'info_from_parent_in_tree',
                          'to_rank' : str(lchild),
                          'parent_ifhn' : self.myIfhn,
                          'parent_port' : self.listenNonRingPort }
            self.ring.rhsSock.send_dict_msg(msgToSend)
        if rchild >= 0:
            self.numWithIO += 2    # stdout and stderr from child
            msgToSend = { 'cmd' : 'info_from_parent_in_tree',
                          'to_rank' : str(rchild),
                          'parent_ifhn' : self.myIfhn,
                          'parent_port' : self.listenNonRingPort }
            self.ring.rhsSock.send_dict_msg(msgToSend)
        if os.environ.has_key('MPDMAN_RSHIP'):
            # optional remote-ship helper program runs as a separate forked child
            rship = os.environ['MPDMAN_RSHIP']
            # (rshipSock,rshipPort) = mpd_get_inet_listen_sock('',0)
            rshipPid = os.fork()
            if rshipPid == 0:
                os.environ['MPDCP_MSHIP_HOST'] = os.environ['MPDMAN_MSHIP_HOST']
                os.environ['MPDCP_MSHIP_PORT'] = os.environ['MPDMAN_MSHIP_PORT']
                os.environ['MPDCP_MSHIP_NPROCS'] = str(self.nprocs)
                os.environ['MPDCP_CLI_PID'] = str(clientPid)
                try:
                    os.execvpe(rship,[rship],os.environ)
                except Exception, errmsg:
                    # make sure my error msgs get to console
                    os.dup2(self.parentStdoutSock.fileno(),1)  # closes fd 1 (stdout) if open
                    os.dup2(self.parentStderrSock.fileno(),2)  # closes fd 2 (stderr) if open
                    mpd_print(1,'execvpe failed for copgm %s; errmsg=:%s:' % (rship,errmsg) )
                    sys.exit(-1)
                sys.exit(0)
            # rshipSock.close()
            self.waitPids.append(rshipPid)
        self.tvReady = 0
        self.pmiBarrierInRecvd = 0
        self.holdingPMIBarrierLoop1 = 0
        if self.myRank == 0:
            # rank 0 originates the jobgo and end-barrier messages around the ring
            self.holdingEndBarrierLoop1 = 1
            self.holdingJobgoLoop1 = { 'cmd' : 'jobgo_loop_1', 'procinfo' : [] }
        else:
            self.holdingEndBarrierLoop1 = 0
            self.holdingJobgoLoop1 = 0
        self.jobStarted = 0
        self.endBarrierDone = 0
        # NOTE(review): clientExited / clientExitStatusSent are read below but
        # their initializations are not visible in this excerpt -- confirm they
        # are set before the loop in the full file.
        # Main Loop
        while not self.endBarrierDone:
            if self.numDone >= self.numWithIO and (self.singinitPID or self.subproc):
                clientExited = 1
                clientExitStatus = 0
            if self.holdingJobgoLoop1 and self.numConndWithIO >= self.numWithIO:
                # all stdio children have connected; release the held jobgo msg
                msgToSend = self.holdingJobgoLoop1
                self.ring.rhsSock.send_dict_msg(msgToSend)
                self.holdingJobgoLoop1 = 0
            rv = self.streamHandler.handle_active_streams(timeout=5.0)
            if rv[0] < 0:
                if type(rv[1]) == ClassType and rv[1] == KeyboardInterrupt:  # ^C
                    sys.exit(-1)
            if clientExited:
                if self.jobStarted and not clientExitStatusSent:
                    msgToSend = { 'cmd' : 'client_exit_status',
                                  'man_id' : self.myId,
                                  'cli_status' : clientExitStatus,
                                  'cli_host' : self.myHost,
                                  'cli_ifhn' : self.myIfhn,
                                  'cli_pid' : clientPid,
                                  'cli_rank' : self.myRank }
                    if self.myRank == 0:
                        if self.conSock:
                            try:
                                self.conSock.send_dict_msg(msgToSend)
                            except:
                                pass    # best-effort: console may already be gone
                    else:
                        if self.ring.rhsSock:
                            self.ring.rhsSock.send_dict_msg(msgToSend)
                    clientExitStatusSent = 1
                if self.holdingEndBarrierLoop1 and self.numDone >= self.numWithIO:
                    self.holdingEndBarrierLoop1 = 0
                    msgToSend = {'cmd' : 'end_barrier_loop_1'}
                    self.ring.rhsSock.send_dict_msg(msgToSend)
        mpd_print(0000, "out of loop")
        # may want to wait for waitPids here

    def handle_nonring_connection(self,sock):
        # Accept one connection on the non-ring listen socket and dispatch on
        # its first msg: stdio-tree children register their stdout/stderr
        # sockets; a spawned job's rank-0 manager fetches preput KVS data and
        # the ringsize.
        # NOTE(review): `sock` is unused; accept() is on listenNonRingSock.
        (tempSock,tempConnAddr) = self.listenNonRingSock.accept()
        msg = tempSock.recv_dict_msg()
        if msg and msg.has_key('cmd'):
            if msg['cmd'] == 'child_in_stdout_tree':
                self.streamHandler.set_handler(tempSock,self.handle_child_stdout_tree_input)
                self.childrenStdoutTreeSocks.append(tempSock)
                self.numConndWithIO += 1
            elif msg['cmd'] == 'child_in_stderr_tree':
                self.streamHandler.set_handler(tempSock,self.handle_child_stderr_tree_input)
                self.childrenStderrTreeSocks.append(tempSock)
                self.numConndWithIO += 1
            elif msg['cmd'] == 'spawned_man0_is_up':
                # hand the spawned job its preput KVS data and the ringsize
                self.streamHandler.set_handler(tempSock,self.handle_spawned_child_input)
                self.spawnedChildSocks.append(tempSock)
                tempID = msg['spawned_id']
                spawnedKVSname = 'mpdman_kvs_for_spawned_' + tempID
                msgToSend = { 'cmd' : 'preput_info_for_child',
                              'kvs' : self.KVSs[spawnedKVSname] }
                tempSock.send_dict_msg(msgToSend)
                msgToSend = { 'cmd' : 'ringsize', 'ring_ncpus' : self.universeSize }
                tempSock.send_dict_msg(msgToSend)
        else:
            mpd_print(1, 'unknown msg recvd on listenNonRingSock :%s:' % (msg) )

    def handle_lhs_input(self,sock):
        # Handle one msg arriving from the left-hand ring neighbor.
        # NOTE(review): this method is truncated at the end of the excerpt.
        msg = self.ring.lhsSock.recv_dict_msg()
        if not msg:
            mpd_print(0000, 'lhs died' )
            self.streamHandler.del_handler(self.ring.lhsSock)
            self.ring.lhsSock.close()
        elif msg['cmd'] == 'jobgo_loop_1':
            if self.myRank == 0:
                if self.totalview:
                    # NOTE(review): bare `clientPid` inside a method -- confirm
                    # its scope against the full file.
                    msg['procinfo'].insert(0,(socket.gethostname(),self.clientPgm,clientPid))
                # let console pgm proceed
                msgToSend = { 'cmd' : 'job_started', 'jobid' : self.jobid,
                              'procinfo' : msg['procinfo'] }
                self.conSock.send_dict_msg(msgToSend,errprint=0)
                msgToSend = { 'cmd' : 'jobgo_loop_2' }
                self.ring.rhsSock.send_dict_msg(msgToSend)
            else:
                if self.totalview:
                    # (excerpt ends here)
# --- Code-viewer keyboard-shortcut legend (page chrome, not part of
# --- mpdman.py; translated from Chinese) ---
# Copy code: Ctrl + C
# Search code: Ctrl + F
# Full-screen mode: F11
# Toggle theme: Ctrl + Shift + D
# Show shortcuts: ?
# Increase font size: Ctrl + =
# Decrease font size: Ctrl + -