📄 mpdman.py
字号:
'jobid' : jobid } mpd_send_one_msg(mpdSocket,msgToSend) msg = mpd_recv_one_msg(mpdSocket) if msg['sigtype'].isdigit(): signum = int(msg['sigtype']) else: import signal as tmpimp # just to get valid SIG's exec('signum = %s' % 'tmpimp.SIG' + msg['sigtype']) try: kill(clientPid,signum) except Exception, errmsg: mpd_print(1, 'invalid signal %d' % (signum) ) mpdSocket.close() mpdSocket = 0 get_sigtype_from_mpd = 0 try: (inReadySockets,None,None) = select(socketsToSelect.keys(),[],[],30) except error, errmsg: if isinstance(errmsg,Exception) and errmsg[0] == 4: # interrupted system call continue else: mpd_raise('%s: select failed: errmsg=:%s:' % (myId,errmsg) ) for readySocket in inReadySockets: if readySocket not in socketsToSelect.keys(): continue if readySocket == listenSocket: (tempSocket,tempConnAddr) = listenSocket.accept() msg = mpd_recv_one_msg(tempSocket) if msg and msg.has_key('cmd'): if msg['cmd'] == 'child_in_stdout_tree': socketsToSelect[tempSocket] = 1 childrenStdoutTreeSockets.append(tempSocket) elif msg['cmd'] == 'child_in_stderr_tree': socketsToSelect[tempSocket] = 1 childrenStderrTreeSockets.append(tempSocket) elif msg['cmd'] == 'spawned_child': socketsToSelect[tempSocket] = 1 spawnedChildSockets.append(tempSocket) else: mpd_print(1, 'unknown msg recvd on listenSocket :%s:' % (msg) ) elif readySocket == lhsSocket: msg = mpd_recv_one_msg(lhsSocket) if not msg: mpd_print(1, 'lhs died' ) del socketsToSelect[lhsSocket] lhsSocket.close() elif msg['cmd'] == 'jobgo': if myRank == 0: msgToSend = { 'cmd' : 'job_started', 'jobid' : jobid } mpd_send_one_msg(conSocket,msgToSend) else: mpd_send_one_msg(rhsSocket,msg) # forward it on write(pipe_man_end,'go') close(pipe_man_end) elif msg['cmd'] == 'info_for_parent_in_tree': if int(msg['to_rank']) == myRank: parentHost = msg['parent_host'] parentPort = msg['parent_port'] parentStdoutSocket = \ mpd_get_inet_socket_and_connect(parentHost,parentPort) msgToSend = { 'cmd' : 'child_in_stdout_tree', 'from_rank' : myRank } mpd_send_one_msg(parentStdoutSocket,msgToSend) parentStderrSocket = \ mpd_get_inet_socket_and_connect(parentHost,parentPort) msgToSend = { 'cmd' : 'child_in_stderr_tree', 'from_rank' : myRank } mpd_send_one_msg(parentStderrSocket,msgToSend) else: mpd_send_one_msg(rhsSocket,msg) elif msg['cmd'] == 'end_barrier_loop_1': if myRank == 0: msgToSend = { 'cmd' : 'end_barrier_loop_2' } mpd_send_one_msg(rhsSocket,msgToSend) else: if numDone >= numWithIO: mpd_send_one_msg(rhsSocket,msg) else: holdingEndBarrierLoop1 = 1 elif msg['cmd'] == 'end_barrier_loop_2': endBarrierDone = 1 if myRank != 0: mpd_send_one_msg(rhsSocket,msg) elif msg['cmd'] == 'pmi_barrier_loop_1': if myRank == 0: msgToSend = { 'cmd' : 'pmi_barrier_loop_2' } mpd_send_one_msg(rhsSocket,msgToSend) pmiMsgToSend = 'cmd=barrier_out\n' mpd_send_one_line(pmiSocket,pmiMsgToSend) else: holdingPMIBarrierLoop1 = 1 if pmiBarrierInRecvd: mpd_send_one_msg(rhsSocket,msg) elif msg['cmd'] == 'pmi_barrier_loop_2': pmiBarrierInRecvd = 0 holdingPMIBarrierLoop1 = 0 if myRank != 0: pmiMsgToSend = 'cmd=barrier_out\n' mpd_send_one_line(pmiSocket,pmiMsgToSend) mpd_send_one_msg(rhsSocket,msg) elif msg['cmd'] == 'pmi_get': if msg['from_rank'] == myRank: pmiMsgToSend = 'cmd=get_result rc=-1 msg="%s"\n' % msg['key'] mpd_send_one_line(pmiSocket,pmiMsgToSend) else: kvsname = msg['kvsname'] key = msg['key'] cmd = 'value = ' + kvsname + '["' + key + '"]' try: exec(cmd) gotit = 1 except Exception, errmsg: gotit = 0 if gotit: msgToSend = { 'cmd' : 'pmi_get_response', 'value' : value, 'to_rank' : msg['from_rank'] } mpd_send_one_msg(rhsSocket,msgToSend) else: mpd_send_one_msg(rhsSocket,msg) elif msg['cmd'] == 'pmi_get_response': if msg['to_rank'] == myRank: pmiMsgToSend = 'cmd=get_result rc=0 value=%s\n' % (msg['value']) mpd_send_one_line(pmiSocket,pmiMsgToSend) else: mpd_send_one_msg(rhsSocket,msg) elif msg['cmd'] == 'signal': if msg['signo'] == 'SIGINT': if conSocket: msgToSend = { 'cmd' : 'job_terminated_early', 'jobid' : jobid, 'id' : myId } mpd_send_one_msg(conSocket,msgToSend) conSocket.close() if rhsSocket in socketsToSelect.keys(): # still alive ? mpd_send_one_msg(rhsSocket,msg) rhsSocket.close() kill(0,SIGKILL) # pid 0 -> all in my process group _exit(0) elif msg['signo'] == 'SIGTSTP': if msg['dest'] != myId: mpd_send_one_msg(rhsSocket,msg) kill(clientPid,SIGTSTP) elif msg['signo'] == 'SIGCONT': if msg['dest'] != myId: mpd_send_one_msg(rhsSocket,msg) kill(clientPid,SIGCONT) elif msg['cmd'] == 'client_exit_status': if myRank == 0: mpd_send_one_msg(conSocket,msg) else: mpd_send_one_msg(rhsSocket,msg) elif msg['cmd'] == 'collective_abort': if msg['src'] != myId: if conSocket: msgToSend = { 'cmd' : 'job_terminated_early', 'jobid' : jobid, 'rank' : msg['rank'] } mpd_send_one_msg(conSocket,msgToSend) # conSocket.close() if rhsSocket in socketsToSelect.keys(): # still alive ? mpd_send_one_msg(rhsSocket,msg) # rhsSocket.close() try: kill(clientPid,SIGKILL) except: pass # client may already be gone else: mpd_print(1, 'unexpected msg recvd on lhsSocket :%s:' % msg ) elif readySocket == rhsSocket: msg = mpd_recv_one_msg(rhsSocket) mpd_print(0000, 'rhs died' ) del socketsToSelect[rhsSocket] rhsSocket.close() elif readySocket == clientStdoutFD: line = read(clientStdoutFD,1024) # line = clientStdoutFile.readline() if not line: del socketsToSelect[clientStdoutFD] close(clientStdoutFD) numDone += 1 if numDone >= numWithIO: if parentStdoutSocket: parentStdoutSocket.close() parentStdoutSocket = 0 if parentStderrSocket: parentStderrSocket.close() parentStderrSocket = 0 if myRank == 0 or holdingEndBarrierLoop1: holdingEndBarrierLoop1 = 0 msgToSend = {'cmd' : 'end_barrier_loop_1'} mpd_send_one_msg(rhsSocket,msgToSend) else: if parentStdoutSocket: if lineLabels: splitLine = line.split('\n',1024) if startStdoutLineLabel: line = myLineLabel else: line = '' if splitLine[-1] == '': startStdoutLineLabel = 1 del splitLine[-1] else: startStdoutLineLabel = 0 for s in splitLine[0:-1]: line = line + s + '\n' + myLineLabel line = line + splitLine[-1] if startStdoutLineLabel: line = line + '\n' mpd_send_one_line(parentStdoutSocket,line) # parentStdoutSocket.sendall('STDOUT by %d: |%s|' % (myRank,line) ) elif readySocket == clientStderrFD: line = read(clientStderrFD,1024) # line = clientStderrFile.readline() if not line: del socketsToSelect[clientStderrFD] close(clientStderrFD) numDone += 1 if numDone >= numWithIO: if parentStdoutSocket: parentStdoutSocket.close() parentStdoutSocket = 0 if parentStderrSocket: parentStderrSocket.close() parentStderrSocket = 0 if myRank == 0 or holdingEndBarrierLoop1: holdingEndBarrierLoop1 = 0 msgToSend = {'cmd' : 'end_barrier_loop_1'} mpd_send_one_msg(rhsSocket,msgToSend) else: if parentStderrSocket: if lineLabels: splitLine = line.split('\n',1024) if startStderrLineLabel: line = myLineLabel else: line = '' if splitLine[-1] == '': startStderrLineLabel = 1 del splitLine[-1] else: startStderrLineLabel = 0 for s in splitLine[0:-1]: line = line + s + '\n' + myLineLabel line = line + splitLine[-1] if startStderrLineLabel: line = line + '\n' mpd_send_one_line(parentStderrSocket,line) elif readySocket in childrenStdoutTreeSockets: line = readySocket.recv(1024) if not line: del socketsToSelect[readySocket] readySocket.close() numDone += 1 if numDone >= numWithIO: if parentStdoutSocket: parentStdoutSocket.close() parentStdoutSocket = 0 if parentStderrSocket:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -