⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpdlib.py

📁 fortran并行计算包
💻 PY
📖 第 1 页 / 共 5 页
字号:
            msg += c        return msg     # The default behavior on an error needs to be to handle and/or report    # it.  Otherwise, we all waste time trying to figure out why     # the code is silently failing.  I've set the default for errprint     # to YES rather than NO.    def send_dict_msg(self,msg,errprint=1):        pickledMsg = dumps(msg)         # FIXME: Does this automatically handle EINTR, or does it need an        # except os.error, errinfo: and check on errinfo[0] == EINTR        try:            while 1:                try:                    self.sendall( "%08d%s" % (len(pickledMsg),pickledMsg) )                    break                except socket.error, errmsg:		    if errmsg[0] == EPIPE  \                    or errmsg[0] == ECONNRESET \                    or errmsg[0] == EINTR:			# silent failure on pipe failure, as we usually                        # just want to discard messages in this case                         # (We need to plan error handling more thoroughly)                        break  ## RMB: chgd from pass                    else:                        raise socket.error, errmsg            # end of While        except Exception, errmsg:            mpd_print_tb(errprint,'send_dict_msg: sock=%s errmsg=:%s:' % (self.name,errmsg))    def send_char_msg(self,msg,errprint=1):        try:            while 1:                try:                    self.sock.sendall(msg)                    break                except socket.error, errmsg:		    if errmsg[0] == EPIPE:			# silent failure on pipe failure, as we usually                        # just want to discard messages in this case                         # (We need to plan error handling more thoroughly)                        pass                    if errmsg[0] != EINTR:                        raise socket.error, errmsg            # end of While        except Exception, errmsg:            mpd_print_tb(errprint,'send_char_msg: sock=%s errmsg=:%s:' % (self.name,errmsg))class MPDListenSock(MPDSock):    def __init__(self,host='',port=0,filename='',listen=5,name='listener',**kargs):        MPDSock.__init__(self,name=name,**kargs)        self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)        if filename:            self.sock.bind(filename)            self.sock.listen(listen)            return        # see if we have a PORT_RANGE environment variable        try:            port_range = os.environ['MPIEXEC_PORT_RANGE']            (low_port, high_port) = map(int, port_range.split(':'))        except:            try:                port_range = os.environ['MPICH_PORT_RANGE']                (low_port, high_port) = map(int, port_range.split(':'))            except:                (low_port,high_port) = (0,0)        if low_port < 0  or  high_port < low_port:            (low_port,high_port) = (0,0)        if low_port != 0  and  high_port != 0:            if port == 0:                port = low_port                while 1:                    try:                        self.sock.bind((host,port))                        self.sock.listen(listen)                        break                    except socket.error, e:                        port += 1                        if port <= high_port:                            self.sock.close()                            MPDSock.__init__(self,name=name,**kargs)                            self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)                            continue                        else:                            mpd_print_tb(1,'** no free ports in MPICH_PORT_RANGE')                            sys.exit(-1)            else:  # else use the explicitly specified port                if port < low_port  or  port > high_port:                    mpd_print_tb(1,'** port %d is outside MPICH_PORT_RANGE' % port)                    sys.exit(-1)                self.sock.bind((host,port))  # go ahead and bind                self.sock.listen(listen)        else:            self.sock.bind((host,port))  # no port range set, so just bind as usual            self.sock.listen(listen)class MPDStreamHandler(object):    def __init__(self):        self.activeStreams = {}    def set_handler(self,stream,handler,args=()):        self.activeStreams[stream] = (handler,args)    def del_handler(self,stream):        if self.activeStreams.has_key(stream):            del self.activeStreams[stream]    def close_all_active_streams(self):        for stream in self.activeStreams.keys():            del self.activeStreams[stream]            stream.close()    def handle_active_streams(self,streams=None,timeout=0.1):        global mpd_signum        while 1:            if streams:                streamsToSelect = streams            else:                streamsToSelect = self.activeStreams.keys()            readyStreams = []            try:                mpd_signum = 0                (readyStreams,u1,u2) = select.select(streamsToSelect,[],[],timeout)                break            except select.error, errinfo:                if errinfo[0] == EINTR:                    if mpd_signum == signal.SIGCHLD:                        break                    if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:                        break                    else:                        continue                else:                    print '%s: handle_active_streams: select error: %s' % \                          (mpd_my_id,os.strerror(errinfo[0]))                    return (-1,os.strerror(errinfo[0]))            except KeyboardInterrupt, errinfo:                # print 'handle_active_streams: keyboard interrupt during select'                return (-1,errinfo.__class__,errinfo)            except Exception, errinfo:                print 'handle_active_streams: exception during select %s :%s:' % \                      ( errinfo.__class__, errinfo)                return (-1,errinfo.__class__,errinfo)        for stream in readyStreams:            if self.activeStreams.has_key(stream):                (handler,args) = self.activeStreams[stream]                handler(stream,*args)            else:                # this is not nec bad; an active stream (handler) may                # have been deleted by earlier handler in this loop                print '*** OOPS, unknown stream in handle_active_streams'        return (len(readyStreams),0)  #  len >= 0class MPDRing(object):    def __init__(self,listenSock=None,streamHandler=None,secretword='',                 myIfhn='',entryIfhn='',entryPort=0,zcMyLevel=0):        if not streamHandler:            mpd_print(1, "must supply handler for new conns in ring")            sys.exit(-1)        if not listenSock:            mpd_print(1, "must supply listenSock for new ring")            sys.exit(-1)        if not myIfhn:            mpd_print(1, "must supply myIfhn for new ring")            sys.exit(-1)        self.secretword = secretword        self.myIfhn     = myIfhn        self.generation = 0        self.listenSock = listenSock        self.listenPort = self.listenSock.sock.getsockname()[1]        self.streamHandler = streamHandler        self.streamHandler.set_handler(self.listenSock,self.handle_ring_listener_connection)        self.entryIfhn = entryIfhn        self.entryPort = entryPort        self.lhsIfhn = ''        self.lhsPort = 0        self.rhsIfhn = ''        self.rhsPort = 0        self.lhsSock = 0        self.rhsSock = 0        self.lhsHandler = None        self.rhsHandler = None        self.zcMyLevel = zcMyLevel        if self.zcMyLevel:            mpd_init_zc(self.myIfhn,self.zcMyLevel)    def create_single_mem_ring(self,ifhn='',port=0,lhsHandler=None,rhsHandler=None):        self.lhsSock,self.rhsSock = mpd_sockpair()        self.lhsIfhn = ifhn        self.lhsPort = port        self.rhsIfhn = ifhn        self.rhsPort = port        self.lhsHandler = lhsHandler        self.streamHandler.set_handler(self.lhsSock,lhsHandler)        self.rhsHandler = rhsHandler        self.streamHandler.set_handler(self.rhsSock,rhsHandler)    def reenter_ring(self,entryIfhn='',entryPort=0,lhsHandler='',rhsHandler='',ntries=5):        if mpd_zc:            mpd_close_zc()            mpd_init_zc(self.myIfhn,self.zcMyLevel)        rc = -1        numTries = 0	self.generation += 1        while rc < 0  and  numTries < ntries:            numTries += 1            rc = self.enter_ring(entryIfhn=entryIfhn,entryPort=entryPort,                                 lhsHandler=lhsHandler,rhsHandler=rhsHandler,				 ntries=1)	    sleepTime = random() * 1.5  # a single random is between 0 and 1            sleep(sleepTime)        mpd_print(1,'reenter_ring rc=%d after numTries=%d' % (rc,numTries) )        return rc    def enter_ring(self,entryIfhn='',entryPort=0,lhsHandler='',rhsHandler='',ntries=1):        if not lhsHandler  or  not rhsHandler:            print 'missing handler for enter_ring'            sys.exit(-1)        if not entryIfhn:            entryIfhn = self.entryIfhn        if not entryPort:            entryPort = self.entryPort        if not entryIfhn  and  mpd_zc:            if self.zcMyLevel == 1:                (entryHost,entryPort) = ('',0)            else:                (entryIfhn,entryPort) = mpd_find_zc_peer(self.zcMyLevel-1)                if not entryPort:                    print "FAILED TO FIND A PEER AT LEVEL", self.zcMyLevel-1                    sys.exit(-1)            print "ENTRY INFO", (entryIfhn,entryPort)        if not entryIfhn:            self.create_single_mem_ring(ifhn=self.myIfhn,                                        port=self.listenPort,                                        lhsHandler=lhsHandler,                                        rhsHandler=rhsHandler)        else:            rv = self.connect_lhs(lhsIfhn=entryIfhn,                                  lhsPort=entryPort,                                  lhsHandler=lhsHandler,                                  numTries=ntries)            if rv[0] <= 0:  # connect failed with problem                mpd_print(1,"lhs connect failed")                return -1            if rv[1]:  # rhsifhn and rhsport                rhsIfhn = rv[1][0]                rhsPort = rv[1][1]            else:                mpd_print(1,"did not recv rhs host&port from lhs")                return -1            rv = self.connect_rhs(rhsIfhn=rhsIfhn,                                  rhsPort=rhsPort,                                  rhsHandler=rhsHandler,                                  numTries=ntries)            if rv[0] <=  0:  # connect did not succeed; may try again                mpd_print(1,"rhs connect failed")                return -1        if mpd_zc:            mpd_register_zc(self.myIfhn,self.zcMyLevel)        return 0    def connect_lhs(self,lhsIfhn='',lhsPort=0,lhsHandler=None,numTries=1):        if not lhsHandler:            mpd_print(1, "must supply handler for lhs in ring")            return (-1,None)        if not lhsIfhn:            mpd_print(1, "must supply host for lhs in ring")            return (-1,None)        self.lhsIfhn = lhsIfhn        if not lhsPort:            mpd_print(1, "must supply port for lhs in ring")            return (-1,None)        self.lhsPort = lhsPort        numConnTries = 0        while numConnTries < numTries:            numConnTries += 1            self.lhsSock = MPDSock(name='lhs')            try:                self.lhsSock.connect((self.lhsIfhn,self.lhsPort))            except socket.error, errinfo:                print '%s: conn error in connect_lhs: %s' % \                      (mpd_my_id,os.strerror(errinfo[0]))                self.lhsSock.close()                self.lhsSock = 0                sleep(random())                continue            break        if not self.lhsSock  or  numConnTries > numTries:            mpd_print(1,'failed to connect to lhs at %s %d' % (self.lhsIfhn,self.lhsPort))            return (0,None)        msgToSend = { 'cmd' : 'request_to_enter_as_rhs', 'ifhn' : self.myIfhn,                      'port' : self.listenPort,                      'mpd_version' : mpd_version() }        self.lhsSock.send_dict_msg(msgToSend)        msg = self.lhsSock.recv_dict_msg()        if (not msg) \        or (not msg.has_key('cmd')) \        or (not msg['cmd'] == 'challenge') \        or (not msg.has_key('randnum')) \        or (not msg.has_key('generation')):            mpd_print(1,'invalid challenge from %s %d: %s' % \                      (self.lhsIfhn,self.lhsPort,msg) )            return (-1,None)        if msg['generation'] < self.generation:            mpd_print(1,'bad generation from lhs; lhsgen=%d mygen=%d' % (msg['generation'],self.generation))            return(-1,'bad_generation')  # RMB: try again here later        response = md5new(''.join([self.secretword,msg['randnum']])).digest()        msgToSend = { 'cmd' : 'challenge_response', 'response' : response,                      'ifhn' : self.myIfhn, 'port' : self.listenPort }        self.lhsSock.send_dict_msg(msgToSend)        msg = self.lhsSock.recv_dict_msg()        if (not msg) \        or (not msg.has_key('cmd')) \        or (not msg['cmd'] == 'OK_to_enter_as_rhs'):            mpd_print(1,'NOT OK to enter ring; one likely cause: mismatched secretwords')            return (-1,None)        self.lhsHandler = lhsHandler        self.streamHandler.set_handler(self.lhsSock,lhsHandler)        if msg.has_key('rhsifhn') and msg.has_key('rhsport'):

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -