📄 mpdrun.py
字号:
conSocket.close() if timeout: signal(SIGALRM,sig_handler) alarm(timeout) (manSocket,addr) = listenSocket.accept() msg = mpd_recv_one_msg(manSocket) if (not msg or not msg.has_key('cmd')): mpd_raise('mpdrun: from man, invalid msg=:%s:' % (msg) ) if (msg['cmd'] == 'job_started'): # print 'mpdrun: job %s started' % (msg['jobid']) pass else: mpd_raise('mpdrun: from man, unknown msg=:%s:' % (msg) ) (manCliStdoutSocket,addr) = listenSocket.accept() (manCliStderrSocket,addr) = listenSocket.accept() socketsToSelect = { manSocket : 1, manCliStdoutSocket : 1, manCliStderrSocket : 1 } done = 0 while done < 3: # man, client stdout, and client stderr try: (readySockets,None,None) = select(socketsToSelect.keys(),[],[],30) for readySocket in readySockets: if readySocket == manSocket: msg = mpd_recv_one_msg(manSocket) if not msg: # mpd_raise('mpdrun: empty msg from man; it must have terminated early') print 'mpdrun: empty msg from man; it must have terminated early' del socketsToSelect[readySocket] readySocket.close() done += 1 elif not msg.has_key('cmd'): mpd_raise('mpdrun: from man, invalid msg=:%s:' % (msg) ) elif msg['cmd'] == 'job_terminated_early': print 'rank %d in job %s terminated without calling MPI_Finalize' % ( msg['rank'], msg['jobid'] ) # print 'mpdrun: job %s terminated early at rank %d' % (msg['jobid'], msg['rank']) # del socketsToSelect[readySocket] # readySocket.close() # done += 1 elif (msg['cmd'] == 'job_terminated'): del socketsToSelect[readySocket] readySocket.close() done += 1 elif (msg['cmd'] == 'client_exit_status'): status = msg['status'] if WIFSIGNALED(status): killed_status = status & 0x007f # AND off core flag # print 'exit status of rank %d: killed by signal %d ' % (msg['rank'],killed_status) else: exit_status = WEXITSTATUS(status) # print 'exit status of rank %d: return code %d ' % (msg['rank'],exit_status) else: print 'unrecognized msg from manager :%s:' % msg elif readySocket == manCliStdoutSocket: msg = manCliStdoutSocket.recv(1024) if not msg: del socketsToSelect[readySocket] readySocket.close() done += 1 else: print msg, # print 'MS: %s' % (msg.strip()) stdout.flush() elif readySocket == manCliStderrSocket: msg = manCliStderrSocket.recv(1024) if not msg: del socketsToSelect[readySocket] readySocket.close() done += 1 else: print >>stderr, msg, # print >>stderr, 'MS: %s' % (msg.strip()) stderr.flush() else: mpd_raise('unrecognized ready socket :%s:' % (readySocket) ) except mpdError, errmsg: print 'mpdrun failed: %s' % (errmsg) exit(-1) except mpdrunInterrupted, errmsg: if errmsg.args == 'SIGINT': if manSocket: msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGINT' } mpd_send_one_msg(manSocket,msgToSend) manSocket.close() exit(-1) elif errmsg.args == 'SIGTSTP': if manSocket: msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGTSTP' } mpd_send_one_msg(manSocket,msgToSend) signal(SIGTSTP,SIG_DFL) # stop myself kill(getpid(),SIGTSTP) signal(SIGTSTP,sig_handler) # restore this handler except Exception, errmsg: if isinstance(errmsg,Exception) and errmsg[0] == 4: # interrupted system call continue elif sigExitDueToTimeout: exit(-1) else: mpd_raise('mpdrun: select failed: errmsg=:%s:' % (errmsg) ) if mshipPid: (donePid,status) = wait() # waitpid(mshipPid,0)def sig_handler(signum,frame): # for some reason, I (rmb) was unable to handle TSTP and CONT in the same way global manSocket, timeout, sigExitDueToTimeout if signum == SIGINT: raise mpdrunInterrupted, 'SIGINT' elif signum == SIGTSTP: raise mpdrunInterrupted, 'SIGTSTP' elif signum == SIGCONT: if manSocket: msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGCONT' } mpd_send_one_msg(manSocket,msgToSend) elif signum == SIGALRM: if manSocket: msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGINT' } mpd_send_one_msg(manSocket,msgToSend) manSocket.close() print 'mpdrun terminating due to timeout %d seconds' % timeout sigExitDueToTimeout = 1 exit(-1)def process_cmdline_args(): global nprocs, pgm, pgmArgs, mship, rship, argsFilename, try0Locally, lineLabels, jobalias if len(argv) < 3: usage() if argv[1] == '-f': argsFilename = argv[2] # initialized to '' in main argidx = 3 else: argidx = 1 if not argsFilename: while pgm == '': if argidx >= len(argv): usage() if argv[argidx][0] == '-': if argsFilename: print 'Cannot use other args with -f' usage() if argv[argidx] == '-np' or argv[argidx] == '-n': nprocs = int(argv[argidx+1]) argidx += 2 elif argv[argidx] == '-f': argsFilename = argv[argidx+1] argidx += 2 np_or_filename += 1 elif argv[argidx] == '-a': jobalias = argv[argidx+1] argidx += 2 elif argv[argidx] == '-cpm': mship = argv[argidx+1] argidx += 2 elif argv[argidx] == '-cpr': rship = argv[argidx+1] argidx += 2 elif argv[argidx] == '-l': lineLabels = 1 argidx += 1 elif argv[argidx] == '-1': try0Locally = 0 argidx += 1 else: usage() else: pgm = argv[argidx] argidx += 1 pgmArgs = '' while argidx < len(argv): pgmArgs = pgmArgs + argv[argidx] + ' ' argidx += 1 pgmArgs = pgmArgs.strip()def extract_from_xml(createReq,attr,name,defaultVal): global nprocs if createReq.hasAttribute(attr): defaultVal = createReq.getAttribute(attr) attrList = createReq.getElementsByTagName(attr) covered = [0] * nprocs attrs = {} for a in attrList: ranks = a.getAttribute('range').split('-') if len(ranks) == 1: ranks = (ranks[0],ranks[0]) ranks = tuple(map(int,ranks)) for i in range(ranks[0],ranks[1]+1): if i >= nprocs: print '*** exiting; rank %d is greater than nprocs' % i exit(-1) if covered[i]: print '*** exiting; rank %d is multiply covered for %s' % (i,attr) exit(-1) covered[i] = 1 attrs[ranks] = a.getAttribute(name) i = 0 while i < len(covered): if not covered[i]: s = i while i < len(covered) and not covered[i]: i += 1 attrs[(s,i-1)] = defaultVal else: i += 1 return attrs def usage(): print 'mpdrun for mpd version: %s' % str(mpd_version) print 'usage: mpdrun [args] pgm_to_execute [pgm_args]' print ' where args may be: -a alias -np nprocs -cpm master_copgm -cpr remote_copgm -l -1' print ' (-l means attach line labels identifying which client prints each line)' print ' (-1 means do NOT start the first process locally)' print ' (-a means assign this alias to the job)' print 'or: mpdrun -f filename' print ' where filename contains all the arguments in xml format' exit(-1)if __name__ == '__main__': global manSocket manSocket = 0 # set when we get conn'd to a manager signal(SIGINT,sig_handler) signal(SIGTSTP,sig_handler) signal(SIGCONT,sig_handler) try: mpdrun() except mpdError, errmsg: print 'mpdrun failed: %s' % (errmsg) except SystemExit, errmsg: pass
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -