📄 mpdlib.py
字号:
except Exception, errinfo: print '%s: failure doing accept : %s : %s' % \ (mpd_my_id,errinfo.__class__,errinfo) break if newsock: newsock = MPDSock(sock=newsock,name=name) # turn new socket into an MPDSock return (newsock,newaddr) def recv(self,nbytes): global mpd_signum data = 0 while 1: try: mpd_signum = 0 data = self.sock.recv(nbytes) break except socket.error, errinfo: if errinfo[0] == EINTR: # sigchld, sigint, etc. if mpd_signum == signal.SIGINT or mpd_signum == signal.SIGALRM: break else: continue elif errinfo[0] == ECONNRESET: # connection reset (treat as eof) break else: print '%s: recv error: %s' % (mpd_my_id,os.strerror(errinfo[0])) break except Exception, errinfo: print '%s: failure doing recv %s :%s:' % \ (mpd_my_id,errinfo.__class__,errinfo) break return data def recv_dict_msg(self,timeout=None): global mpd_signum msg = {} readyToRecv = 0 if timeout: try: mpd_signum = 0 (readyToRecv,unused1,unused2) = select.select([self.sock],[],[],timeout) except select.error, errinfo: if errinfo[0] == EINTR: if mpd_signum == signal.SIGINT or mpd_signum == signal.SIGALRM: pass # assume timedout; returns {} below else: print '%s: select error: %s' % (mpd_my_id,os.strerror(errinfo[0])) except KeyboardInterrupt, errinfo: # print 'recv_dict_msg: keyboard interrupt during select' return msg except Exception, errinfo: print 'recv_dict_msg: exception during select %s :%s:' % \ ( errinfo.__class__, errinfo) return msg else: readyToRecv = 1 if readyToRecv: try: pickledLen = self.sock.recv(8) if pickledLen: pickledLen = int(pickledLen) pickledMsg = '' lenLeft = pickledLen while lenLeft: recvdMsg = self.sock.recv(lenLeft) pickledMsg += recvdMsg lenLeft -= len(recvdMsg) msg = loads(pickledMsg) except socket.error, errinfo: if errinfo[0] == EINTR: return msg elif errinfo[0] == ECONNRESET: # connection reset (treat as eof) pass # socket.error: (104, 'Connection reset by peer') else: mpd_print_tb(1,'recv_dict_msg: socket error: errinfo=:%s:' % (errinfo)) except StandardError, errmsg: # any built-in exceptions mpd_print_tb(1, 'recv_dict_msg: errmsg=:%s:' % (errmsg) ) except Exception, errmsg: mpd_print_tb(1, 'recv_dict_msg failed on sock %s errmsg=:%s:' % \ (self.name,errmsg) ) return msg def recv_char_msg(self): return self.recv_one_line() # use leading len later def recv_one_line(self): msg = '' try: c = self.sock.recv(1) except socket.error, errinfo: if errinfo[0] == EINTR: # sigchld, sigint, etc. return msg elif errinfo[0] == ECONNRESET: # connection reset (treat as eof) return msg else: print '%s: recv error: %s' % (mpd_my_id,os.strerror(errinfo[0])) sys.exit(-1) except Exception, errmsg: c = '' msg = '' mpd_print_tb(1, 'recv_char_msg: errmsg=:%s:' % (errmsg) ) if c: while c != '\n': msg += c try: c = self.sock.recv(1) except socket.error, errinfo: if errinfo[0] == EINTR: # sigchld, sigint, etc. return msg elif errinfo[0] == ECONNRESET: # connection reset (treat as eof) return msg else: print '%s: recv error: %s' % (mpd_my_id,os.strerror(errinfo[0])) sys.exit(-1) except Exception, errmsg: c = '' msg = '' mpd_print_tb(1, 'recv_char_msg: errmsg=:%s:' % (errmsg) ) break msg += c return msg def send_dict_msg(self,msg,errprint=1): pickledMsg = dumps(msg) try: self.sendall( "%08d%s" % (len(pickledMsg),pickledMsg) ) except Exception, errmsg: if errprint: mpd_print_tb(1, 'send_dict_msg: sock=%s errmsg=:%s:' % (self.name,errmsg) ) def send_char_msg(self,msg,errprint=1): try: self.sock.sendall(msg) except Exception, errmsg: if errprint: mpd_print_tb(1, 'send_char_msg: sock=%s errmsg=:%s:' % (self.name,errmsg) )class MPDListenSock(MPDSock): def __init__(self,host='',port=0,filename='',listen=5,name='listener',**kargs): MPDSock.__init__(self,name=name,**kargs) self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) if filename: self.sock.bind(filename) else: self.sock.bind((host,port)) self.sock.listen(listen)class MPDStreamHandler(object): def __init__(self): self.activeStreams = {} def set_handler(self,stream,handler,args=()): self.activeStreams[stream] = (handler,args) def del_handler(self,stream): if self.activeStreams.has_key(stream): del self.activeStreams[stream] def close_all_active_streams(self): for stream in self.activeStreams.keys(): del self.activeStreams[stream] stream.close() def handle_active_streams(self,streams=None,timeout=0.1): global mpd_signum while 1: if streams: streamsToSelect = streams else: streamsToSelect = self.activeStreams.keys() readyStreams = [] try: mpd_signum = 0 (readyStreams,u1,u2) = select.select(streamsToSelect,[],[],timeout) break except select.error, errinfo: if errinfo[0] == EINTR: if mpd_signum == signal.SIGINT or mpd_signum == signal.SIGALRM: break else: continue else: print '%s: handle_active_streams: select error: %s' % \ (mpd_my_id,os.strerror(errinfo[0])) return (-1,os.strerror(errinfo[0])) except KeyboardInterrupt, errinfo: # print 'handle_active_streams: keyboard interrupt during select' return (-1,errinfo.__class__,errinfo) except Exception, errinfo: print 'handle_active_streams: exception during select %s :%s:' % \ ( errinfo.__class__, errinfo) return (-1,errinfo.__class__,errinfo) for stream in readyStreams: if self.activeStreams.has_key(stream): (handler,args) = self.activeStreams[stream] handler(stream,*args) else: # this is not nec bad; an active stream (handler) may # have been deleted by earlier handler in this loop print '*** OOPS, unknown stream in handle_active_streams' return (len(readyStreams),0) # len >= 0class MPDRing(object): def __init__(self,listenSock=None,streamHandler=None,secretword='', myIfhn='',entryIfhn='',entryPort=0): if not streamHandler: mpd_print(1, "must supply handler for new conns in ring") sys.exit(-1) if not listenSock: mpd_print(1, "must supply listenSock for new ring") sys.exit(-1) if not myIfhn: mpd_print(1, "must supply myIfhn for new ring") sys.exit(-1) self.secretword = secretword self.myIfhn = myIfhn self.generation = 0 self.listenSock = listenSock self.listenPort = self.listenSock.sock.getsockname()[1] self.streamHandler = streamHandler self.streamHandler.set_handler(self.listenSock,self.handle_ring_listener_connection) self.entryIfhn = entryIfhn self.entryPort = entryPort self.lhsIfhn = '' self.lhsPort = 0 self.rhsIfhn = '' self.rhsPort = 0 self.lhsSock = 0 self.rhsSock = 0 self.lhsHandler = None self.rhsHandler = None def create_single_mem_ring(self,ifhn='',port=0,lhsHandler=None,rhsHandler=None): self.lhsSock,self.rhsSock = mpd_sockpair() self.lhsIfhn = ifhn self.lhsPort = port self.rhsIfhn = ifhn self.rhsPort = port self.lhsHandler = lhsHandler self.streamHandler.set_handler(self.lhsSock,lhsHandler) self.rhsHandler = rhsHandler self.streamHandler.set_handler(self.rhsSock,rhsHandler) def reenter_ring(self,entryIfhn='',entryPort=0,lhsHandler='',rhsHandler='',ntries=5): rc = -1 numTries = 0 self.generation += 1 while rc < 0 and numTries < ntries: numTries += 1 rc = self.enter_ring(entryIfhn=entryIfhn,entryPort=entryPort, lhsHandler=lhsHandler,rhsHandler=rhsHandler, ntries=1) sleepTime = random() * 1.5 # a single random is between 0 and 1 sleep(sleepTime) mpd_print(1,'reenter_ring rc=%d after numTries=%d' % (rc,numTries) ) return rc def enter_ring(self,entryIfhn='',entryPort=0,lhsHandler='',rhsHandler='',ntries=1): if not lhsHandler or not rhsHandler: print 'missing handler for enter_ring' sys.exit(-1) if not entryIfhn: entryIfhn = self.entryIfhn if not entryPort: entryPort = self.entryPort if not entryIfhn: self.create_single_mem_ring(ifhn=self.myIfhn, port=self.listenPort, lhsHandler=lhsHandler, rhsHandler=rhsHandler) else: rv = self.connect_lhs(lhsIfhn=entryIfhn, lhsPort=entryPort, lhsHandler=lhsHandler, numTries=ntries) if rv[0] <= 0: # connect failed with problem mpd_print(1,"lhs connect failed") return -1 if rv[1]: # rhsifhn and rhsport rhsIfhn = rv[1][0] rhsPort = rv[1][1] else: mpd_print(1,"did not recv rhs host&port from lhs") return -1 rv = self.connect_rhs(rhsIfhn=rhsIfhn, rhsPort=rhsPort, rhsHandler=rhsHandler, numTries=ntries) if rv[0] <= 0: # connect did not succeed; may try again mpd_print(1,"rhs connect failed") return -1 return 0 def connect_lhs(self,lhsIfhn='',lhsPort=0,lhsHandler=None,numTries=1): if not lhsHandler: mpd_print(1, "must supply handler for lhs in ring") return (-1,None) if not lhsIfhn: mpd_print(1, "must supply host for lhs in ring") return (-1,None) self.lhsIfhn = lhsIfhn if not lhsPort: mpd_print(1, "must supply port for lhs in ring") return (-1,None) self.lhsPort = lhsPort numConnTries = 0 while numConnTries < numTries: numConnTries += 1 self.lhsSock = MPDSock(name='lhs') try: self.lhsSock.connect((self.lhsIfhn,self.lhsPort)) except socket.error, errinfo: print '%s: conn error in connect_lhs: %s' % \ (mpd_my_id,os.strerror(errinfo[0])) self.lhsSock.close() self.lhsSock = 0
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -