messagedispatcher.java

来自「JGRoups源码」· Java 代码 · 共 945 行 · 第 1/3 页

JAVA
945
字号
package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.stack.StateTransferInfo;import org.jgroups.util.*;import java.io.InputStream;import java.io.OutputStream;import java.io.Serializable;import java.util.Vector;import java.util.Collection;import java.util.TreeSet;import java.util.ArrayList;/** * Provides synchronous and asynchronous message sending with request-response * correlation; i.e., matching responses with the original request. * It also offers push-style message reception (by internally using the PullPushAdapter). * <p> * Channels are simple patterns to asynchronously send a receive messages. * However, a significant number of communication patterns in group communication * require synchronous communication. For example, a sender would like to send a * message to the group and wait for all responses. Or another application would * like to send a message to the group and wait only until the majority of the * receivers have sent a response, or until a timeout occurred.  MessageDispatcher * offers a combination of the above pattern with other patterns. * <p> * Used on top of channel to implement group requests. Client's <code>handle()</code> * method is called when request is received. Is the equivalent of RpcProtocol on * the application instead of protocol level. * * @author Bela Ban * @version $Id: MessageDispatcher.java,v 1.60 2006/09/27 19:21:53 vlada Exp $ */public class MessageDispatcher implements RequestHandler {    protected Channel channel=null;    protected RequestCorrelator corr=null;    protected MessageListener msg_listener=null;    protected MembershipListener membership_listener=null;    protected RequestHandler req_handler=null;    protected ProtocolAdapter prot_adapter=null;    protected TransportAdapter transport_adapter=null;    protected final Collection members=new TreeSet();    protected Address local_addr=null;    protected boolean deadlock_detection=false;    protected PullPushAdapter adapter=null;    protected PullPushHandler handler=null;    protected Serializable id=null;    protected final Log log=LogFactory.getLog(getClass());    /**     * Process items on the queue concurrently (RequestCorrelator). The default is to wait until the processing of an     * item has completed before fetching the next item from the queue. Note that setting this to true may destroy the     * properties of a protocol stack, e.g total or causal order may not be guaranteed. Set this to true only if you     * know what you're doing !     */    protected boolean concurrent_processing=false;    public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2) {        this.channel=channel;        prot_adapter=new ProtocolAdapter();        if(channel != null) {            local_addr=channel.getLocalAddress();        }        setMessageListener(l);        setMembershipListener(l2);        if(channel != null) {            channel.setUpHandler(prot_adapter);        }        start();    }    public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, boolean deadlock_detection) {        this.channel=channel;        this.deadlock_detection=deadlock_detection;        prot_adapter=new ProtocolAdapter();        if(channel != null) {            local_addr=channel.getLocalAddress();        }        setMessageListener(l);        setMembershipListener(l2);        if(channel != null) {            channel.setUpHandler(prot_adapter);        }        start();    }    public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2,                             boolean deadlock_detection, boolean concurrent_processing) {        this.channel=channel;        this.deadlock_detection=deadlock_detection;        this.concurrent_processing=concurrent_processing;        prot_adapter=new ProtocolAdapter();        if(channel != null) {            local_addr=channel.getLocalAddress();        }        setMessageListener(l);        setMembershipListener(l2);        if(channel != null) {            channel.setUpHandler(prot_adapter);        }        start();    }    public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler) {        this(channel, l, l2);        setRequestHandler(req_handler);    }    public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler,                             boolean deadlock_detection) {        this(channel, l, l2, deadlock_detection, false);        setRequestHandler(req_handler);    }    public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler,                             boolean deadlock_detection, boolean concurrent_processing) {        this(channel, l, l2, deadlock_detection, concurrent_processing);        setRequestHandler(req_handler);    }    /*     * Uses a user-provided PullPushAdapter rather than a Channel as transport. If id is non-null, it will be     * used to register under that id. This is typically used when another building block is already using     * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate     * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the     * first block created on PullPushAdapter.     * @param adapter The PullPushAdapter which to use as underlying transport     * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between     *           requests/responses for different building blocks on top of PullPushAdapter.     */    public MessageDispatcher(PullPushAdapter adapter, Serializable id,                             MessageListener l, MembershipListener l2) {        this.adapter=adapter;        this.id=id;        setMembers(((Channel) adapter.getTransport()).getView().getMembers());        setMessageListener(l);        setMembershipListener(l2);        handler=new PullPushHandler();        transport_adapter=new TransportAdapter();        adapter.addMembershipListener(handler); // remove in stop()        if(id == null) { // no other building block around, let's become the main consumer of this PullPushAdapter            adapter.setListener(handler);        }        else {            adapter.registerListener(id, handler);        }        Transport tp;        if((tp=adapter.getTransport()) instanceof Channel) {            local_addr=((Channel) tp).getLocalAddress();        }        start();    }    /*     * Uses a user-provided PullPushAdapter rather than a Channel as transport. If id is non-null, it will be     * used to register under that id. This is typically used when another building block is already using     * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate     * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the     * first block created on PullPushAdapter.     * @param adapter The PullPushAdapter which to use as underlying transport     * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between     *           requests/responses for different building blocks on top of PullPushAdapter.     * @param req_handler The object implementing RequestHandler. It will be called when a request is received     */    public MessageDispatcher(PullPushAdapter adapter, Serializable id,                             MessageListener l, MembershipListener l2,                             RequestHandler req_handler) {        this.adapter=adapter;        this.id=id;        setMembers(((Channel) adapter.getTransport()).getView().getMembers());        setRequestHandler(req_handler);        setMessageListener(l);        setMembershipListener(l2);        handler=new PullPushHandler();        transport_adapter=new TransportAdapter();        adapter.addMembershipListener(handler);        if(id == null) { // no other building block around, let's become the main consumer of this PullPushAdapter            adapter.setListener(handler);        }        else {            adapter.registerListener(id, handler);        }        Transport tp;        if((tp=adapter.getTransport()) instanceof Channel) {            local_addr=((Channel) tp).getLocalAddress(); // fixed bug #800774        }        start();    }    public MessageDispatcher(PullPushAdapter adapter, Serializable id,                             MessageListener l, MembershipListener l2,                             RequestHandler req_handler, boolean concurrent_processing) {        this.concurrent_processing=concurrent_processing;        this.adapter=adapter;        this.id=id;        setMembers(((Channel) adapter.getTransport()).getView().getMembers());        setRequestHandler(req_handler);        setMessageListener(l);        setMembershipListener(l2);        handler=new PullPushHandler();        transport_adapter=new TransportAdapter();        adapter.addMembershipListener(handler);        if(id == null) { // no other building block around, let's become the main consumer of this PullPushAdapter            adapter.setListener(handler);        }        else {            adapter.registerListener(id, handler);        }        Transport tp;        if((tp=adapter.getTransport()) instanceof Channel) {            local_addr=((Channel) tp).getLocalAddress(); // fixed bug #800774        }        start();    }    /** Returns a copy of members */    protected Collection getMembers() {        synchronized(members) {            return new ArrayList(members);        }    }    /**     * If this dispatcher is using a user-provided PullPushAdapter, then need to set the members from the adapter     * initially since viewChange has most likely already been called in PullPushAdapter.     */    private void setMembers(Vector new_mbrs) {        if(new_mbrs != null) {            synchronized(members) {                members.clear();                members.addAll(new_mbrs);            }        }    }    public void setDeadlockDetection(boolean flag) {        deadlock_detection=flag;        if(corr != null)            corr.setDeadlockDetection(flag);    }    public void setConcurrentProcessing(boolean flag) {        this.concurrent_processing=flag;    }    public final void start() {        if(corr == null) {            if(transport_adapter != null) {                corr=new RequestCorrelator("MessageDispatcher", transport_adapter,                                           this, deadlock_detection, local_addr, concurrent_processing);            }            else {                corr=new RequestCorrelator("MessageDispatcher", prot_adapter,                                           this, deadlock_detection, local_addr, concurrent_processing);            }        }        correlatorStarted();        corr.start();        if(channel != null) {            Vector tmp_mbrs=channel.getView() != null ? channel.getView().getMembers() : null;            setMembers(tmp_mbrs);        }    }    protected void correlatorStarted() {        ;    }    public void stop() {        if(corr != null) {            corr.stop();        }        // fixes leaks of MembershipListeners (http://jira.jboss.com/jira/browse/JGRP-160)        if(adapter != null && handler != null) {            adapter.removeMembershipListener(handler);        }    }    public final void setMessageListener(MessageListener l) {        msg_listener=l;    }    /**     * Gives access to the currently configured MessageListener. Returns null if there is no     * configured MessageListener.     */    public MessageListener getMessageListener() {        return msg_listener;    }    public final void setMembershipListener(MembershipListener l) {

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?