distributedqueue.java

来自「JGRoups源码」· Java 代码 · 共 733 行 · 第 1/2 页

JAVA
733
字号
// $Id: DistributedQueue.java,v 1.19 2006/07/31 09:21:58 belaban Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.util.RspList;import org.jgroups.util.Util;import java.io.Serializable;import java.util.*;/** * Provides the abstraction of a java.util.LinkedList that is replicated at several * locations. Any change to the list (reset, add, remove, etc.) will transparently be * propagated to all replicas in the group. All read-only methods will always access the * local replica.<p> * Both keys and values added to the list <em>must be serializable</em>, the reason * being that they will be sent across the network to all replicas of the group. * An instance of this class will contact an existing member of the group to fetch its * initial state. * Beware to use a <em>total protocol</em> on initialization or elements would not be in same * order on all replicas. * @author Romuald du Song */public class DistributedQueue implements MessageListener, MembershipListener, Cloneable{    public interface Notification    {        void entryAdd(Object value);        void entryRemoved(Object key);        void viewChange(Vector new_mbrs, Vector old_mbrs);        void contentsCleared();        void contentsSet(Collection new_entries);    }    protected Log logger = LogFactory.getLog(getClass());    private long internal_timeout = 10000; // 10 seconds to wait for a response    /*lock object for synchronization*/    protected final Object mutex = new Object();    protected boolean stopped = false; // whether to we are stopped !    protected LinkedList internalQueue;    protected Channel channel;    protected RpcDispatcher disp = null;    protected String groupname = null;    protected Vector notifs = new Vector(); // to be notified when mbrship changes    protected Vector members = new Vector(); // keeps track of all DHTs    private Class[] add_signature = null;    private Class[] addAtHead_signature = null;    private Class[] addAll_signature = null;    private Class[] reset_signature = null;    private Class[] remove_signature = null;        /**     * Creates a DistributedQueue     * @param groupname The name of the group to join     * @param factory The ChannelFactory which will be used to create a channel     * @param properties The property string to be used to define the channel     * @param state_timeout The time to wait until state is retrieved in milliseconds. A value of 0 means wait forever.     */    public DistributedQueue(String groupname, ChannelFactory factory, String properties, long state_timeout)                     throws ChannelException    {        if (logger.isDebugEnabled())        {            logger.debug("DistributedQueue(" + groupname + ',' + properties + ',' + state_timeout);        }        this.groupname = groupname;        initSignatures();        internalQueue = new LinkedList();        channel = (factory != null) ? factory.createChannel(properties) : new JChannel(properties);        disp = new RpcDispatcher(channel, this, this, this);        disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall        channel.connect(groupname);        start(state_timeout);    }    public DistributedQueue(JChannel channel)    {        this.groupname = channel.getClusterName();        this.channel = channel;        init();    }    /**      * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be      * used to register under that id. This is typically used when another building block is already using      * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate      * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the      * first block created on PullPushAdapter.      * The caller needs to call start(), before using the this block. It gives the opportunity for the caller      * to register as a lessoner for Notifications events.      * @param adapter The PullPushAdapter which to use as underlying transport      * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between      *           requests/responses for different building blocks on top of PullPushAdapter.      */    public DistributedQueue(PullPushAdapter adapter, Serializable id)    {        this.channel = (Channel)adapter.getTransport();        this.groupname = this.channel.getClusterName();        initSignatures();        internalQueue = new LinkedList();        disp = new RpcDispatcher(adapter, id, this, this, this);        disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall    }    protected final void init()    {        initSignatures();        internalQueue = new LinkedList();        disp = new RpcDispatcher(channel, this, this, this);        disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall    }    public final void start(long state_timeout) throws ChannelClosedException, ChannelNotConnectedException    {        boolean rc;        logger.debug("DistributedQueue.initState(" + groupname + "): starting state retrieval");        rc = channel.getState(null, state_timeout);        if (rc)        {            logger.info("DistributedQueue.initState(" + groupname + "): state was retrieved successfully");        }        else        {            logger.info("DistributedQueue.initState(" + groupname + "): state could not be retrieved (first member)");        }    }    public Address getLocalAddress()    {        return (channel != null) ? channel.getLocalAddress() : null;    }    public Channel getChannel()    {        return channel;    }    public void addNotifier(Notification n)    {        if (n != null && !notifs.contains(n))        {            notifs.addElement(n);        }    }    public void removeNotifier(Notification n)    {        notifs.removeElement(n);    }    public void stop()    {        /*lock the queue from other threads*/        synchronized (mutex)        {            internalQueue.clear();            if (disp != null)            {                disp.stop();                disp = null;            }            if (channel != null)            {                channel.close();                channel = null;            }            stopped = true;        }    }    /**     * Add the speficied element at the bottom of the queue     * @param value     */    public void add(Object value)    {        try        {            Object retval = null;            RspList rsp = disp.callRemoteMethods(null, "_add", new Object[]{value}, add_signature, GroupRequest.GET_ALL, 0);            Vector results = rsp.getResults();            if (results.size() > 0)            {                retval = results.elementAt(0);                if (logger.isDebugEnabled())                {                    checkResult(rsp, retval);                }            }        }         catch (Exception e)        {            logger.error("Unable to add value " + value, e);        }    }    /**     * Add the speficied element at the top of the queue     * @param value     */    public void addAtHead(Object value)    {        try        {            disp.callRemoteMethods(null, "_addAtHead", new Object[]{value}, addAtHead_signature, GroupRequest.GET_ALL, 0);        }         catch (Exception e)        {            logger.error("Unable to addAtHead value " + value, e);        }    }    /**     * Add the speficied collection to the top of the queue.     * Elements are added in the order that they are returned by the specified     * collection's iterator.     * @param values     */    public void addAll(Collection values)    {        try        {            disp.callRemoteMethods(null, "_addAll", new Object[]{values}, addAll_signature, GroupRequest.GET_ALL, 0);        }         catch (Exception e)        {            logger.error("Unable to addAll value: " + values, e);        }    }    public Vector getContents()    {        Vector result = new Vector();        for (Iterator e = internalQueue.iterator(); e.hasNext();)            result.add(e.next());        return result;    }    public int size()    {        return internalQueue.size();    }    /**      * returns the first object on the queue, without removing it.      * If the queue is empty this object blocks until the first queue object has      * been added      * @return the first object on the queue      */    public Object peek()    {        Object retval = null;        try        {            retval = internalQueue.getFirst();        }         catch (NoSuchElementException e)        {        }        return retval;    }    public void reset()    {        try        {            disp.callRemoteMethods(null, "_reset", null, reset_signature, GroupRequest.GET_ALL, 0);        }         catch (Exception e)        {            logger.error("DistributedQueue.reset(" + groupname + ')', e);        }    }    protected void checkResult(RspList rsp, Object retval)    {        if (logger.isDebugEnabled())        {            logger.debug("Value updated from " + groupname + " :" + retval);        }        Vector results = rsp.getResults();        for (int i = 0; i < results.size(); i++)        {            Object data = results.elementAt(i);            if (!data.equals(retval))            {                logger.error("Reference value differs from returned value " + retval + " != " + data);            }        }    }    /**     * Try to return the first objet in the queue.It does not wait for an object.     * @return the first object in the queue or null if none were found.     */    public Object remove()    {        Object retval = null;        RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout);        Vector results = rsp.getResults();        if (results.size() > 0)        {            retval = results.elementAt(0);            if (logger.isDebugEnabled())            {                checkResult(rsp, retval);            }        }        return retval;    }    /**     * @param timeout The time to wait until an entry is retrieved in milliseconds. A value of 0 means wait forever.     * @return the first object in the queue or null if none were found     */    public Object remove(long timeout)    {        Object retval = null;        long start = System.currentTimeMillis();        if (timeout <= 0)        {            while (!stopped && (retval == null))            {                RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout);                Vector results = rsp.getResults();                if (results.size() > 0)                {                    retval = results.elementAt(0);                    if (logger.isDebugEnabled())                    {                        checkResult(rsp, retval);                    }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?