📄 rpc.py
字号:
if msg.body.mtype == RPCProto.REPLY: raise UnpackException("Expected call, but got reply") call = msg.body.cbody check(RPCProto.RPC_VERSION, call.rpcvers, "RPC version", lambda: pack_reply(msg.xid, RPCProto.MSG_DENIED, RPCProto.RPC_MISMATCH, RPCProto.RPC_VERSION, RPCProto.RPC_VERSION).get_buffer()) check(myprog, call.prog, "program", lambda: pack_reply(msg.xid, RPCProto.MSG_ACCEPTED, NULL_AUTH, RPCProto.PROG_UNAVAIL).get_buffer()) check(myvers, call.vers, "version", lambda: pack_reply(msg.xid, RPCProto.MSG_ACCEPTED, NULL_AUTH, RPCProto.PROG_MISMATCH, myvers, myvers).get_buffer()) check(mycred, call.cred, "cred", lambda: pack_reply(msg.xid, RPCProto.MSG_DENIED, RPCProto.AUTH_ERROR, RPCProto.AUTH_BADCRED).get_buffer()) check(myverf, call.verf, "verf", lambda: pack_reply(msg.xid, RPCProto.MSG_DENIED, RPCProto.AUTH_ERROR, RPCProto.AUTH_BADVERF).get_buffer()) return (msg.xid, call.prog, call.vers, call.proc, call.cred, call.verf, u)class ReuseTCPServer(ThreadingTCPServer): def __init__(self, addr, handler): self.allow_reuse_address = 1 ThreadingTCPServer.__init__(self, addr, handler)class Server: def __init__(self, module, PROG, VERS, port, handlers, name=None): """If name is not None, Server prints debug messages prefixed by name.""" assert module is not None assert 'programs' in dir(module) assert PROG in module.programs assert VERS in module.programs[PROG] for proc in handlers: assert proc in module.programs[PROG][VERS] import SocketServer class StreamHandler(SocketServer.StreamRequestHandler): def dispatch(self, request): xid, prog, vers, proc, cred, verf, u = unpack_call( request, myprog=PROG, myvers=VERS) if proc in handlers: if name: print name + ": Got proc", proc arg = module.programs[PROG][VERS][proc].unpack_arg(u) res = handlers[proc](xid, cred, verf, arg) p = pack_reply(xid, MSG_ACCEPTED, NULL_AUTH, SUCCESS) module.programs[PROG][VERS][proc].pack_res(p, res) return p.get_buffer() else: # no such procedure if name: print name + ": Got unknown proc", proc return pack_reply(xid, MSG_ACCEPTED, NULL_AUTH, PROC_UNAVAIL).get_buffer() def handle(self): rfile = self.request.makefile('rb', -1) wfile = self.request.makefile('wb', -1) if name: print name + ": Got connection from", self.client_address while 1: try: request = readfrags(rfile.read) reply = self.dispatch(request) writefrags(reply, wfile.write) wfile.flush() except EOFError: return except UnpackException, e: if name: print name + ":", e return except ReplyException, e: if name: print name + ":", e writefrags(e.reply, wfile.write) wfile.flush() except: if name: print name + ": Unexpected error:" print_exc() return # avoid killing the server self.handler = StreamHandler self.port = port def run(self): server = ReuseTCPServer(('', self.port), self.handler) server.serve_forever() # support UDP? class XidGenerator: # FIXME: should this use locks? def __init__(self): import random, sys self.xid = random.randint(0, sys.maxint/2) # FIXME: should this randomize xids? def next(self): self.xid += 1 return self.xidclass ClientBase: def __init__(self, module, PROG, VERS, host, port): assert module is not None assert 'programs' in dir(module) assert PROG in module.programs assert VERS in module.programs[PROG] self.PROG = PROG self.VERS = VERS self.module = module self.xidgen = XidGenerator() self.host = host self.port = port def start_connect(self, host, port, cb = None): raise AssertionError, "This method must be overridden." def __call__(self, pnum, arg, cb = None): """Call proc number pnum with arg. If answer is immediately available, it will be returned. Otherwise, None is returned, and cb will be called later.""" raise AssertionError, "This method must be overridden."class SClient(ClientBase): def start_connect(self, cb = None): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) self.rfile = self.sock.makefile('rb', -1) self.wfile = self.sock.makefile('wb', -1) return self.sock # TODO: use exec to define methods on this object for each proc # in programs[PROG][VERS]. This requires that each generated # Procedure() object include the proc name as a string. # Probably a good idea, and for the Program and Version objects, # too. Also, rename all these to RPCProcedure, RPCProgram, # RPCVersion. def write_request(self, request): writefrags(request, self.wfile.write) self.wfile.flush() def read_reply(self): return readfrags(self.rfile.read) def __call__(self, pnum, arg, cb = None): proc = self.module.programs[self.PROG][self.VERS][pnum] p = pack_call(self.xidgen.next(), self.PROG, self.VERS, pnum) proc.pack_arg(p, arg) request = p.get_buffer() self.write_request(request) reply = self.read_reply() u = unpack_reply(reply)[-1] return proc.unpack_res(u)class strbuf: """Unlike stringio, always append to the end of string, read from front.""" def __init__(self): self.s = '' def write(self, s): self.s += s def read(self, n): # Slicing past the end of string returns '', working out well. v = self.s[:n] self.s = self.s[n:] return vclass AClient(ClientBase,asynchat.async_chat): def __init__(self, module, PROG, VERS, host, port): # A table of callbacks to be called, keyed by xid. self.xidcbmap = {} self.inbuffer = '' self.fragments = [] self.bytesleft = 0 # until end of current fragment. asynchat.async_chat.__init__(self) ClientBase.__init__(self, module, PROG, VERS, host, port) def handle_connect(self): err = self.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) if err == errno.ECONNREFUSED: self.connect_cb(None) else: self.connect_cb(self) def start_connect(self, cb = None): if cb is None: raise TypeError, "Must pass cb to async client" self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect_cb = cb self.connect((self.host, self.port)) self.set_terminator(None) return None def handle_reply(self): reply = ''.join(self.fragments) self.fragments = [] u = unpack_reply(reply) # print "Reply for xid %x" % u[0] try: (cb, proc) = self.xidcbmap[u[0]] del self.xidcbmap[u[0]] except KeyError: sys.stderr.write("Reply for unknown xid %x received: %s" % (u[0], str(u[1:]))) return if not cb: return # XXX should really return some useful info to cb if error case # either if denied, or if some weird bug like PROG_UNAVAIL. if u[1] == RPCProto.MSG_ACCEPTED: res = proc.unpack_res(u[-1]) cb(res) else: cb(None) def collect_incoming_data(self, data): if len(self.inbuffer) > 0: data = self.inbuffer + data (fraglen, lastfrag) = parse_frag_len(data) # print "a", fraglen, lastfrag, len(data) while 4 + fraglen <= len(data): frag = data[4:4+fraglen] self.fragments.append(frag) if lastfrag: self.handle_reply() data = data[4+fraglen:] if len(data) > 0: (fraglen, lastfrag) = parse_frag_len(data) # print "b", fraglen, lastfrag, len(data) # else: # print "c" self.inbuffer = data def found_terminator(self): raise AssertionError, "We don't use terminators." def __call__(self, pnum, arg, cb = None): proc = self.module.programs[self.PROG][self.VERS][pnum] xid = self.xidgen.next() # print "Call for xid %x" % xid p = pack_call(xid, self.PROG, self.VERS, pnum) proc.pack_arg(p, arg) request = p.get_buffer() val = strbuf() writefrags(request, val.write) self.push(val.s) self.xidcbmap[xid] = (cb, proc) return None
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -