📄 mpdrun.py
字号:
#!/usr/bin/env python

from sys import argv, exit, stdout, stderr, exc_info
from os import environ, fork, execvpe, getuid, getpid, path, getcwd, \
               close, wait, waitpid, kill, _exit, \
               WIFSIGNALED, WEXITSTATUS
from socket import socket, fromfd, AF_UNIX, SOCK_STREAM, gethostname
from select import select
from signal import signal, alarm, SIG_DFL, SIGINT, SIGTSTP, SIGCONT, SIGALRM
from exceptions import Exception
from xml.dom.minidom import parseString
from mpdlib import mpd_set_my_id, mpd_send_one_msg, mpd_recv_one_msg, \
                   mpd_get_inet_listen_socket, mpd_get_my_username, \
                   mpd_raise, mpdError, mpd_version


class mpdrunInterrupted(Exception):
    # Script-specific exception type; where it is raised or caught is not
    # visible in this part of the file.
    # NOTE(review): on Python 2.5+ BaseException.args must be a sequence, so
    # assigning None here raises TypeError -- confirm the target interpreter.
    def __init__(self,args=None):
        self.args = args


# Module-level 'global' statements are no-ops; they list the names that
# mpdrun() shares through module scope.
global nprocs, pgm, pgmArgs, mship, rship, argsFilename, try0Locally, lineLabels
global manSocket, timeout, sigExitDueToTimeout


def _collect_ranged_values(createReq, nprocs, tagName, defaultValue, buildValue):
    """Build a {rank-range tuple: string} map from <tagName range="..."> elements.

    Each element's 'range' attribute is a single rank ("3") or an inclusive
    span ("0-3"); buildValue(elem) produces the string stored for that span.
    Ranks in 0..nprocs-1 not covered by any element are grouped into maximal
    contiguous spans that map to defaultValue.  A rank that is >= nprocs, or
    that is covered by more than one element, prints a message naming
    tagName and calls exit(-1).
    """
    covered = [0] * nprocs
    values = {}
    for elem in createReq.getElementsByTagName(tagName):
        ranks = elem.getAttribute('range').split('-')
        if len(ranks) == 1:
            ranks = (ranks[0],ranks[0])
        ranks = tuple(map(int,ranks))
        for i in range(ranks[0],ranks[1]+1):
            if i >= nprocs:
                print('*** exiting; rank %d is greater than nprocs for %s' % (i,tagName))
                exit(-1)
            if covered[i]:
                print('*** exiting; rank %d is multiply covered for %s' % (i,tagName))
                exit(-1)
            covered[i] = 1
        values[ranks] = buildValue(elem)
    # Give every maximal run of uncovered ranks the default value.
    i = 0
    while i < nprocs:
        if covered[i]:
            i += 1
        else:
            start = i
            while i < nprocs and not covered[i]:
                i += 1
            values[(start,i-1)] = defaultValue
    return values


def _args_string(elem):
    """Concatenate the 'value' attribute of each <arg> child, each preceded by a space."""
    return ''.join([' ' + argElem.getAttribute('value')
                    for argElem in elem.getElementsByTagName('arg')])


def _envvars_string(elem):
    """Concatenate ' key=value' for each <envvar> child of elem."""
    return ''.join([' ' + envElem.getAttribute('key') + '=' + envElem.getAttribute('value')
                    for envElem in elem.getElementsByTagName('envvar')])


def mpdrun():
    """Submit a job request to the local mpd and check its acknowledgement.

    The job description comes from the XML file named by argsFilename, when
    process_cmdline_args() sets one.  Otherwise it is built from the
    command-line globals nprocs, pgm and pgmArgs.

    The function connects to the local mpd console socket (an inherited
    UNIX_SOCKET fd, or /tmp/mpd2.console_<user>) and checks that the mpd
    version matches this script's.  If mship is set, it forks and execs that
    co-program.  It then sends an 'mpdrun' message and expects 'mpdrun_ack'
    back.

    Errors are reported via mpd_raise(), or by printing a message and
    calling exit(-1).
    """
    global nprocs, pgm, pgmArgs, mship, rship, argsFilename, try0Locally, lineLabels, jobalias
    global manSocket, timeout, sigExitDueToTimeout

    mpd_set_my_id('mpdrun_' + repr(getpid()))
    pgm = ''
    mship = ''
    rship = ''
    nprocs = 0
    jobalias = ''
    argsFilename = ''
    try0Locally = 1
    lineLabels = 0
    process_cmdline_args()
    sigExitDueToTimeout = 1

    # listenPort is sent to mpd below as 'conport'; listenSocket itself is
    # not used in the visible part of this function.
    (listenSocket,listenPort) = mpd_get_inet_listen_socket('',0)
    cwd = path.abspath(getcwd())
    username = mpd_get_my_username()

    if 'MPDRUN_TIMEOUT' in environ:
        timeout = int(environ['MPDRUN_TIMEOUT'])
    elif 'MPIEXEC_TIMEOUT' in environ:
        timeout = int(environ['MPIEXEC_TIMEOUT'])
    else:
        timeout = 0

    if 'UNIX_SOCKET' in environ:
        # fromfd() duplicates the descriptor, so the original can be closed.
        conFD = int(environ['UNIX_SOCKET'])
        conSocket = fromfd(conFD,AF_UNIX,SOCK_STREAM)
        close(conFD)
    else:
        consoleName = '/tmp/mpd2.console_' + username
        conSocket = socket(AF_UNIX,SOCK_STREAM)  # note: UNIX socket
        try:
            conSocket.connect(consoleName)
        except Exception:
            mpd_raise('cannot connect to local mpd')

    msgToSend = { 'cmd' : 'get_mpd_version' }
    mpd_send_one_msg(conSocket,msgToSend)
    msg = mpd_recv_one_msg(conSocket)
    if not msg:
        mpd_raise('mpd unexpectedly closed connection')
    elif msg['cmd'] != 'mpd_version_response':
        mpd_raise('unexpected msg from mpd :%s:' % (msg,) )
    if msg['mpd_version'] != mpd_version:
        mpd_raise('mpd version %s does not match mine %s' % (msg['mpd_version'],mpd_version) )

    if argsFilename:
        argsFile = open(argsFilename,'r')
        try:
            argsText = argsFile.read()
        finally:
            argsFile.close()
        parsedArgs = parseString(argsText)
        # doctype is None when the document has no DOCTYPE declaration.
        doctype = parsedArgs.doctype
        if doctype is None:
            doctypeName = None
        else:
            doctypeName = doctype.name
        if doctypeName != 'PMRequests':
            print('expecting PMRequests; got unrecognized doctype %s' % (doctypeName,))
            exit(-1)
        createReqs = parsedArgs.getElementsByTagName('create-process-group')
        if not createReqs:
            print('** no create-process-group element in %s' % argsFilename)
            exit(-1)
        createReq = createReqs[0]
        if createReq.hasAttribute('totalprocs'):
            nprocs = int(createReq.getAttribute('totalprocs'))
        else:
            print('** totalprocs not specified in %s' % argsFilename)
            exit(-1)
        if createReq.hasAttribute('line_labels'):
            lineLabels = 1
        if createReq.hasAttribute('jobalias'):
            jobalias = createReq.getAttribute('jobalias')
        hosts = extract_from_xml(createReq,'host','name','_any_')
        execs = extract_from_xml(createReq,'exec','name','')
        paths = extract_from_xml(createReq,'path','name','')
        users = extract_from_xml(createReq,'user','name',username)
        cwds  = extract_from_xml(createReq,'cwd','name',cwd)
        # Per-rank-range command-line args; uncovered ranks get the
        # create-process-group 'args' attribute (or '').
        if createReq.hasAttribute('args'):
            defaultArgs = createReq.getAttribute('args')
        else:
            defaultArgs = ''
        args = _collect_ranged_values(createReq,nprocs,'args',defaultArgs,_args_string)
        # Per-rank-range environment strings; uncovered ranks get the
        # create-process-group 'envvars' attribute (or '').
        if createReq.hasAttribute('envvars'):
            defaultEnvVars = createReq.getAttribute('envvars')
        else:
            defaultEnvVars = ''
        envvars = _collect_ranged_values(createReq,nprocs,'envvars',defaultEnvVars,
                                         _envvars_string)
        if createReq.getElementsByTagName('dont_try_0_locally'):
            try0Locally = 0
    else:
        if not nprocs:
            print('you have to indicate how many processes to start')
            usage()
        hosts   = { (0,nprocs-1) : '_any_' }
        execs   = { (0,nprocs-1) : pgm }
        users   = { (0,nprocs-1) : username }
        cwds    = { (0,nprocs-1) : cwd }
        paths   = { (0,nprocs-1) : environ['PATH'] }
        args    = { (0,nprocs-1) : pgmArgs }
        envvars = { (0,nprocs-1) : '' }

    if mship:
        (mshipSocket,mshipPort) = mpd_get_inet_listen_socket('',0)
        mshipPid = fork()
        if mshipPid == 0:
            # Child: hand the listen socket to the mship program via env vars.
            conSocket.close()
            environ['MPDCP_AM_MSHIP'] = '1'
            environ['MPDCP_MSHIP_PORT'] = str(mshipPort)
            environ['MPDCP_MSHIP_FD'] = str(mshipSocket.fileno())
            environ['MPDCP_MSHIP_NPROCS'] = str(nprocs)
            try:
                execvpe(mship,[mship],environ)
            except Exception:
                errmsg = exc_info()[1]
                # NOTE(review): if mpd_raise raises, the _exit(0) below is
                # skipped and the forked child unwinds through the caller's
                # code -- confirm whether that is intended.
                mpd_raise('execvpe failed for copgm %s; errmsg=:%s:' % (mship,errmsg) )
            _exit(0)  # do NOT do cleanup
        mshipSocket.close()
    else:
        mshipPid = 0

    msgToSend = { 'cmd' : 'mpdrun',
                  'conhost' : gethostname(),
                  'conport' : listenPort,
                  'spawned' : 0,
                  'nstarted' : 0,
                  'nprocs' : nprocs,
                  'hosts' : hosts,
                  'execs' : execs,
                  'jobalias' : jobalias,
                  'users' : users,
                  'cwds' : cwds,
                  'paths' : paths,
                  'args' : args,
                  'envvars' : envvars }
    if try0Locally:
        msgToSend['try_0_locally'] = 1
    if lineLabels:
        msgToSend['line_labels'] = 1
    if rship:
        # NOTE(review): mshipPort is only bound when mship is set; rship
        # without mship would raise NameError here -- confirm that
        # process_cmdline_args() enforces that pairing.
        msgToSend['rship'] = rship
        msgToSend['mship_host'] = gethostname()
        msgToSend['mship_port'] = mshipPort
    mpd_send_one_msg(conSocket,msgToSend)
    msg = mpd_recv_one_msg(conSocket)
    if not msg:
        mpd_raise('mpd unexpectedly closed connection')
    elif msg['cmd'] != 'mpdrun_ack':
        if msg['cmd'] == 'already_have_a_console':
            mpd_raise('mpd already has a console (e.g. for long ringtest); try later')
        elif msg['cmd'] == 'job_failed' and msg['reason'] == 'some_procs_not_started':
            mpd_raise('unable to start all procs; may have invalid machine names')
        else:
            mpd_raise('unexpected message from mpd: %s' % (msg,) )
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -