pullpushadapter.java

来自「JGRoups源码」· Java 代码 · 共 480 行 · 第 1/2 页

JAVA
480
字号
// $Id: PullPushAdapter.java,v 1.22 2006/09/27 19:21:53 vlada Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.util.Util;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.io.Serializable;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.List;/** * Allows a client of {@link org.jgroups.Channel} to be notified when messages have been received * instead of having to actively poll the channel for new messages. Typically used in the * client role (receive()). As this class does not implement interface * {@link org.jgroups.Transport}, but <b>uses</b> it for receiving messages, an underlying object * has to be used to send messages (e.g. the channel on which an object of this class relies).<p> * Multiple MembershipListeners can register with the PullPushAdapter; when a view is received, they * will all be notified. There is one main message listener which sends and receives message. In addition, * MessageListeners can register with a certain tag (identifier), and then send messages tagged with this * identifier. When a message with such an identifier is received, the corresponding MessageListener will be * looked up and the message dispatched to it. If no tag is found (default), the main MessageListener will * receive the message. * @author Bela Ban * @version $Revision */public class PullPushAdapter implements Runnable, ChannelListener {    protected Transport       transport=null;    protected MessageListener listener=null;           // main message receiver    protected final List      membership_listeners=new ArrayList();    protected Thread          receiver_thread=null;    protected final HashMap   listeners=new HashMap(); // keys=identifier (Serializable), values=MessageListeners    protected final Log       log=LogFactory.getLog(getClass());    static final String       PULL_HEADER="PULL_HEADER";    public PullPushAdapter(Transport transport) {        this.transport=transport;        start();    }    public PullPushAdapter(Transport transport, MessageListener l) {        this.transport=transport;        setListener(l);        start();    }    public PullPushAdapter(Transport transport, MembershipListener ml) {        this.transport=transport;        addMembershipListener(ml);        start();    }    public PullPushAdapter(Transport transport, MessageListener l, MembershipListener ml) {        this.transport=transport;        setListener(l);        addMembershipListener(ml);        start();    }    public PullPushAdapter(Transport transport, MessageListener l, MembershipListener ml, boolean start) {        this.transport=transport;        setListener(l);        addMembershipListener(ml);        if(start)            start();    }    public Transport getTransport() {        return transport;    }    public final void start() {        if(receiver_thread == null || !receiver_thread.isAlive()) {            receiver_thread=new Thread(this, "PullPushAdapterThread");            receiver_thread.setDaemon(true);            receiver_thread.start();        }        if(transport instanceof JChannel)            ((JChannel)transport).addChannelListener(this);    }    public void stop() {        Thread tmp=null;        if(receiver_thread != null && receiver_thread.isAlive()) {            tmp=receiver_thread;            receiver_thread=null;            tmp.interrupt();            try {                tmp.join(1000);            }            catch(Exception ex) {            }        }        receiver_thread=null;    }    /**     * Sends a message to the group - listeners to this identifier will receive the messages.     * @param identifier the key that the proper listeners are listenting on      * @param msg the Message to be sent     * @see #registerListener     */    public void send(Serializable identifier, Message msg) throws Exception {        if(msg == null) {            if(log.isErrorEnabled()) log.error("msg is null");            return;        }        if(identifier == null)            transport.send(msg);        else {            msg.putHeader(PULL_HEADER, new PullHeader(identifier));            transport.send(msg);        }    }    /**     * Sends a message with no identifier; listener member will get this message on the other group members.     * @param msg the Message to be sent     * @throws Exception     */    public void send(Message msg) throws Exception {        send(null, msg);    }    public final void setListener(MessageListener l) {        listener=l;    }        /**     * Sets a listener to messages with a given identifier.     * Messages sent with this identifier in their headers will be routed to this listener.     * <b>Note: there can be only one listener for one identifier;     * if you want to register a different listener to an already registered identifier, then unregister first.</b>      * @param identifier - messages sent on the group with this object will be received by this listener      * @param l - the listener that will get the message     */    public void registerListener(Serializable identifier, MessageListener l) {        if(l == null || identifier == null) {            if(log.isErrorEnabled()) log.error("message listener or identifier is null");            return;        }        if(listeners.containsKey(identifier)) {            if(log.isErrorEnabled()) log.error("listener with identifier=" + identifier +                    " already exists, choose a different identifier or unregister current listener");            // we do not want to overwrite the listener            return;        }        listeners.put(identifier, l);    }        /**     * Removes a message listener to a given identifier from the message listeners map.     * @param identifier - the key to whom we do not want to listen any more     */    public void unregisterListener(Serializable identifier) {    	listeners.remove(identifier);    }    /** @deprecated Use {@link #addMembershipListener} */    public void setMembershipListener(MembershipListener ml) {        addMembershipListener(ml);    }    public final void addMembershipListener(MembershipListener l) {        if(l != null && !membership_listeners.contains(l))            membership_listeners.add(l);    }    public void removeMembershipListener(MembershipListener l) {        if(l != null && membership_listeners.contains(l))            membership_listeners.remove(l);    }    /**     * Reentrant run(): message reception is serialized, then the listener is notified of the     * message reception     */    public void run() {        Object obj;        while(receiver_thread != null && Thread.currentThread().equals(receiver_thread)) {            try {                obj=transport.receive(0);                if(obj == null)                    continue;                if(obj instanceof Message) {                    handleMessage((Message)obj);                }                else if(obj instanceof GetStateEvent) {                    byte[] retval=null;                    GetStateEvent evt=(GetStateEvent)obj;                    String state_id=evt.getStateId();                    if(listener != null) {                        try {                            if(listener instanceof ExtendedMessageListener && state_id!=null) {                                retval=((ExtendedMessageListener)listener).getState(state_id);                            }                            else {                                retval=listener.getState();                            }                        }                        catch(Throwable t) {                            log.error("getState() from application failed, will return empty state", t);                        }                    }                    else {                        log.warn("no listener registered, returning empty state");                    }                    if(transport instanceof Channel) {                        ((Channel)transport).returnState(retval, state_id);                    }                    else {                        if(log.isErrorEnabled())                            log.error("underlying transport is not a Channel, but a " +                                    transport.getClass().getName() + ": cannot return state using returnState()");                    }                }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?