pullpushadapter.java
来自「JGRoups源码」· Java 代码 · 共 480 行 · 第 1/2 页
JAVA
480 行
// $Id: PullPushAdapter.java,v 1.22 2006/09/27 19:21:53 vlada Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.util.Util;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.io.Serializable;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.List;/** * Allows a client of {@link org.jgroups.Channel} to be notified when messages have been received * instead of having to actively poll the channel for new messages. Typically used in the * client role (receive()). As this class does not implement interface * {@link org.jgroups.Transport}, but <b>uses</b> it for receiving messages, an underlying object * has to be used to send messages (e.g. the channel on which an object of this class relies).<p> * Multiple MembershipListeners can register with the PullPushAdapter; when a view is received, they * will all be notified. There is one main message listener which sends and receives message. In addition, * MessageListeners can register with a certain tag (identifier), and then send messages tagged with this * identifier. When a message with such an identifier is received, the corresponding MessageListener will be * looked up and the message dispatched to it. If no tag is found (default), the main MessageListener will * receive the message. 
* @author Bela Ban * @version $Revision */public class PullPushAdapter implements Runnable, ChannelListener { protected Transport transport=null; protected MessageListener listener=null; // main message receiver protected final List membership_listeners=new ArrayList(); protected Thread receiver_thread=null; protected final HashMap listeners=new HashMap(); // keys=identifier (Serializable), values=MessageListeners protected final Log log=LogFactory.getLog(getClass()); static final String PULL_HEADER="PULL_HEADER"; public PullPushAdapter(Transport transport) { this.transport=transport; start(); } public PullPushAdapter(Transport transport, MessageListener l) { this.transport=transport; setListener(l); start(); } public PullPushAdapter(Transport transport, MembershipListener ml) { this.transport=transport; addMembershipListener(ml); start(); } public PullPushAdapter(Transport transport, MessageListener l, MembershipListener ml) { this.transport=transport; setListener(l); addMembershipListener(ml); start(); } public PullPushAdapter(Transport transport, MessageListener l, MembershipListener ml, boolean start) { this.transport=transport; setListener(l); addMembershipListener(ml); if(start) start(); } public Transport getTransport() { return transport; } public final void start() { if(receiver_thread == null || !receiver_thread.isAlive()) { receiver_thread=new Thread(this, "PullPushAdapterThread"); receiver_thread.setDaemon(true); receiver_thread.start(); } if(transport instanceof JChannel) ((JChannel)transport).addChannelListener(this); } public void stop() { Thread tmp=null; if(receiver_thread != null && receiver_thread.isAlive()) { tmp=receiver_thread; receiver_thread=null; tmp.interrupt(); try { tmp.join(1000); } catch(Exception ex) { } } receiver_thread=null; } /** * Sends a message to the group - listeners to this identifier will receive the messages. 
* @param identifier the key that the proper listeners are listenting on * @param msg the Message to be sent * @see #registerListener */ public void send(Serializable identifier, Message msg) throws Exception { if(msg == null) { if(log.isErrorEnabled()) log.error("msg is null"); return; } if(identifier == null) transport.send(msg); else { msg.putHeader(PULL_HEADER, new PullHeader(identifier)); transport.send(msg); } } /** * Sends a message with no identifier; listener member will get this message on the other group members. * @param msg the Message to be sent * @throws Exception */ public void send(Message msg) throws Exception { send(null, msg); } public final void setListener(MessageListener l) { listener=l; } /** * Sets a listener to messages with a given identifier. * Messages sent with this identifier in their headers will be routed to this listener. * <b>Note: there can be only one listener for one identifier; * if you want to register a different listener to an already registered identifier, then unregister first.</b> * @param identifier - messages sent on the group with this object will be received by this listener * @param l - the listener that will get the message */ public void registerListener(Serializable identifier, MessageListener l) { if(l == null || identifier == null) { if(log.isErrorEnabled()) log.error("message listener or identifier is null"); return; } if(listeners.containsKey(identifier)) { if(log.isErrorEnabled()) log.error("listener with identifier=" + identifier + " already exists, choose a different identifier or unregister current listener"); // we do not want to overwrite the listener return; } listeners.put(identifier, l); } /** * Removes a message listener to a given identifier from the message listeners map. 
* @param identifier - the key we no longer want to listen to
     */
    public void unregisterListener(Serializable identifier) {
        listeners.remove(identifier);
    }

    /** @deprecated Use {@link #addMembershipListener} */
    public void setMembershipListener(MembershipListener ml) {
        addMembershipListener(ml);
    }

    /**
     * Adds a membership listener; null and already contained listeners are ignored.
     * @param l the listener to add
     */
    public final void addMembershipListener(MembershipListener l) {
        if(l != null && !membership_listeners.contains(l))
            membership_listeners.add(l);
    }

    /**
     * Removes a membership listener; null and listeners that are not contained are ignored.
     * @param l the listener to remove
     */
    public void removeMembershipListener(MembershipListener l) {
        if(l != null && membership_listeners.contains(l))
            membership_listeners.remove(l);
    }

    /**
     * Reentrant run(): message reception is serialized, then the listener is notified of the
     * message reception
     */
    public void run() {
        Object obj;

        // keep going only while this thread is still the registered receiver thread
        // (stop() resets receiver_thread to null)
        while(receiver_thread != null && Thread.currentThread().equals(receiver_thread)) {
            try {
                // NOTE(review): a timeout argument of 0 presumably means "block until something
                // arrives" - confirm against Transport.receive()
                obj=transport.receive(0);
                if(obj == null)
                    continue;

                if(obj instanceof Message) {
                    handleMessage((Message)obj);
                }
                else if(obj instanceof GetStateEvent) {
                    // state request: obtain the state from the main listener and hand it back
                    // through the channel; retval stays null if there is no listener or getState() fails
                    byte[] retval=null;
                    GetStateEvent evt=(GetStateEvent)obj;
                    String state_id=evt.getStateId();
                    if(listener != null) {
                        try {
                            // the id-based getState() is used only when both the listener supports
                            // it and the event carries a state id
                            if(listener instanceof ExtendedMessageListener && state_id!=null) {
                                retval=((ExtendedMessageListener)listener).getState(state_id);
                            }
                            else {
                                retval=listener.getState();
                            }
                        }
                        catch(Throwable t) {
                            log.error("getState() from application failed, will return empty state", t);
                        }
                    }
                    else {
                        log.warn("no listener registered, returning empty state");
                    }

                    // returnState() is called on Channel, so other transports can only log the problem
                    if(transport instanceof Channel) {
                        ((Channel)transport).returnState(retval, state_id);
                    }
                    else {
                        if(log.isErrorEnabled())
                            log.error("underlying transport is not a Channel, but a " + transport.getClass().getName() + ": cannot return state using returnState()");
                    }
                }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?