📄 mpdlib.py
字号:
msg += c return msg # The default behavior on an error needs to be to handle and/or report # it. Otherwise, we all waste time trying to figure out why # the code is silently failing. I've set the default for errprint # to YES rather than NO. def send_dict_msg(self,msg,errprint=1): pickledMsg = dumps(msg) # FIXME: Does this automatically handle EINTR, or does it need an # except os.error, errinfo: and check on errinfo[0] == EINTR try: while 1: try: self.sendall( "%08d%s" % (len(pickledMsg),pickledMsg) ) break except socket.error, errmsg: if errmsg[0] == EPIPE \ or errmsg[0] == ECONNRESET \ or errmsg[0] == EINTR: # silent failure on pipe failure, as we usually # just want to discard messages in this case # (We need to plan error handling more thoroughly) break ## RMB: chgd from pass else: raise socket.error, errmsg # end of While except Exception, errmsg: mpd_print_tb(errprint,'send_dict_msg: sock=%s errmsg=:%s:' % (self.name,errmsg)) def send_char_msg(self,msg,errprint=1): try: while 1: try: self.sock.sendall(msg) break except socket.error, errmsg: if errmsg[0] == EPIPE: # silent failure on pipe failure, as we usually # just want to discard messages in this case # (We need to plan error handling more thoroughly) pass if errmsg[0] != EINTR: raise socket.error, errmsg # end of While except Exception, errmsg: mpd_print_tb(errprint,'send_char_msg: sock=%s errmsg=:%s:' % (self.name,errmsg))class MPDListenSock(MPDSock): def __init__(self,host='',port=0,filename='',listen=5,name='listener',**kargs): MPDSock.__init__(self,name=name,**kargs) self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) if filename: self.sock.bind(filename) self.sock.listen(listen) return # see if we have a PORT_RANGE environment variable try: port_range = os.environ['MPIEXEC_PORT_RANGE'] (low_port, high_port) = map(int, port_range.split(':')) except: try: port_range = os.environ['MPICH_PORT_RANGE'] (low_port, high_port) = map(int, port_range.split(':')) except: (low_port,high_port) = (0,0) if low_port < 0 or high_port < low_port: (low_port,high_port) = (0,0) if low_port != 0 and high_port != 0: if port == 0: port = low_port while 1: try: self.sock.bind((host,port)) self.sock.listen(listen) break except socket.error, e: port += 1 if port <= high_port: self.sock.close() MPDSock.__init__(self,name=name,**kargs) self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) continue else: mpd_print_tb(1,'** no free ports in MPICH_PORT_RANGE') sys.exit(-1) else: # else use the explicitly specified port if port < low_port or port > high_port: mpd_print_tb(1,'** port %d is outside MPICH_PORT_RANGE' % port) sys.exit(-1) self.sock.bind((host,port)) # go ahead and bind self.sock.listen(listen) else: self.sock.bind((host,port)) # no port range set, so just bind as usual self.sock.listen(listen)class MPDStreamHandler(object): def __init__(self): self.activeStreams = {} def set_handler(self,stream,handler,args=()): self.activeStreams[stream] = (handler,args) def del_handler(self,stream): if self.activeStreams.has_key(stream): del self.activeStreams[stream] def close_all_active_streams(self): for stream in self.activeStreams.keys(): del self.activeStreams[stream] stream.close() def handle_active_streams(self,streams=None,timeout=0.1): global mpd_signum while 1: if streams: streamsToSelect = streams else: streamsToSelect = self.activeStreams.keys() readyStreams = [] try: mpd_signum = 0 (readyStreams,u1,u2) = select.select(streamsToSelect,[],[],timeout) break except select.error, errinfo: if errinfo[0] == EINTR: if mpd_signum == signal.SIGCHLD: break if mpd_signum == signal.SIGINT or mpd_signum == signal.SIGALRM: break else: continue else: print '%s: handle_active_streams: select error: %s' % \ (mpd_my_id,os.strerror(errinfo[0])) return (-1,os.strerror(errinfo[0])) except KeyboardInterrupt, errinfo: # print 'handle_active_streams: keyboard interrupt during select' return (-1,errinfo.__class__,errinfo) except Exception, errinfo: print 'handle_active_streams: exception during select %s :%s:' % \ ( errinfo.__class__, errinfo) return (-1,errinfo.__class__,errinfo) for stream in readyStreams: if self.activeStreams.has_key(stream): (handler,args) = self.activeStreams[stream] handler(stream,*args) else: # this is not nec bad; an active stream (handler) may # have been deleted by earlier handler in this loop print '*** OOPS, unknown stream in handle_active_streams' return (len(readyStreams),0) # len >= 0class MPDRing(object): def __init__(self,listenSock=None,streamHandler=None,secretword='', myIfhn='',entryIfhn='',entryPort=0,zcMyLevel=0): if not streamHandler: mpd_print(1, "must supply handler for new conns in ring") sys.exit(-1) if not listenSock: mpd_print(1, "must supply listenSock for new ring") sys.exit(-1) if not myIfhn: mpd_print(1, "must supply myIfhn for new ring") sys.exit(-1) self.secretword = secretword self.myIfhn = myIfhn self.generation = 0 self.listenSock = listenSock self.listenPort = self.listenSock.sock.getsockname()[1] self.streamHandler = streamHandler self.streamHandler.set_handler(self.listenSock,self.handle_ring_listener_connection) self.entryIfhn = entryIfhn self.entryPort = entryPort self.lhsIfhn = '' self.lhsPort = 0 self.rhsIfhn = '' self.rhsPort = 0 self.lhsSock = 0 self.rhsSock = 0 self.lhsHandler = None self.rhsHandler = None self.zcMyLevel = zcMyLevel if self.zcMyLevel: mpd_init_zc(self.myIfhn,self.zcMyLevel) def create_single_mem_ring(self,ifhn='',port=0,lhsHandler=None,rhsHandler=None): self.lhsSock,self.rhsSock = mpd_sockpair() self.lhsIfhn = ifhn self.lhsPort = port self.rhsIfhn = ifhn self.rhsPort = port self.lhsHandler = lhsHandler self.streamHandler.set_handler(self.lhsSock,lhsHandler) self.rhsHandler = rhsHandler self.streamHandler.set_handler(self.rhsSock,rhsHandler) def reenter_ring(self,entryIfhn='',entryPort=0,lhsHandler='',rhsHandler='',ntries=5): if mpd_zc: mpd_close_zc() mpd_init_zc(self.myIfhn,self.zcMyLevel) rc = -1 numTries = 0 self.generation += 1 while rc < 0 and numTries < ntries: numTries += 1 rc = self.enter_ring(entryIfhn=entryIfhn,entryPort=entryPort, lhsHandler=lhsHandler,rhsHandler=rhsHandler, ntries=1) sleepTime = random() * 1.5 # a single random is between 0 and 1 sleep(sleepTime) mpd_print(1,'reenter_ring rc=%d after numTries=%d' % (rc,numTries) ) return rc def enter_ring(self,entryIfhn='',entryPort=0,lhsHandler='',rhsHandler='',ntries=1): if not lhsHandler or not rhsHandler: print 'missing handler for enter_ring' sys.exit(-1) if not entryIfhn: entryIfhn = self.entryIfhn if not entryPort: entryPort = self.entryPort if not entryIfhn and mpd_zc: if self.zcMyLevel == 1: (entryHost,entryPort) = ('',0) else: (entryIfhn,entryPort) = mpd_find_zc_peer(self.zcMyLevel-1) if not entryPort: print "FAILED TO FIND A PEER AT LEVEL", self.zcMyLevel-1 sys.exit(-1) print "ENTRY INFO", (entryIfhn,entryPort) if not entryIfhn: self.create_single_mem_ring(ifhn=self.myIfhn, port=self.listenPort, lhsHandler=lhsHandler, rhsHandler=rhsHandler) else: rv = self.connect_lhs(lhsIfhn=entryIfhn, lhsPort=entryPort, lhsHandler=lhsHandler, numTries=ntries) if rv[0] <= 0: # connect failed with problem mpd_print(1,"lhs connect failed") return -1 if rv[1]: # rhsifhn and rhsport rhsIfhn = rv[1][0] rhsPort = rv[1][1] else: mpd_print(1,"did not recv rhs host&port from lhs") return -1 rv = self.connect_rhs(rhsIfhn=rhsIfhn, rhsPort=rhsPort, rhsHandler=rhsHandler, numTries=ntries) if rv[0] <= 0: # connect did not succeed; may try again mpd_print(1,"rhs connect failed") return -1 if mpd_zc: mpd_register_zc(self.myIfhn,self.zcMyLevel) return 0 def connect_lhs(self,lhsIfhn='',lhsPort=0,lhsHandler=None,numTries=1): if not lhsHandler: mpd_print(1, "must supply handler for lhs in ring") return (-1,None) if not lhsIfhn: mpd_print(1, "must supply host for lhs in ring") return (-1,None) self.lhsIfhn = lhsIfhn if not lhsPort: mpd_print(1, "must supply port for lhs in ring") return (-1,None) self.lhsPort = lhsPort numConnTries = 0 while numConnTries < numTries: numConnTries += 1 self.lhsSock = MPDSock(name='lhs') try: self.lhsSock.connect((self.lhsIfhn,self.lhsPort)) except socket.error, errinfo: print '%s: conn error in connect_lhs: %s' % \ (mpd_my_id,os.strerror(errinfo[0])) self.lhsSock.close() self.lhsSock = 0 sleep(random()) continue break if not self.lhsSock or numConnTries > numTries: mpd_print(1,'failed to connect to lhs at %s %d' % (self.lhsIfhn,self.lhsPort)) return (0,None) msgToSend = { 'cmd' : 'request_to_enter_as_rhs', 'ifhn' : self.myIfhn, 'port' : self.listenPort, 'mpd_version' : mpd_version() } self.lhsSock.send_dict_msg(msgToSend) msg = self.lhsSock.recv_dict_msg() if (not msg) \ or (not msg.has_key('cmd')) \ or (not msg['cmd'] == 'challenge') \ or (not msg.has_key('randnum')) \ or (not msg.has_key('generation')): mpd_print(1,'invalid challenge from %s %d: %s' % \ (self.lhsIfhn,self.lhsPort,msg) ) return (-1,None) if msg['generation'] < self.generation: mpd_print(1,'bad generation from lhs; lhsgen=%d mygen=%d' % (msg['generation'],self.generation)) return(-1,'bad_generation') # RMB: try again here later response = md5new(''.join([self.secretword,msg['randnum']])).digest() msgToSend = { 'cmd' : 'challenge_response', 'response' : response, 'ifhn' : self.myIfhn, 'port' : self.listenPort } self.lhsSock.send_dict_msg(msgToSend) msg = self.lhsSock.recv_dict_msg() if (not msg) \ or (not msg.has_key('cmd')) \ or (not msg['cmd'] == 'OK_to_enter_as_rhs'): mpd_print(1,'NOT OK to enter ring; one likely cause: mismatched secretwords') return (-1,None) self.lhsHandler = lhsHandler self.streamHandler.set_handler(self.lhsSock,lhsHandler) if msg.has_key('rhsifhn') and msg.has_key('rhsport'):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -