requestcorrelator.java
来自「JGRoups源码」· Java 代码 · 共 923 行 · 第 1/3 页
JAVA
923 行
// $Id: RequestCorrelator.java,v 1.30 2006/08/29 07:27:14 belaban Exp $package org.jgroups.blocks;import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.Scheduler;import org.jgroups.util.SchedulerListener;import org.jgroups.util.Streamable;import org.jgroups.util.Util;import java.io.*;import java.util.*;/** * Framework to send requests and receive matching responses (matching on * request ID). * Multiple requests can be sent at a time. Whenever a response is received, * the correct <code>RspCollector</code> is looked up (key = id) and its * method <code>receiveResponse()</code> invoked. A caller may use * <code>done()</code> to signal that no more responses are expected, and that * the corresponding entry may be removed. * <p> * <code>RequestCorrelator</code> can be installed at both client and server * sides, it can also switch roles dynamically; i.e., send a request and at * the same time process an incoming request (when local delivery is enabled, * this is actually the default). * <p> * * @author Bela Ban */public class RequestCorrelator { /** The protocol layer to use to pass up/down messages. Can be either a Protocol or a Transport */ protected Object transport=null; /** The table of pending requests (keys=Long (request IDs), values=<tt>RequestEntry</tt>) */ protected final Map requests=new ConcurrentReaderHashMap(); /** The handler for the incoming requests. It is called from inside the dispatcher thread */ protected RequestHandler request_handler=null; /** Possibility for an external marshaller to marshal/unmarshal responses */ protected RpcDispatcher.Marshaller marshaller=null; /** makes the instance unique (together with IDs) */ protected String name=null; /** The dispatching thread pool */ protected Scheduler scheduler=null; /** The address of this group member */ protected Address local_addr=null; /** * This field is used only if deadlock detection is enabled. * In case of nested synchronous requests, it holds a list of the * addreses of the senders with the address at the bottom being the * address of the first caller */ protected java.util.Stack call_stack=null; /** Whether or not to perform deadlock detection for synchronous (potentially recursive) group method invocations. * If on, we use a scheduler (handling a priority queue), otherwise we don't and call handleRequest() directly. */ protected boolean deadlock_detection=false; /** * This field is used only if deadlock detection is enabled. * It sets the calling stack to the currently running request */ private CallStackSetter call_stack_setter=null; /** Process items on the queue concurrently (Scheduler). The default is to wait until the processing of an item * has completed before fetching the next item from the queue. Note that setting this to true * may destroy the properties of a protocol stack, e.g total or causal order may not be * guaranteed. Set this to true only if you know what you're doing ! */ protected boolean concurrent_processing=false; protected boolean started=false; protected static final Log log=LogFactory.getLog(RequestCorrelator.class); /** * Constructor. Uses transport to send messages. If <code>handler</code> * is not null, all incoming requests will be dispatched to it (via * <code>handle(Message)</code>). * * @param name Used to differentiate between different RequestCorrelators * (e.g. in different protocol layers). Has to be unique if multiple * request correlators are used. * * @param transport Used to send/pass up requests. Can be either a Transport (only send() will be * used then), or a Protocol (passUp()/passDown() will be used) * * @param handler Request handler. Method <code>handle(Message)</code> * will be called when a request is received. */ public RequestCorrelator(String name, Object transport, RequestHandler handler) { this.name = name; this.transport = transport; request_handler = handler; start(); } public RequestCorrelator(String name, Object transport, RequestHandler handler, Address local_addr) { this.name = name; this.transport = transport; this.local_addr=local_addr; request_handler = handler; start(); } /** * Constructor. Uses transport to send messages. If <code>handler</code> * is not null, all incoming requests will be dispatched to it (via * <code>handle(Message)</code>). * * @param name Used to differentiate between different RequestCorrelators * (e.g. in different protocol layers). Has to be unique if multiple * request correlators are used. * * @param transport Used to send/pass up requests. Can be either a Transport (only send() will be * used then), or a Protocol (passUp()/passDown() will be used) * * @param handler Request handler. Method <code>handle(Message)</code> * will be called when a request is received. * * @param deadlock_detection When enabled (true) recursive synchronous * message calls will be detected and processed with higher priority in * order to solve deadlocks. Slows down processing a little bit when * enabled due to runtime checks involved. */ public RequestCorrelator(String name, Object transport, RequestHandler handler, boolean deadlock_detection) { this.deadlock_detection = deadlock_detection; this.name = name; this.transport = transport; request_handler = handler; start(); } public RequestCorrelator(String name, Object transport, RequestHandler handler, boolean deadlock_detection, boolean concurrent_processing) { this.deadlock_detection = deadlock_detection; this.name = name; this.transport = transport; request_handler = handler; this.concurrent_processing = concurrent_processing; start(); } public RequestCorrelator(String name, Object transport, RequestHandler handler, boolean deadlock_detection, Address local_addr) { this.deadlock_detection = deadlock_detection; this.name = name; this.transport = transport; this.local_addr = local_addr; request_handler = handler; start(); } public RequestCorrelator(String name, Object transport, RequestHandler handler, boolean deadlock_detection, Address local_addr, boolean concurrent_processing) { this.deadlock_detection = deadlock_detection; this.name = name; this.transport = transport; this.local_addr = local_addr; request_handler = handler; this.concurrent_processing = concurrent_processing; start(); } /** * Switch the deadlock detection mechanism on/off * @param flag the deadlock detection flag */ public void setDeadlockDetection(boolean flag) { if(deadlock_detection != flag) { // only set it if different deadlock_detection=flag; if(started) { if(deadlock_detection) { startScheduler(); } else { stopScheduler(); } } } } public void setRequestHandler(RequestHandler handler) { request_handler=handler; start(); } public void setConcurrentProcessing(boolean concurrent_processing) { this.concurrent_processing=concurrent_processing; } /** * Helper method for {@link #sendRequest(long,List,Message,RspCollector)}. */ public void sendRequest(long id, Message msg, RspCollector coll) throws Exception { sendRequest(id, null, msg, coll); } public RpcDispatcher.Marshaller getMarshaller() { return marshaller; } public void setMarshaller(RpcDispatcher.Marshaller marshaller) { this.marshaller=marshaller; } /** * Send a request to a group. If no response collector is given, no * responses are expected (making the call asynchronous). * * @param id The request ID. Must be unique for this JVM (e.g. current * time in millisecs) * @param dest_mbrs The list of members who should receive the call. Usually a group RPC * is sent via multicast, but a receiver drops the request if its own address * is not in this list. Will not be used if it is null. * @param msg The request to be sent. The body of the message carries * the request data * * @param coll A response collector (usually the object that invokes * this method). Its methods <code>receiveResponse()</code> and * <code>suspect()</code> will be invoked when a message has been received * or a member is suspected, respectively. */ public void sendRequest(long id, List dest_mbrs, Message msg, RspCollector coll) throws Exception { Header hdr; if(transport == null) { if(log.isWarnEnabled()) log.warn("transport is not available !"); return; } // i. Create the request correlator header and add it to the // msg // ii. If a reply is expected (sync call / 'coll != null'), add a // coresponding entry in the pending requests table // iii. If deadlock detection is enabled, set/update the call stack // iv. Pass the msg down to the protocol layer below hdr = new Header(Header.REQ, id, (coll != null), name); hdr.dest_mbrs=dest_mbrs; if (coll != null) { if(deadlock_detection) { if(local_addr == null) { if(log.isErrorEnabled()) log.error("local address is null !"); return; } java.util.Stack new_call_stack = (call_stack != null? (java.util.Stack)call_stack.clone():new java.util.Stack()); new_call_stack.push(local_addr); hdr.callStack=new_call_stack; } addEntry(hdr.id, new RequestEntry(coll)); } msg.putHeader(name, hdr); if(transport instanceof Protocol) ((Protocol)transport).passDown(new Event(Event.MSG, msg)); else if(transport instanceof Transport) ((Transport)transport).send(msg); else throw new IllegalStateException("transport has to be either a Transport or a Protocol, however it is a " + transport.getClass()); } /** * Used to signal that a certain request may be garbage collected as * all responses have been received. */ public void done(long id) { removeEntry(id); } /** * <b>Callback</b>. * <p>
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?