⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rpc.py

📁 基于DHT的对等协议
💻 PY
📖 第 1 页 / 共 2 页
字号:
    if msg.body.mtype == RPCProto.REPLY:        raise UnpackException("Expected call, but got reply")    call = msg.body.cbody    check(RPCProto.RPC_VERSION, call.rpcvers, "RPC version",          lambda: pack_reply(msg.xid,                             RPCProto.MSG_DENIED,                             RPCProto.RPC_MISMATCH,                             RPCProto.RPC_VERSION,                             RPCProto.RPC_VERSION).get_buffer())    check(myprog, call.prog, "program",          lambda: pack_reply(msg.xid,                             RPCProto.MSG_ACCEPTED,                             NULL_AUTH,                             RPCProto.PROG_UNAVAIL).get_buffer())    check(myvers, call.vers, "version",          lambda: pack_reply(msg.xid,                             RPCProto.MSG_ACCEPTED,                             NULL_AUTH,                             RPCProto.PROG_MISMATCH,                             myvers,                             myvers).get_buffer())    check(mycred, call.cred, "cred",          lambda: pack_reply(msg.xid,                             RPCProto.MSG_DENIED,                             RPCProto.AUTH_ERROR,                             RPCProto.AUTH_BADCRED).get_buffer())    check(myverf, call.verf, "verf",          lambda: pack_reply(msg.xid,                             RPCProto.MSG_DENIED,                             RPCProto.AUTH_ERROR,                             RPCProto.AUTH_BADVERF).get_buffer())    return (msg.xid, call.prog, call.vers,            call.proc, call.cred, call.verf, u)class ReuseTCPServer(ThreadingTCPServer):    def __init__(self, addr, handler):        self.allow_reuse_address = 1        ThreadingTCPServer.__init__(self, addr, handler)class Server:    def __init__(self, module, PROG, VERS, port, handlers, name=None):        """If name is not None, Server prints debug messages        prefixed by name."""        assert module is not None        assert 'programs' in dir(module)        assert PROG in module.programs        assert VERS in module.programs[PROG]        for proc in handlers:            assert proc in module.programs[PROG][VERS]        import SocketServer        class StreamHandler(SocketServer.StreamRequestHandler):            def dispatch(self, request):                xid, prog, vers, proc, cred, verf, u = unpack_call(                    request, myprog=PROG, myvers=VERS)                if proc in handlers:                    if name:                        print name + ": Got proc", proc                    arg = module.programs[PROG][VERS][proc].unpack_arg(u)                    res = handlers[proc](xid, cred, verf, arg)                    p = pack_reply(xid, MSG_ACCEPTED, NULL_AUTH, SUCCESS)                    module.programs[PROG][VERS][proc].pack_res(p, res)                    return p.get_buffer()                else:                    # no such procedure                    if name:                        print name + ": Got unknown proc", proc                    return pack_reply(xid, MSG_ACCEPTED, NULL_AUTH,                                      PROC_UNAVAIL).get_buffer()            def handle(self):                rfile = self.request.makefile('rb', -1)                wfile = self.request.makefile('wb', -1)                if name:                    print name + ": Got connection from", self.client_address                while 1:                    try:                        request = readfrags(rfile.read)                        reply = self.dispatch(request)                        writefrags(reply, wfile.write)                        wfile.flush()                    except EOFError:                        return                    except UnpackException, e:                        if name:                            print name + ":", e                        return                    except ReplyException, e:                        if name:                            print name + ":", e                        writefrags(e.reply, wfile.write)                        wfile.flush()                    except:                        if name:                            print name + ": Unexpected error:"                            print_exc()                        return # avoid killing the server        self.handler = StreamHandler        self.port = port    def run(self):        server = ReuseTCPServer(('', self.port), self.handler)        server.serve_forever() # support UDP?        class XidGenerator:    # FIXME: should this use locks?    def __init__(self):        import random, sys        self.xid = random.randint(0, sys.maxint/2)    # FIXME: should this randomize xids?    def next(self):        self.xid += 1        return self.xidclass ClientBase:    def __init__(self, module, PROG, VERS, host, port):        assert module is not None        assert 'programs' in dir(module)        assert PROG in module.programs        assert VERS in module.programs[PROG]        self.PROG = PROG        self.VERS = VERS        self.module = module        self.xidgen = XidGenerator()        self.host = host        self.port = port    def start_connect(self, host, port, cb = None):         raise AssertionError, "This method must be overridden."         def __call__(self, pnum, arg, cb = None):        """Call proc number pnum with arg.           If answer is immediately available, it will be returned.           Otherwise, None is returned, and cb will be called later."""        raise AssertionError, "This method must be overridden."class SClient(ClientBase):    def start_connect(self, cb = None):        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        self.sock.connect((self.host, self.port))        self.rfile = self.sock.makefile('rb', -1)        self.wfile = self.sock.makefile('wb', -1)        return self.sock        # TODO: use exec to define methods on this object for each proc        # in programs[PROG][VERS].  This requires that each generated        # Procedure() object include the proc name as a string.        # Probably a good idea, and for the Program and Version objects,        # too.  Also, rename all these to RPCProcedure, RPCProgram,        # RPCVersion.    def write_request(self, request):        writefrags(request, self.wfile.write)        self.wfile.flush()    def read_reply(self):        return readfrags(self.rfile.read)    def __call__(self, pnum, arg, cb = None):        proc = self.module.programs[self.PROG][self.VERS][pnum]        p = pack_call(self.xidgen.next(), self.PROG, self.VERS, pnum)        proc.pack_arg(p, arg)        request = p.get_buffer()        self.write_request(request)        reply = self.read_reply()        u = unpack_reply(reply)[-1]        return proc.unpack_res(u)class strbuf:    """Unlike stringio, always append to the end of string, read from front."""    def __init__(self): self.s = ''    def write(self, s): self.s += s    def read(self, n):        # Slicing past the end of string returns '', working out well.        v = self.s[:n]        self.s = self.s[n:]        return vclass AClient(ClientBase,asynchat.async_chat):    def __init__(self, module, PROG, VERS, host, port):        # A table of callbacks to be called, keyed by xid.        self.xidcbmap = {}        self.inbuffer = ''        self.fragments = []        self.bytesleft = 0 # until end of current fragment.        asynchat.async_chat.__init__(self)        ClientBase.__init__(self, module, PROG, VERS, host, port)    def handle_connect(self):        err = self.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)        if err == errno.ECONNREFUSED:            self.connect_cb(None)        else:            self.connect_cb(self)        def start_connect(self, cb = None):        if cb is None:            raise TypeError, "Must pass cb to async client"        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)        self.connect_cb = cb        self.connect((self.host, self.port))        self.set_terminator(None)        return None    def handle_reply(self):        reply = ''.join(self.fragments)        self.fragments = []        u = unpack_reply(reply)        # print "Reply for xid %x" % u[0]        try:            (cb, proc) = self.xidcbmap[u[0]]            del self.xidcbmap[u[0]]        except KeyError:            sys.stderr.write("Reply for unknown xid %x received: %s" %                             (u[0], str(u[1:])))            return        if not cb:            return        # XXX should really return some useful info to cb if error case        #     either if denied, or if some weird bug like PROG_UNAVAIL.        if u[1] == RPCProto.MSG_ACCEPTED:            res = proc.unpack_res(u[-1])            cb(res)        else:            cb(None)    def collect_incoming_data(self, data):        if len(self.inbuffer) > 0:            data = self.inbuffer + data        (fraglen, lastfrag) = parse_frag_len(data)        # print "a", fraglen, lastfrag, len(data)        while 4 + fraglen <= len(data):            frag = data[4:4+fraglen]            self.fragments.append(frag)            if lastfrag:                self.handle_reply()            data = data[4+fraglen:]            if len(data) > 0:                (fraglen, lastfrag) = parse_frag_len(data)                # print "b", fraglen, lastfrag, len(data)            # else:                # print "c"        self.inbuffer = data                def found_terminator(self):        raise AssertionError, "We don't use terminators."    def __call__(self, pnum, arg, cb = None):        proc = self.module.programs[self.PROG][self.VERS][pnum]        xid = self.xidgen.next()        # print "Call for xid %x" % xid        p = pack_call(xid, self.PROG, self.VERS, pnum)        proc.pack_arg(p, arg)        request = p.get_buffer()        val = strbuf()        writefrags(request, val.write)        self.push(val.s)        self.xidcbmap[xid] = (cb, proc)        return None

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -