📄 mpdstdcp.py
字号:
#!/usr/bin/env pythonimport os, re, socket, selectfrom mpdlib import *def mpdstdcp(): if os.environ.has_key('MPDCP_AM_MSHIP'): mpd_set_my_id(socket.gethostname() + '_copgm_mship_' + `os.getpid()`) mpd_print(0000, "BECOMING mothership" ) mshipPort = int(os.environ['MPDCP_MSHIP_PORT']) mshipFD = int(os.environ['MPDCP_MSHIP_FD']) mshipNProcs = int(os.environ['MPDCP_MSHIP_NPROCS']) mshipSocket = socket.fromfd(mshipFD,socket.AF_INET,socket.SOCK_STREAM) numDone = 0 socketsToSelect = { mshipSocket : 1 } while numDone < 1: # just one satellite connected to me (readySockets,None,None) = select.select(socketsToSelect.keys(),[],[],30) for readySocket in readySockets: if readySocket == mshipSocket: (newSocket,newConnAddr) = mshipSocket.accept() socketsToSelect[newSocket] = 1 # print "MS: accepted from ", newConnAddr else: msg = readySocket.recv(1024) if not msg: del socketsToSelect[readySocket] numDone += 1 else: print msg, # print 'MS: %s' % (msg.strip()) # print 'ENDING mothership' else: mpd_set_my_id(socket.gethostname() + '_copgm_satellite_' + `os.getpid()`) socketsToSelect = {} listenFD = int(os.environ['MPDCP_MY_LISTEN_FD']) listenSocket = socket.fromfd(listenFD,socket.AF_INET,socket.SOCK_STREAM) (manSocket,manConnAddr) = listenSocket.accept() socketsToSelect[manSocket] = 1 myRank = int(os.environ['MPDCP_RANK']) mpd_set_my_id(socket.gethostname() + '_copgm_sat_%d_' % (myRank) + `os.getpid()`) if myRank == 0: mshipHost = os.environ['MPDCP_MSHIP_HOST'] mshipPort = int(os.environ['MPDCP_MSHIP_PORT']) mpd_print(0000, "contacting mship on %s %d" % (mshipHost,mshipPort) ) mshipSocket = mpd_get_inet_socket_and_connect(mshipHost,mshipPort) nprocs = int(os.environ['MPDCP_NPROCS']) # setup ring for PMI if myRank == 0: lhsRank = nprocs - 1 else: lhsRank = myRank - 1 msgToSend = { 'cmd' : 'get_copgm_listen_info', 'to_rank' : str(lhsRank), 'from_rank' : str(myRank) } mpd_send_one_msg(manSocket,msgToSend) msg = mpd_recv_one_msg(manSocket) if not msg or (not msg.has_key('cmd')) or (msg['cmd'] != 'copgm_listen_info'): mpd_print(1, 'failed to recv copgm_listen_info msg=:%s:') sys.exit(0) lhsHost = msg['host'] lhsPort = int(msg['port']) lhsSocket = mpd_get_inet_socket_and_connect(lhsHost,lhsPort) socketsToSelect[lhsSocket] = 1 msgToSend = { 'cmd' : 'new_rhs', 'from_rank' : str(myRank) } mpd_send_one_msg(lhsSocket,msgToSend) (rhsSocket,rhsConnAddr) = listenSocket.accept() msg = mpd_recv_one_msg(rhsSocket) if msg and msg.has_key('cmd') and msg['cmd'] == 'new_rhs' \ and msg.has_key('from_rank'): if (int(msg['from_rank']) == (myRank+1)) \ or (int(msg['from_rank']) == 0 and myRank == (nprocs-1)): socketsToSelect[rhsSocket] = 1 # and connect to the client telling it that we are providing # pmi service clientListenPort = int(os.environ['MPDCP_CLI_LISTEN_PORT']) pmiSocket = mpd_get_inet_socket_and_connect('localhost',clientListenPort) pmiFile = os.fdopen(pmiSocket.fileno(),'r') pmiSocket.sendall('cmd=pmi_handler\n') # handshake socketsToSelect[pmiSocket] = 1 default_kvs = {} # setup stdio tree (parent,lchild,rchild) = mpd_get_ranks_in_binary_tree(myRank,nprocs) lchildSocket = 0 rchildSocket = 0 if parent >= 0: msgToSend = { 'cmd' : 'get_copgm_listen_info', 'to_rank' : str(parent), 'from_rank' : str(myRank) } mpd_send_one_msg(manSocket,msgToSend) msg = mpd_recv_one_msg(manSocket) if not msg or (not msg.has_key('cmd')) or (msg['cmd'] != 'copgm_listen_info'): mpd_print(1, 'failed to recv copgm_listen_info msg=:%s:') sys.exit(0) parentHost = msg['host'] parentPort = int(msg['port']) parentSocket = mpd_get_inet_socket_and_connect(parentHost,parentPort) msgToSend = { 'cmd' : 'child_in_tree', 'from_rank' : str(myRank) } mpd_send_one_msg(parentSocket,msgToSend) else: parentSocket = mshipSocket numToDo = 0 numToAccept = len([x for x in [lchild,rchild] if x > 0]) for i in range(numToAccept): (tempSocket,tempConnAddr) = listenSocket.accept() msg = mpd_recv_one_msg(tempSocket) if msg and msg.has_key('cmd') and msg['cmd'] == 'child_in_tree' \ and msg.has_key('from_rank'): if int(msg['from_rank']) == lchild: (lchildSocket,lchildConnAddr) = (tempSocket,tempConnAddr) socketsToSelect[lchildSocket] = 1 lchildFile = os.fdopen(lchildSocket.fileno(),'r') numToDo += 1 elif int(msg['from_rank']) == rchild: (rchildSocket,rchildConnAddr) = (tempSocket,tempConnAddr) socketsToSelect[rchildSocket] = 1 rchildFile = os.fdopen(rchildSocket.fileno(),'r') numToDo += 1 else: mpd_print(1, 'bad rank in child_in_tree msg %s' % msg['from_rank']) else: mpd_print(1, 'invalid msg recvd for child_in_tree :%s:' % msg ) clientStdoutFD = int(os.environ['MPDCP_CLI_STDOUT_FD']) clientStdoutFile = os.fdopen(clientStdoutFD,'r') socketsToSelect[clientStdoutFD] = 1 numToDo += 1 pmiBarrierInRecvd = 0 pmiBarrierLoop1Recvd = 0 endBarrierLoop1Recvd = 0 endBarrierDone = 0 numDone = 0 while not endBarrierDone: (inReadySockets,None,None) = select.select(socketsToSelect.keys(),[],[],30) for readySocket in inReadySockets: if readySocket not in socketsToSelect.keys(): continue if readySocket == clientStdoutFD: line = clientStdoutFile.readline() if not line: del socketsToSelect[clientStdoutFD] os.close(clientStdoutFD) numDone += 1 if numDone >= numToDo: if parentSocket: parentSocket.close() parentSocket = 0 if myRank == 0 or endBarrierLoop1Recvd: endBarrierLoop1Recvd = 0 msgToSend = {'cmd' : 'end_barrier_loop_1'} mpd_send_one_msg(rhsSocket,msgToSend) else: # parentSocket.sendall('STDOUT by %d: |%s|' % (myRank,line) ) parentSocket.sendall(line) elif readySocket == lchildSocket: line = lchildFile.readline() if not line: del socketsToSelect[lchildSocket] lchildSocket.close() numDone += 1 if numDone >= numToDo: if parentSocket: parentSocket.close() parentSocket = 0 if myRank == 0 or endBarrierLoop1Recvd: endBarrierLoop1Recvd = 0 msgToSend = {'cmd' : 'end_barrier_loop_1'}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -