📄 mpdman.py
字号:
#!/usr/bin/env pythonfrom os import environ, getpid, pipe, fork, fdopen, read, write, close, dup2, \ chdir, execvpe, kill, waitpid, _exitfrom sys import exitfrom socket import gethostname, fromfd, AF_INET, SOCK_STREAMfrom select import select, errorfrom re import findall, subfrom signal import signal, SIGKILL, SIGUSR1, SIGTSTP, SIGCONT, SIGCHLD, SIG_DFL, SIG_IGNfrom md5 import newfrom mpdlib import mpd_set_my_id, mpd_print, mpd_print_tb, mpd_get_ranks_in_binary_tree, \ mpd_send_one_line, mpd_recv_one_line, mpd_send_one_msg, mpd_recv_one_msg, \ mpd_get_inet_listen_socket, mpd_get_inet_socket_and_connect, \ mpd_get_my_username, mpd_raise, mpdError, mpd_versionglobal get_sigtype_from_mpddef mpdman(): global get_sigtype_from_mpd get_sigtype_from_mpd = 0 signal(SIGUSR1,sigusr1_handler) signal(SIGCHLD,SIG_DFL) # reset mpd's values myHost = environ['MPDMAN_MYHOST'] myRank = int(environ['MPDMAN_RANK']) myId = myHost + '_mpdman_' + str(myRank) spawned = int(environ['MPDMAN_SPAWNED']) if spawned: myId = myId + '_s' mpd_set_my_id(myId) try: chdir(environ['MPDMAN_CWD']) except Exception, errmsg: errmsg = '%s: invalid dir: %s' % (myId,environ['MPDMAN_CWD']) # print errmsg ## may syslog it in some cases ? clientPgm = environ['MPDMAN_CLI_PGM'] clientPgmArgs = environ['MPDMAN_PGM_ARGS'] clientPgmArgs = findall('\S+',clientPgmArgs) mpd_print(0000, "ARGS=", clientPgmArgs ) clientPgmEnv = environ['MPDMAN_PGM_ENVVARS'] clientPgmEnv = findall('\S+',clientPgmEnv) mpd_print(0000, 'entering mpdman to exec %s' % (clientPgm) ) jobid = environ['MPDMAN_JOBID'] nprocs = int(environ['MPDMAN_NPROCS']) mpdPort = int(environ['MPDMAN_MPD_LISTEN_PORT']) mpdConfPasswd = environ['MPDMAN_MPD_CONF_PASSWD'] environ['MPDMAN_MPD_CONF_PASSWD'] = '' ## do NOT pass it on to clients conHost = environ['MPDMAN_CONHOST'] conPort = int(environ['MPDMAN_CONPORT']) lhsHost = environ['MPDMAN_LHSHOST'] lhsPort = int(environ['MPDMAN_LHSPORT']) host0 = environ['MPDMAN_HOST0'] # only used by right-most man port0 = int(environ['MPDMAN_PORT0']) # only used by right-most man myPort = int(environ['MPDMAN_MY_LISTEN_PORT']) listenFD = int(environ['MPDMAN_MY_LISTEN_FD']) mpd_print(0000, "lhost=%s lport=%d h0=%s p0=%d" % (lhsHost,lhsPort,host0,port0) ) listenSocket = fromfd(listenFD,AF_INET,SOCK_STREAM) close(listenFD) socketsToSelect = { listenSocket : 1 } lineLabels = int(environ['MPDMAN_LINE_LABELS']) startStdoutLineLabel = 1 startStderrLineLabel = 1 myLineLabel = str(myRank) + ': ' # set up pmi stuff early in case I was spawned kvsname_template = 'kvs_' + host0 + '_' + str(port0) + '_' default_kvsname = kvsname_template + '0' default_kvsname = sub('\.','_',default_kvsname) # chg magpie.cs to magpie_cs exec('%s = {}' % (default_kvsname) ) kvs_next_id = 1 pmiCollectiveJob = 0 if nprocs == 1: # one-man ring lhsSocket = mpd_get_inet_socket_and_connect(host0,port0) # to myself (rhsSocket,rhsAddr) = listenSocket.accept() else: if myRank == 0: for i in range(2): # accept lhs and rhs (tempSocket,tempAddr) = listenSocket.accept() msg = mpd_recv_one_msg(tempSocket) if msg['cmd'] == 'i_am_lhs': (lhsSocket,lhsAddr) = (tempSocket,tempAddr) else: (rhsSocket,rhsAddr) = (tempSocket,tempAddr) else: lhsSocket = mpd_get_inet_socket_and_connect(lhsHost,lhsPort) mpd_send_one_msg(lhsSocket, { 'cmd' : 'i_am_rhs' } ) if myRank == (nprocs-1): # right-most man rhsSocket = mpd_get_inet_socket_and_connect(host0,port0) mpd_send_one_msg(rhsSocket, { 'cmd' : 'i_am_lhs' } ) else: (rhsSocket,rhsAddr) = listenSocket.accept() msg = mpd_recv_one_msg(rhsSocket) # drain out the i_am_... msg socketsToSelect[lhsSocket] = 1 socketsToSelect[rhsSocket] = 1 if myRank == 0: conSocket = mpd_get_inet_socket_and_connect(conHost,conPort) # for cntl msgs socketsToSelect[conSocket] = 1 if spawned: msgToSend = { 'cmd' : 'spawned_child' } mpd_send_one_msg(conSocket,msgToSend) stdoutToConSocket = mpd_get_inet_socket_and_connect(conHost,conPort) if spawned: msgToSend = { 'cmd' : 'child_in_stdout_tree', 'from_rank' : myRank } mpd_send_one_msg(stdoutToConSocket,msgToSend) stderrToConSocket = mpd_get_inet_socket_and_connect(conHost,conPort) if spawned: msgToSend = { 'cmd' : 'child_in_stderr_tree', 'from_rank' : myRank } mpd_send_one_msg(stderrToConSocket,msgToSend) else: conSocket = 0 (clientListenSocket,clientListenPort) = mpd_get_inet_listen_socket('',0) (pipe_read_cli_stdout,pipe_write_cli_stdout) = pipe() (pipe_read_cli_stderr,pipe_write_cli_stderr) = pipe() (pipe_cli_end,pipe_man_end) = pipe() clientPid = fork() if clientPid == 0: mpd_set_my_id(gethostname() + '_man_before_exec_client_' + `getpid()`) lhsSocket.close() rhsSocket.close() listenSocket.close() if conSocket: conSocket.close() # to simply print on the mpd's tty: # comment out the next lines close(pipe_read_cli_stdout) dup2(pipe_write_cli_stdout,1) # closes fd 1 (stdout) if open close(pipe_write_cli_stdout) close(pipe_read_cli_stderr) dup2(pipe_write_cli_stderr,2) # closes fd 2 (stderr) if open close(pipe_write_cli_stderr) msg = read(pipe_cli_end,2) if msg != 'go': mpd_raise('%s: invalid go msg from man :%s:' % (myId,msg) ) close(pipe_cli_end) (pmiSocket,pmiAddr) = clientListenSocket.accept() pmiFile = fdopen(pmiSocket.fileno(),'r') msg = mpd_recv_one_line(pmiFile) ## mpd_print(0000, "recvd pmi handshake=:%s:" % msg ) if not msg or msg != 'cmd=pmi_handler\n': # handshake mpd_raise('%d: invalid msg from handler :%s:' % (myRank,msg) ) clientPgmArgs = [clientPgm] + clientPgmArgs environ['PATH'] = environ['MPDMAN_CLI_PATH'] environ['PMI_FD'] = str(pmiSocket.fileno()) environ['PMI_SIZE'] = str(nprocs) environ['PMI_RANK'] = str(myRank) environ['PMI_DEBUG'] = str(0) for envvar in clientPgmEnv: (envkey,envval) = envvar.split('=') environ[envkey] = envval ## mpd_print(0000, 'execing clientPgm=:%s:' % (clientPgm) ) try: execvpe(clientPgm,clientPgmArgs,environ) # client except Exception, errmsg: ## mpd_raise('execvpe failed for client %s; errmsg=:%s:' % (clientPgm,errmsg) ) print '%s: could not run %s; probably executable file not found' % (myId,clientPgm) exit(0) _exit(0) # just in case (does no cleanup) close(pipe_write_cli_stdout) close(pipe_write_cli_stderr) clientStdoutFD = pipe_read_cli_stdout clientStdoutFile = fdopen(clientStdoutFD,'r') socketsToSelect[clientStdoutFD] = 1 clientStderrFD = pipe_read_cli_stderr clientStderrFile = fdopen(clientStderrFD,'r') socketsToSelect[clientStderrFD] = 1 clientListenSocket.close() numWithIO = 2 # stdout and stderr so far waitPids = [clientPid] # connect to the client telling it that we are providing pmi service pmiSocket = mpd_get_inet_socket_and_connect('localhost',clientListenPort) pmiFile = fdopen(pmiSocket.fileno(),'r') mpd_send_one_line(pmiSocket,'cmd=pmi_handler\n') # handshake socketsToSelect[pmiSocket] = 1 # begin setup of stdio tree (parent,lchild,rchild) = mpd_get_ranks_in_binary_tree(myRank,nprocs) spawnedChildSockets = [] childrenStdoutTreeSockets = [] childrenStderrTreeSockets = [] if lchild >= 0: numWithIO += 2 # stdout and stderr from child msgToSend = { 'cmd' : 'info_for_parent_in_tree', 'to_rank' : str(lchild), 'parent_host' : myHost, 'parent_port' : myPort } mpd_send_one_msg(rhsSocket,msgToSend) if rchild >= 0: numWithIO += 2 # stdout and stderr from child msgToSend = { 'cmd' : 'info_for_parent_in_tree', 'to_rank' : str(rchild), 'parent_host' : myHost, 'parent_port' : myPort } mpd_send_one_msg(rhsSocket,msgToSend) if myRank == 0: parentStdoutSocket = stdoutToConSocket parentStderrSocket = stderrToConSocket msgToSend = { 'cmd' : 'jobgo' } mpd_send_one_msg(rhsSocket,msgToSend) else: parentStdoutSocket = 0 parentStderrSocket = 0 if environ.has_key('MPDMAN_RSHIP'): rship = environ['MPDMAN_RSHIP'] # (rshipSocket,rshipPort) = mpd_get_inet_listen_socket('',0) rshipPid = fork() if rshipPid == 0: environ['MPDCP_MSHIP_HOST'] = environ['MPDMAN_MSHIP_HOST'] environ['MPDCP_MSHIP_PORT'] = environ['MPDMAN_MSHIP_PORT'] environ['MPDCP_MSHIP_NPROCS'] = str(nprocs) environ['MPDCP_CLI_PID'] = str(clientPid) try: execvpe(rship,[rship],environ) except Exception, errmsg: # make sure my error msgs get to console dup2(parentStdoutSocket.fileno(),1) # closes fd 1 (stdout) if open dup2(parentStderrSocket.fileno(),2) # closes fd 2 (stderr) if open mpd_raise('execvpe failed for copgm %s; errmsg=:%s:' % (rship,errmsg) ) _exit(0); # do NOT do cleanup # rshipSocket.close() waitPids.append(rshipPid) pmiBarrierInRecvd = 0 holdingPMIBarrierLoop1 = 0 holdingEndBarrierLoop1 = 0 endBarrierDone = 0 numDone = 0 mpdSocket = 0 while not endBarrierDone: if get_sigtype_from_mpd: mpdSocket = mpd_get_inet_socket_and_connect('localhost',mpdPort) msgToSend = { 'cmd' : 'manager_needs_help', 'host' : myHost, 'port' : myPort } mpd_send_one_msg(mpdSocket,msgToSend) msg = mpd_recv_one_msg(mpdSocket) if (not msg.has_key('cmd')) or \ (msg['cmd'] != 'challenge') or (not msg.has_key('randnum')): mpd_raise('%s: failed to recv challenge from rhs; msg=:%s:' % (myId,msg) ) response = new(''.join([mpdConfPasswd,msg['randnum']])).digest() msgToSend = { 'cmd' : 'challenge_response', 'response' : response, 'host' : myHost, 'port' : myPort } mpd_send_one_msg(mpdSocket,msgToSend) msg = mpd_recv_one_msg(mpdSocket) if (not msg.has_key('cmd')) or (msg['cmd'] != 'OK_to_send_requests'): mpd_raise('%s: NOT OK to send requests to mpd; msg=:%s:' % (myId,msg) ) msgToSend = { 'cmd' : 'get_signal_to_deliver', 'pid' : `getpid()`,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -