requestcorrelator.java

来自「JGroups源码」· Java 代码 · 共 923 行 · 第 1/3 页

JAVA
923
字号
// $Id: RequestCorrelator.java,v 1.30 2006/08/29 07:27:14 belaban Exp $package org.jgroups.blocks;import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.Scheduler;import org.jgroups.util.SchedulerListener;import org.jgroups.util.Streamable;import org.jgroups.util.Util;import java.io.*;import java.util.*;/** * Framework to send requests and receive matching responses (matching on * request ID). * Multiple requests can be sent at a time. Whenever a response is received, * the correct <code>RspCollector</code> is looked up (key = id) and its * method <code>receiveResponse()</code> invoked. A caller may use * <code>done()</code> to signal that no more responses are expected, and that * the corresponding entry may be removed. * <p> * <code>RequestCorrelator</code> can be installed at both client and server * sides, it can also switch roles dynamically; i.e., send a request and at * the same time process an incoming request (when local delivery is enabled, * this is actually the default). * <p> * * @author Bela Ban */public class RequestCorrelator {    /** The protocol layer to use to pass up/down messages. Can be either a Protocol or a Transport */    protected Object transport=null;    /** The table of pending requests (keys=Long (request IDs), values=<tt>RequestEntry</tt>) */    protected final Map requests=new ConcurrentReaderHashMap();    /** The handler for the incoming requests. 
It is called from inside the dispatcher thread */    protected RequestHandler request_handler=null;    /** Possibility for an external marshaller to marshal/unmarshal responses */    protected RpcDispatcher.Marshaller marshaller=null;    /** makes the instance unique (together with IDs) */    protected String name=null;    /** The dispatching thread pool */    protected Scheduler scheduler=null;    /** The address of this group member */    protected Address local_addr=null;    /**     * This field is used only if deadlock detection is enabled.     * In case of nested synchronous requests, it holds a list of the     * addreses of the senders with the address at the bottom being the     * address of the first caller     */    protected java.util.Stack call_stack=null;    /** Whether or not to perform deadlock detection for synchronous (potentially recursive) group method invocations.     *  If on, we use a scheduler (handling a priority queue), otherwise we don't and call handleRequest() directly.     */    protected boolean deadlock_detection=false;    /**     * This field is used only if deadlock detection is enabled.     * It sets the calling stack to the currently running request     */    private CallStackSetter call_stack_setter=null;    /** Process items on the queue concurrently (Scheduler). The default is to wait until the processing of an item     * has completed before fetching the next item from the queue. Note that setting this to true     * may destroy the properties of a protocol stack, e.g total or causal order may not be     * guaranteed. Set this to true only if you know what you're doing ! */    protected boolean concurrent_processing=false;    protected boolean started=false;    protected static final Log log=LogFactory.getLog(RequestCorrelator.class);    /**     * Constructor. Uses transport to send messages. If <code>handler</code>     * is not null, all incoming requests will be dispatched to it (via     * <code>handle(Message)</code>).     
*     * @param name Used to differentiate between different RequestCorrelators     * (e.g. in different protocol layers). Has to be unique if multiple     * request correlators are used.     *     * @param transport Used to send/pass up requests. Can be either a Transport (only send() will be     *                  used then), or a Protocol (passUp()/passDown() will be used)     *     * @param handler Request handler. Method <code>handle(Message)</code>     * will be called when a request is received.     */    public RequestCorrelator(String name, Object transport, RequestHandler handler) {        this.name       = name;        this.transport  = transport;        request_handler = handler;        start();    }    public RequestCorrelator(String name, Object transport, RequestHandler handler, Address local_addr) {        this.name       = name;        this.transport  = transport;        this.local_addr=local_addr;        request_handler = handler;        start();    }    /**     * Constructor. Uses transport to send messages. If <code>handler</code>     * is not null, all incoming requests will be dispatched to it (via     * <code>handle(Message)</code>).     *     * @param name Used to differentiate between different RequestCorrelators     * (e.g. in different protocol layers). Has to be unique if multiple     * request correlators are used.     *     * @param transport Used to send/pass up requests. Can be either a Transport (only send() will be     *                  used then), or a Protocol (passUp()/passDown() will be used)     *     * @param handler Request handler. Method <code>handle(Message)</code>     * will be called when a request is received.     *     * @param deadlock_detection When enabled (true) recursive synchronous     * message calls will be detected and processed with higher priority in     * order to solve deadlocks. Slows down processing a little bit when     * enabled due to runtime checks involved.     
*/    public RequestCorrelator(String name, Object transport,                             RequestHandler handler, boolean deadlock_detection) {        this.deadlock_detection = deadlock_detection;        this.name               = name;        this.transport          = transport;        request_handler         = handler;        start();    }    public RequestCorrelator(String name, Object transport,                             RequestHandler handler, boolean deadlock_detection, boolean concurrent_processing) {        this.deadlock_detection    = deadlock_detection;        this.name                  = name;        this.transport             = transport;        request_handler            = handler;        this.concurrent_processing = concurrent_processing;        start();    }    public RequestCorrelator(String name, Object transport,                             RequestHandler handler, boolean deadlock_detection, Address local_addr) {        this.deadlock_detection = deadlock_detection;        this.name               = name;        this.transport          = transport;        this.local_addr         = local_addr;        request_handler         = handler;        start();    }    public RequestCorrelator(String name, Object transport, RequestHandler handler,                             boolean deadlock_detection, Address local_addr, boolean concurrent_processing) {        this.deadlock_detection    = deadlock_detection;        this.name                  = name;        this.transport             = transport;        this.local_addr            = local_addr;        request_handler            = handler;        this.concurrent_processing = concurrent_processing;        start();    }    /**     * Switch the deadlock detection mechanism on/off     * @param flag the deadlock detection flag     */    public void setDeadlockDetection(boolean flag) {        if(deadlock_detection != flag) { // only set it if different            deadlock_detection=flag;            if(started) {       
         if(deadlock_detection) {                    startScheduler();                }                else {                    stopScheduler();                }            }        }    }    public void setRequestHandler(RequestHandler handler) {        request_handler=handler;        start();    }    public void setConcurrentProcessing(boolean concurrent_processing) {        this.concurrent_processing=concurrent_processing;    }    /**     * Helper method for {@link #sendRequest(long,List,Message,RspCollector)}.     */    public void sendRequest(long id, Message msg, RspCollector coll) throws Exception {        sendRequest(id, null, msg, coll);    }    public RpcDispatcher.Marshaller getMarshaller() {        return marshaller;    }    public void setMarshaller(RpcDispatcher.Marshaller marshaller) {        this.marshaller=marshaller;    }    /**     * Send a request to a group. If no response collector is given, no     * responses are expected (making the call asynchronous).     *     * @param id The request ID. Must be unique for this JVM (e.g. current     * time in millisecs)     * @param dest_mbrs The list of members who should receive the call. Usually a group RPC     *                  is sent via multicast, but a receiver drops the request if its own address     *                  is not in this list. Will not be used if it is null.     * @param msg The request to be sent. The body of the message carries     * the request data     *     * @param coll A response collector (usually the object that invokes     * this method). Its methods <code>receiveResponse()</code> and     * <code>suspect()</code> will be invoked when a message has been received     * or a member is suspected, respectively.     
*/    public void sendRequest(long id, List dest_mbrs, Message msg, RspCollector coll) throws Exception {        Header hdr;        if(transport == null) {            if(log.isWarnEnabled()) log.warn("transport is not available !");            return;        }        // i. Create the request correlator header and add it to the        // msg        // ii. If a reply is expected (sync call / 'coll != null'), add a        // coresponding entry in the pending requests table        // iii. If deadlock detection is enabled, set/update the call stack        // iv. Pass the msg down to the protocol layer below        hdr = new Header(Header.REQ, id, (coll != null), name);        hdr.dest_mbrs=dest_mbrs;        if (coll != null) {            if(deadlock_detection) {                if(local_addr == null) {                    if(log.isErrorEnabled()) log.error("local address is null !");                    return;                }                java.util.Stack new_call_stack = (call_stack != null?                                                  (java.util.Stack)call_stack.clone():new java.util.Stack());                new_call_stack.push(local_addr);                hdr.callStack=new_call_stack;            }            addEntry(hdr.id, new RequestEntry(coll));        }        msg.putHeader(name, hdr);        if(transport instanceof Protocol)            ((Protocol)transport).passDown(new Event(Event.MSG, msg));        else if(transport instanceof Transport)            ((Transport)transport).send(msg);        else            throw new IllegalStateException("transport has to be either a Transport or a Protocol, however it is a " + transport.getClass());    }    /**     * Used to signal that a certain request may be garbage collected as     * all responses have been received.     */    public void done(long id) {        removeEntry(id);    }    /**     * <b>Callback</b>.     * <p>

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?