📄 mpdman.py
字号:
parentStderrSocket.close() parentStderrSocket = 0 if myRank == 0 or holdingEndBarrierLoop1: holdingEndBarrierLoop1 = 0 msgToSend = {'cmd' : 'end_barrier_loop_1'} mpd_send_one_msg(rhsSocket,msgToSend) else: if parentStdoutSocket: mpd_send_one_line(parentStdoutSocket,line) # parentStdoutSocket.sendall('FWD by %d: |%s|' % (myRank,line) ) elif readySocket in childrenStderrTreeSockets: line = readySocket.recv(1024) if not line: del socketsToSelect[readySocket] readySocket.close() numDone += 1 if numDone >= numWithIO: if parentStdoutSocket: parentStdoutSocket.close() parentStdoutSocket = 0 if parentStderrSocket: parentStderrSocket.close() parentStderrSocket = 0 if myRank == 0 or holdingEndBarrierLoop1: holdingEndBarrierLoop1 = 0 msgToSend = {'cmd' : 'end_barrier_loop_1'} mpd_send_one_msg(rhsSocket,msgToSend) else: if parentStderrSocket: mpd_send_one_line(parentStderrSocket,line) # parentStdoutSocket.sendall('FWD by %d: |%s|' % (myRank,line) ) elif readySocket in spawnedChildSockets: msg = mpd_recv_one_msg(readySocket) if not msg: del socketsToSelect[readySocket] readySocket.close() elif msg['cmd'] == 'job_started' or msg['cmd'] == 'job_terminated': pass elif msg['cmd'] == 'spawned_childs_kvs': ## NOTE: if you spawn a non-MPI job, it may not send this msg ## in which case the pgm will hang exec('%s = %s' % (msg['kvsname'],msg['kvs'])) # copy remote kvs here exec('default_kvs = %s' % default_kvsname) # prepare to send my kvs msgToSend = { 'cmd' : 'parents_kvs', 'kvsname' : default_kvsname, 'kvs' : default_kvs } mpd_send_one_msg(readySocket,msgToSend) pmiMsgToSend = \ 'cmd=spawn_result status=spawn_done remote_kvsname=%s\n' % \ msg['kvsname'] mpd_send_one_line(pmiSocket,pmiMsgToSend) else: mpd_print(1, "unrecognized msg from spawned child :%s:" % line ) elif readySocket == pmiSocket: line = mpd_recv_one_line(pmiFile) if not line: (donePid,status) = waitpid(clientPid,0) msgToSend = { 'cmd' : 'client_exit_status', 'status' : status, 'id' : myId, 'rank' : myRank } if myRank == 0: mpd_send_one_msg(conSocket,msgToSend) else: mpd_send_one_msg(rhsSocket,msgToSend) del socketsToSelect[pmiSocket] pmiSocket.close() if pmiCollectiveJob: if conSocket: msgToSend = { 'cmd' : 'job_terminated_early', 'jobid' : jobid, 'rank' : myRank } mpd_send_one_msg(conSocket,msgToSend) # conSocket.close() if rhsSocket in socketsToSelect.keys(): # still alive ? msgToSend = { 'cmd' : 'collective_abort', 'src' : myId, 'rank' : myRank} mpd_send_one_msg(rhsSocket,msgToSend) # rhsSocket.close() try: kill(clientPid,SIGKILL) except: pass # client may already be gone else: parsedMsg = parse_pmi_msg(line) if parsedMsg['cmd'] == 'get_my_kvsname': pmiMsgToSend = 'cmd=my_kvsname kvsname=%s\n' % (default_kvsname) mpd_send_one_line(pmiSocket,pmiMsgToSend) elif parsedMsg['cmd'] == 'get_maxes': pmiMsgToSend = 'cmd=maxes kvsname_max=4096 ' + \ 'keylen_max=4096 vallen_max=4096\n' mpd_send_one_line(pmiSocket,pmiMsgToSend) pmiCollectiveJob = 1 # really needs a pmi init elif parsedMsg['cmd'] == 'create_kvs': new_kvsname = kvsname_template + str(kvs_next_id) exec('%s = {}' % (new_kvsname)) kvs_next_id += 1 pmiMsgToSend = 'cmd=newkvs kvsname=%s\n' % (new_kvsname) mpd_send_one_line(pmiSocket,pmiMsgToSend) elif parsedMsg['cmd'] == 'put': kvsname = parsedMsg['kvsname'] key = parsedMsg['key'] value = parsedMsg['value'] cmd = kvsname + '["' + key + '"] = "' + value + '"' try: exec(cmd) pmiMsgToSend = 'cmd=put_result rc=0\n' mpd_send_one_line(pmiSocket,pmiMsgToSend) except Exception, errmsg: pmiMsgToSend = 'cmd=put_result rc=-1 msg="%s"\n' % errmsg mpd_send_one_line(pmiSocket,pmiMsgToSend) elif parsedMsg['cmd'] == 'barrier_in': pmiBarrierInRecvd = 1 if myRank == 0 or holdingPMIBarrierLoop1: msgToSend = { 'cmd' : 'pmi_barrier_loop_1' } mpd_send_one_msg(rhsSocket,msgToSend) if myRank == 0 and spawned: ## NOTE: a non-MPI job might not call the pmi barrier code; ## this may cause the pgm to hang if it does spawns exec('default_kvs = %s' % default_kvsname) default_kvs['rmb_test'] = 'magpie' # just for testing msgToSend = { 'cmd' : 'spawned_childs_kvs', 'kvsname' : default_kvsname, 'kvs' : default_kvs } mpd_send_one_msg(conSocket,msgToSend) msg = mpd_recv_one_msg(conSocket) exec('%s = %s' % (msg['kvsname'],msg['kvs'])) # get parentkvs elif parsedMsg['cmd'] == 'get': kvsname = parsedMsg['kvsname'] key = parsedMsg['key'] cmd = 'value = ' + kvsname + '["' + key + '"]' try: exec(cmd) gotit = 1 except Exception, errmsg: gotit = 0 if gotit: pmiMsgToSend = 'cmd=get_result rc=0 value=%s\n' % (value) mpd_send_one_line(pmiSocket,pmiMsgToSend) else: msgToSend = { 'cmd' : 'pmi_get', 'key' : key, 'kvsname' : kvsname, 'from_rank' : myRank } mpd_send_one_msg(rhsSocket,msgToSend) elif parsedMsg['cmd'] == 'spawn': mpdSocket = mpd_get_inet_socket_and_connect('localhost',mpdPort) msgToSend = { 'cmd' : 'manager_needs_help', 'host' : myHost, 'port' : myPort } mpd_send_one_msg(mpdSocket,msgToSend) msg = mpd_recv_one_msg(mpdSocket) if (not msg.has_key('cmd')) or \ (msg['cmd'] != 'challenge') or (not msg.has_key('randnum')): mpd_raise('%s: failed to recv challenge from rhs; msg=:%s:' \ % (myId,msg) ) response = new(''.join([mpdConfPasswd,msg['randnum']])).digest() msgToSend = { 'cmd' : 'challenge_response', 'response' : response, 'host' : myHost, 'port' : myPort } mpd_send_one_msg(mpdSocket,msgToSend) msg = mpd_recv_one_msg(mpdSocket) if (not msg.has_key('cmd')) or \ (msg['cmd'] != 'OK_to_send_requests'): mpd_raise('%s: NOT OK to send requests to mpd; msg=:%s:' \ % (myId,msg) ) nprocs = int(parsedMsg['nprocs']) hosts = { (0,nprocs-1) : '_any_' } execs = { (0,nprocs-1) : parsedMsg['execname'] } users = { (0,nprocs-1) : mpd_get_my_username() } cwds = { (0,nprocs-1) : environ['MPDMAN_CWD'] } paths = { (0,nprocs-1) : '' } args = { (0,nprocs-1) : parsedMsg['arg'] } # fix later to handle # more than 1 arg envvars = { (0,nprocs-1) : '' } msgToSend = { 'cmd' : 'spawn', 'conhost' : gethostname(), 'conport' : myPort, 'spawned' : 1, 'nstarted' : 0, 'nprocs' : nprocs, 'hosts' : hosts, 'execs' : execs, 'users' : users, 'cwds' : cwds, 'paths' : paths, 'args' : args, 'envvars' : envvars } numWithIO += 2 # this proc may produce stdout and stderr mpd_send_one_msg(mpdSocket,msgToSend) # we send a result back to client after exchging kvs with spawnee mpdSocket.close() mpdSocket = 0 elif parsedMsg['cmd'] == 'finalize': pmiCollectiveJob = 0 else: mpd_print(1, "unrecognized pmi msg :%s:" % line ) elif readySocket == conSocket: msg = mpd_recv_one_msg(conSocket) if not msg: del socketsToSelect[conSocket] conSocket.close() conSocket = 0 elif msg['cmd'] == 'signal': if msg['signo'] == 'SIGINT': mpd_send_one_msg(rhsSocket,msg) rhsSocket.close() kill(0,SIGKILL) # pid 0 -> all in my process group _exit(0) elif msg['signo'] == 'SIGTSTP': msg['dest'] = myId mpd_send_one_msg(rhsSocket,msg) kill(clientPid,SIGTSTP) elif msg['signo'] == 'SIGCONT': msg['dest'] = myId mpd_send_one_msg(rhsSocket,msg) kill(clientPid,SIGCONT) else: mpd_print(1, 'unexpected msg recvd on conSocket :%s:' % msg ) elif readySocket == mpdSocket: mpd_print(1, 'unexpected msg recvd on mpdSocket :%s:' % msg ) else: mpd_print(1, 'recvd msg on unknown socket :%s:' % readySocket ) mpd_print(0000, "out of loop") if myRank == 0: if conSocket: # may race with spawner to exit msgToSend = { 'cmd' : 'job_terminated', 'jobid' : jobid } mpd_send_one_msg(conSocket,msgToSend) conSocket.close() # may want to want to wait for waitPids heredef parse_pmi_msg(msg): parsed_msg = {} sm = findall(r'\S+',msg) for e in sm: se = e.split('=') parsed_msg[se[0]] = se[1] return parsed_msgdef sigusr1_handler(signum,frame): global get_sigtype_from_mpd get_sigtype_from_mpd = 1if __name__ == '__main__': if not environ.has_key('MPDMAN_CLI_PGM'): # assume invoked from keyboard print 'mpdman for mpd version: %s' % str(mpd_version) print 'mpdman does NOT run as a console program; should be execd by mpd' exit(-1) try: mpdman() except mpdError, errmsg: print 'mpdman failed; cause: %s' % (errmsg) ## pass
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -