rpcdispatcher.java
来自「JGRoups源码」· Java 代码 · 共 391 行
JAVA
391 行
// $Id: RpcDispatcher.java,v 1.26 2006/08/29 08:11:35 belaban Exp $package org.jgroups.blocks;import org.jgroups.*;import org.jgroups.util.RspList;import org.jgroups.util.Util;import java.io.Serializable;import java.lang.reflect.Method;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import java.util.Vector;/** * This class allows a programmer to invoke remote methods in all (or single) * group members and optionally wait for the return value(s). * An application will typically create a channel and layer the * RpcDispatcher building block on top of it, which allows it to * dispatch remote methods (client role) and at the same time be * called by other members (server role). * This class is derived from MessageDispatcher. * Is the equivalent of RpcProtocol on the application rather than protocol level. * @author Bela Ban */public class RpcDispatcher extends MessageDispatcher implements ChannelListener { protected Object server_obj=null; /** Marshaller to marshall requests at the caller and unmarshal requests at the receiver(s) */ protected Marshaller req_marshaller=null; /** Marshaller to marshal responses at the receiver(s) and unmarshal responses at the caller */ protected Marshaller rsp_marshaller=null; protected final List additionalChannelListeners=new ArrayList(); protected MethodLookup method_lookup=null; public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object server_obj) { super(channel, l, l2); channel.addChannelListener(this); this.server_obj=server_obj; } public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object server_obj, boolean deadlock_detection) { super(channel, l, l2, deadlock_detection); channel.addChannelListener(this); this.server_obj=server_obj; } public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object server_obj, boolean deadlock_detection, boolean concurrent_processing) { super(channel, l, l2, deadlock_detection, concurrent_processing); channel.addChannelListener(this); this.server_obj=server_obj; } public RpcDispatcher(PullPushAdapter adapter, Serializable id, MessageListener l, MembershipListener l2, Object server_obj) { super(adapter, id, l, l2); // Fixes bug #804956 // channel.setChannelListener(this); if(this.adapter != null) { Transport t=this.adapter.getTransport(); if(t != null && t instanceof Channel) { ((Channel)t).addChannelListener(this); } } this.server_obj=server_obj; } public interface Marshaller { byte[] objectToByteBuffer(Object obj) throws Exception; Object objectFromByteBuffer(byte[] buf) throws Exception; } public String getName() {return "RpcDispatcher";} public Marshaller getRequestMarshaller() {return req_marshaller;} public void setRequestMarshaller(Marshaller m) { this.req_marshaller=m; } public Marshaller getResponseMarshaller() {return rsp_marshaller;} public void setResponseMarshaller(Marshaller m) { this.rsp_marshaller=m; if(corr != null) corr.setMarshaller(m); } public Marshaller getMarshaller() {return req_marshaller;} public void setMarshaller(Marshaller m) {req_marshaller=m;} public Object getServerObject() {return server_obj;} public void setServerObject(Object server_obj) { this.server_obj=server_obj; } public MethodLookup getMethodLookup() { return method_lookup; } public void setMethodLookup(MethodLookup method_lookup) { this.method_lookup=method_lookup; } public RspList castMessage(Vector dests, Message msg, int mode, long timeout) { if(log.isErrorEnabled()) log.error("this method should not be used with " + "RpcDispatcher, but MessageDispatcher. Returning null"); return null; } public Object sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException { if(log.isErrorEnabled()) log.error("this method should not be used with " + "RpcDispatcher, but MessageDispatcher. Returning null"); return null; } public RspList callRemoteMethods(Vector dests, String method_name, Object[] args, Class[] types, int mode, long timeout) { MethodCall method_call=new MethodCall(method_name, args, types); return callRemoteMethods(dests, method_call, mode, timeout); } public RspList callRemoteMethods(Vector dests, String method_name, Object[] args, String[] signature, int mode, long timeout) { MethodCall method_call=new MethodCall(method_name, args, signature); return callRemoteMethods(dests, method_call, mode, timeout); } public RspList callRemoteMethods(Vector dests, MethodCall method_call, int mode, long timeout) { if(dests != null && dests.size() == 0) { // don't send if dest list is empty if(log.isTraceEnabled()) log.trace(new StringBuffer("destination list of ").append(method_call.getName()). append("() is empty: no need to send message")); return new RspList(); } if(log.isTraceEnabled()) log.trace(new StringBuffer("dests=").append(dests).append(", method_call=").append(method_call). append(", mode=").append(mode).append(", timeout=").append(timeout)); byte[] buf; try { buf=req_marshaller != null? req_marshaller.objectToByteBuffer(method_call) : Util.objectToByteBuffer(method_call); } catch(Exception e) { // if(log.isErrorEnabled()) log.error("exception", e); // we will change this in 2.4 to add the exception to the signature // (see http://jira.jboss.com/jira/browse/JGRP-193). The reason for a RTE is that we cannot change the // signature in 2.3, otherwise 2.3 would be *not* API compatible to prev releases throw new RuntimeException("failure to marshal argument(s)", e); } Message msg=new Message(null, null, buf); RspList retval=super.castMessage(dests, msg, mode, timeout); if(log.isTraceEnabled()) log.trace("responses: " + retval); return retval; } public Object callRemoteMethod(Address dest, String method_name, Object[] args, Class[] types, int mode, long timeout) throws Throwable { MethodCall method_call=new MethodCall(method_name, args, types); return callRemoteMethod(dest, method_call, mode, timeout); } public Object callRemoteMethod(Address dest, String method_name, Object[] args, String[] signature, int mode, long timeout) throws Throwable { MethodCall method_call=new MethodCall(method_name, args, signature); return callRemoteMethod(dest, method_call, mode, timeout); } public Object callRemoteMethod(Address dest, MethodCall method_call, int mode, long timeout) throws Throwable { byte[] buf=null; Message msg=null; Object retval=null; if(log.isTraceEnabled()) log.trace("dest=" + dest + ", method_call=" + method_call + ", mode=" + mode + ", timeout=" + timeout); buf=req_marshaller != null? req_marshaller.objectToByteBuffer(method_call) : Util.objectToByteBuffer(method_call); msg=new Message(dest, null, buf); retval=super.sendMessage(msg, mode, timeout); if(log.isTraceEnabled()) log.trace("retval: " + retval); if(retval instanceof Throwable) throw (Throwable)retval; return retval; } protected void correlatorStarted() { if(corr != null) corr.setMarshaller(rsp_marshaller); } /** * Message contains MethodCall. Execute it against *this* object and return result. * Use MethodCall.invoke() to do this. Return result. */ public Object handle(Message req) { Object body=null; MethodCall method_call; if(server_obj == null) { if(log.isErrorEnabled()) log.error("no method handler is registered. Discarding request."); return null; } if(req == null || req.getLength() == 0) { if(log.isErrorEnabled()) log.error("message or message buffer is null"); return null; } try { body=req_marshaller != null? req_marshaller.objectFromByteBuffer(req.getBuffer()) : req.getObject(); } catch(Throwable e) { if(log.isErrorEnabled()) log.error("exception marshalling object", e); return e; } if(body == null || !(body instanceof MethodCall)) { if(log.isErrorEnabled()) log.error("message does not contain a MethodCall object"); return null; } method_call=(MethodCall)body; try { if(log.isTraceEnabled()) log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call); if(method_call.getMode() == MethodCall.ID) { if(method_lookup == null) throw new Exception("MethodCall uses ID=" + method_call.getId() + ", but method_lookup has not been set"); Method m=method_lookup.findMethod(method_call.getId()); if(m == null) throw new Exception("no method foudn for " + method_call.getId()); method_call.setMethod(m); } return method_call.invoke(server_obj); } catch(Throwable x) { return x; } } /** * Add a new channel listener to be notified on the channel's state change. * * @return true if the listener was added or false if the listener was already in the list. */ public boolean addChannelListener(ChannelListener l) { synchronized(additionalChannelListeners) { if (additionalChannelListeners.contains(l)) { return false; } additionalChannelListeners.add(l); return true; } } /** * * @return true if the channel was removed indeed. */ public boolean removeChannelListener(ChannelListener l) { synchronized(additionalChannelListeners) { return additionalChannelListeners.remove(l); } } /* --------------------- Interface ChannelListener ---------------------- */ public void channelConnected(Channel channel) { synchronized(additionalChannelListeners) { for(Iterator i = additionalChannelListeners.iterator(); i.hasNext(); ) { ChannelListener l = (ChannelListener)i.next(); try { l.channelConnected(channel); } catch(Throwable t) { log.warn("channel listener failed", t); } } } } public void channelDisconnected(Channel channel) { stop(); synchronized(additionalChannelListeners) { for(Iterator i = additionalChannelListeners.iterator(); i.hasNext(); ) { ChannelListener l = (ChannelListener)i.next(); try { l.channelDisconnected(channel); } catch(Throwable t) { log.warn("channel listener failed", t); } } } } public void channelClosed(Channel channel) { stop(); synchronized(additionalChannelListeners) { for(Iterator i = additionalChannelListeners.iterator(); i.hasNext(); ) { ChannelListener l = (ChannelListener)i.next(); try { l.channelClosed(channel); } catch(Throwable t) { log.warn("channel listener failed", t); } } } } public void channelShunned() { synchronized(additionalChannelListeners) { for(Iterator i = additionalChannelListeners.iterator(); i.hasNext(); ) { ChannelListener l = (ChannelListener)i.next(); try { l.channelShunned(); } catch(Throwable t) { log.warn("channel listener failed", t); } } } } public void channelReconnected(Address new_addr) { if(log.isTraceEnabled()) log.trace("channel has been rejoined, old local_addr=" + local_addr + ", new local_addr=" + new_addr); this.local_addr=new_addr; start(); synchronized(additionalChannelListeners) { for(Iterator i = additionalChannelListeners.iterator(); i.hasNext(); ) { ChannelListener l = (ChannelListener)i.next(); try { l.channelReconnected(new_addr); } catch(Throwable t) { log.warn("channel listener failed", t); } } } } /* ----------------------------------------------------------------------- */}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?