messagedispatcher.java
来自「JGRoups源码」· Java 代码 · 共 945 行 · 第 1/3 页
JAVA
945 行
package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.stack.StateTransferInfo;import org.jgroups.util.*;import java.io.InputStream;import java.io.OutputStream;import java.io.Serializable;import java.util.Vector;import java.util.Collection;import java.util.TreeSet;import java.util.ArrayList;/** * Provides synchronous and asynchronous message sending with request-response * correlation; i.e., matching responses with the original request. * It also offers push-style message reception (by internally using the PullPushAdapter). * <p> * Channels are simple patterns to asynchronously send a receive messages. * However, a significant number of communication patterns in group communication * require synchronous communication. For example, a sender would like to send a * message to the group and wait for all responses. Or another application would * like to send a message to the group and wait only until the majority of the * receivers have sent a response, or until a timeout occurred. MessageDispatcher * offers a combination of the above pattern with other patterns. * <p> * Used on top of channel to implement group requests. Client's <code>handle()</code> * method is called when request is received. Is the equivalent of RpcProtocol on * the application instead of protocol level. * * @author Bela Ban * @version $Id: MessageDispatcher.java,v 1.60 2006/09/27 19:21:53 vlada Exp $ */public class MessageDispatcher implements RequestHandler { protected Channel channel=null; protected RequestCorrelator corr=null; protected MessageListener msg_listener=null; protected MembershipListener membership_listener=null; protected RequestHandler req_handler=null; protected ProtocolAdapter prot_adapter=null; protected TransportAdapter transport_adapter=null; protected final Collection members=new TreeSet(); protected Address local_addr=null; protected boolean deadlock_detection=false; protected PullPushAdapter adapter=null; protected PullPushHandler handler=null; protected Serializable id=null; protected final Log log=LogFactory.getLog(getClass()); /** * Process items on the queue concurrently (RequestCorrelator). The default is to wait until the processing of an * item has completed before fetching the next item from the queue. Note that setting this to true may destroy the * properties of a protocol stack, e.g total or causal order may not be guaranteed. Set this to true only if you * know what you're doing ! */ protected boolean concurrent_processing=false; public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2) { this.channel=channel; prot_adapter=new ProtocolAdapter(); if(channel != null) { local_addr=channel.getLocalAddress(); } setMessageListener(l); setMembershipListener(l2); if(channel != null) { channel.setUpHandler(prot_adapter); } start(); } public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, boolean deadlock_detection) { this.channel=channel; this.deadlock_detection=deadlock_detection; prot_adapter=new ProtocolAdapter(); if(channel != null) { local_addr=channel.getLocalAddress(); } setMessageListener(l); setMembershipListener(l2); if(channel != null) { channel.setUpHandler(prot_adapter); } start(); } public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, boolean deadlock_detection, boolean concurrent_processing) { this.channel=channel; this.deadlock_detection=deadlock_detection; this.concurrent_processing=concurrent_processing; prot_adapter=new ProtocolAdapter(); if(channel != null) { local_addr=channel.getLocalAddress(); } setMessageListener(l); setMembershipListener(l2); if(channel != null) { channel.setUpHandler(prot_adapter); } start(); } public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler) { this(channel, l, l2); setRequestHandler(req_handler); } public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler, boolean deadlock_detection) { this(channel, l, l2, deadlock_detection, false); setRequestHandler(req_handler); } public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler, boolean deadlock_detection, boolean concurrent_processing) { this(channel, l, l2, deadlock_detection, concurrent_processing); setRequestHandler(req_handler); } /* * Uses a user-provided PullPushAdapter rather than a Channel as transport. If id is non-null, it will be * used to register under that id. This is typically used when another building block is already using * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the * first block created on PullPushAdapter. * @param adapter The PullPushAdapter which to use as underlying transport * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between * requests/responses for different building blocks on top of PullPushAdapter. */ public MessageDispatcher(PullPushAdapter adapter, Serializable id, MessageListener l, MembershipListener l2) { this.adapter=adapter; this.id=id; setMembers(((Channel) adapter.getTransport()).getView().getMembers()); setMessageListener(l); setMembershipListener(l2); handler=new PullPushHandler(); transport_adapter=new TransportAdapter(); adapter.addMembershipListener(handler); // remove in stop() if(id == null) { // no other building block around, let's become the main consumer of this PullPushAdapter adapter.setListener(handler); } else { adapter.registerListener(id, handler); } Transport tp; if((tp=adapter.getTransport()) instanceof Channel) { local_addr=((Channel) tp).getLocalAddress(); } start(); } /* * Uses a user-provided PullPushAdapter rather than a Channel as transport. If id is non-null, it will be * used to register under that id. This is typically used when another building block is already using * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the * first block created on PullPushAdapter. * @param adapter The PullPushAdapter which to use as underlying transport * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between * requests/responses for different building blocks on top of PullPushAdapter. * @param req_handler The object implementing RequestHandler. It will be called when a request is received */ public MessageDispatcher(PullPushAdapter adapter, Serializable id, MessageListener l, MembershipListener l2, RequestHandler req_handler) { this.adapter=adapter; this.id=id; setMembers(((Channel) adapter.getTransport()).getView().getMembers()); setRequestHandler(req_handler); setMessageListener(l); setMembershipListener(l2); handler=new PullPushHandler(); transport_adapter=new TransportAdapter(); adapter.addMembershipListener(handler); if(id == null) { // no other building block around, let's become the main consumer of this PullPushAdapter adapter.setListener(handler); } else { adapter.registerListener(id, handler); } Transport tp; if((tp=adapter.getTransport()) instanceof Channel) { local_addr=((Channel) tp).getLocalAddress(); // fixed bug #800774 } start(); } public MessageDispatcher(PullPushAdapter adapter, Serializable id, MessageListener l, MembershipListener l2, RequestHandler req_handler, boolean concurrent_processing) { this.concurrent_processing=concurrent_processing; this.adapter=adapter; this.id=id; setMembers(((Channel) adapter.getTransport()).getView().getMembers()); setRequestHandler(req_handler); setMessageListener(l); setMembershipListener(l2); handler=new PullPushHandler(); transport_adapter=new TransportAdapter(); adapter.addMembershipListener(handler); if(id == null) { // no other building block around, let's become the main consumer of this PullPushAdapter adapter.setListener(handler); } else { adapter.registerListener(id, handler); } Transport tp; if((tp=adapter.getTransport()) instanceof Channel) { local_addr=((Channel) tp).getLocalAddress(); // fixed bug #800774 } start(); } /** Returns a copy of members */ protected Collection getMembers() { synchronized(members) { return new ArrayList(members); } } /** * If this dispatcher is using a user-provided PullPushAdapter, then need to set the members from the adapter * initially since viewChange has most likely already been called in PullPushAdapter. */ private void setMembers(Vector new_mbrs) { if(new_mbrs != null) { synchronized(members) { members.clear(); members.addAll(new_mbrs); } } } public void setDeadlockDetection(boolean flag) { deadlock_detection=flag; if(corr != null) corr.setDeadlockDetection(flag); } public void setConcurrentProcessing(boolean flag) { this.concurrent_processing=flag; } public final void start() { if(corr == null) { if(transport_adapter != null) { corr=new RequestCorrelator("MessageDispatcher", transport_adapter, this, deadlock_detection, local_addr, concurrent_processing); } else { corr=new RequestCorrelator("MessageDispatcher", prot_adapter, this, deadlock_detection, local_addr, concurrent_processing); } } correlatorStarted(); corr.start(); if(channel != null) { Vector tmp_mbrs=channel.getView() != null ? channel.getView().getMembers() : null; setMembers(tmp_mbrs); } } protected void correlatorStarted() { ; } public void stop() { if(corr != null) { corr.stop(); } // fixes leaks of MembershipListeners (http://jira.jboss.com/jira/browse/JGRP-160) if(adapter != null && handler != null) { adapter.removeMembershipListener(handler); } } public final void setMessageListener(MessageListener l) { msg_listener=l; } /** * Gives access to the currently configured MessageListener. Returns null if there is no * configured MessageListener. */ public MessageListener getMessageListener() { return msg_listener; } public final void setMembershipListener(MembershipListener l) {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?