distributedqueue.java
来自「JGRoups源码」· Java 代码 · 共 733 行 · 第 1/2 页
JAVA
733 行
// $Id: DistributedQueue.java,v 1.19 2006/07/31 09:21:58 belaban Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.util.RspList;import org.jgroups.util.Util;import java.io.Serializable;import java.util.*;/** * Provides the abstraction of a java.util.LinkedList that is replicated at several * locations. Any change to the list (reset, add, remove, etc.) will transparently be * propagated to all replicas in the group. All read-only methods will always access the * local replica.<p> * Both keys and values added to the list <em>must be serializable</em>, the reason * being that they will be sent across the network to all replicas of the group. * An instance of this class will contact an existing member of the group to fetch its * initial state. * Beware to use a <em>total protocol</em> on initialization or elements would not be in same * order on all replicas. * @author Romuald du Song */public class DistributedQueue implements MessageListener, MembershipListener, Cloneable{ public interface Notification { void entryAdd(Object value); void entryRemoved(Object key); void viewChange(Vector new_mbrs, Vector old_mbrs); void contentsCleared(); void contentsSet(Collection new_entries); } protected Log logger = LogFactory.getLog(getClass()); private long internal_timeout = 10000; // 10 seconds to wait for a response /*lock object for synchronization*/ protected final Object mutex = new Object(); protected boolean stopped = false; // whether to we are stopped ! protected LinkedList internalQueue; protected Channel channel; protected RpcDispatcher disp = null; protected String groupname = null; protected Vector notifs = new Vector(); // to be notified when mbrship changes protected Vector members = new Vector(); // keeps track of all DHTs private Class[] add_signature = null; private Class[] addAtHead_signature = null; private Class[] addAll_signature = null; private Class[] reset_signature = null; private Class[] remove_signature = null; /** * Creates a DistributedQueue * @param groupname The name of the group to join * @param factory The ChannelFactory which will be used to create a channel * @param properties The property string to be used to define the channel * @param state_timeout The time to wait until state is retrieved in milliseconds. A value of 0 means wait forever. */ public DistributedQueue(String groupname, ChannelFactory factory, String properties, long state_timeout) throws ChannelException { if (logger.isDebugEnabled()) { logger.debug("DistributedQueue(" + groupname + ',' + properties + ',' + state_timeout); } this.groupname = groupname; initSignatures(); internalQueue = new LinkedList(); channel = (factory != null) ? factory.createChannel(properties) : new JChannel(properties); disp = new RpcDispatcher(channel, this, this, this); disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall channel.connect(groupname); start(state_timeout); } public DistributedQueue(JChannel channel) { this.groupname = channel.getClusterName(); this.channel = channel; init(); } /** * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be * used to register under that id. This is typically used when another building block is already using * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the * first block created on PullPushAdapter. * The caller needs to call start(), before using the this block. It gives the opportunity for the caller * to register as a lessoner for Notifications events. * @param adapter The PullPushAdapter which to use as underlying transport * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between * requests/responses for different building blocks on top of PullPushAdapter. */ public DistributedQueue(PullPushAdapter adapter, Serializable id) { this.channel = (Channel)adapter.getTransport(); this.groupname = this.channel.getClusterName(); initSignatures(); internalQueue = new LinkedList(); disp = new RpcDispatcher(adapter, id, this, this, this); disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall } protected final void init() { initSignatures(); internalQueue = new LinkedList(); disp = new RpcDispatcher(channel, this, this, this); disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall } public final void start(long state_timeout) throws ChannelClosedException, ChannelNotConnectedException { boolean rc; logger.debug("DistributedQueue.initState(" + groupname + "): starting state retrieval"); rc = channel.getState(null, state_timeout); if (rc) { logger.info("DistributedQueue.initState(" + groupname + "): state was retrieved successfully"); } else { logger.info("DistributedQueue.initState(" + groupname + "): state could not be retrieved (first member)"); } } public Address getLocalAddress() { return (channel != null) ? channel.getLocalAddress() : null; } public Channel getChannel() { return channel; } public void addNotifier(Notification n) { if (n != null && !notifs.contains(n)) { notifs.addElement(n); } } public void removeNotifier(Notification n) { notifs.removeElement(n); } public void stop() { /*lock the queue from other threads*/ synchronized (mutex) { internalQueue.clear(); if (disp != null) { disp.stop(); disp = null; } if (channel != null) { channel.close(); channel = null; } stopped = true; } } /** * Add the speficied element at the bottom of the queue * @param value */ public void add(Object value) { try { Object retval = null; RspList rsp = disp.callRemoteMethods(null, "_add", new Object[]{value}, add_signature, GroupRequest.GET_ALL, 0); Vector results = rsp.getResults(); if (results.size() > 0) { retval = results.elementAt(0); if (logger.isDebugEnabled()) { checkResult(rsp, retval); } } } catch (Exception e) { logger.error("Unable to add value " + value, e); } } /** * Add the speficied element at the top of the queue * @param value */ public void addAtHead(Object value) { try { disp.callRemoteMethods(null, "_addAtHead", new Object[]{value}, addAtHead_signature, GroupRequest.GET_ALL, 0); } catch (Exception e) { logger.error("Unable to addAtHead value " + value, e); } } /** * Add the speficied collection to the top of the queue. * Elements are added in the order that they are returned by the specified * collection's iterator. * @param values */ public void addAll(Collection values) { try { disp.callRemoteMethods(null, "_addAll", new Object[]{values}, addAll_signature, GroupRequest.GET_ALL, 0); } catch (Exception e) { logger.error("Unable to addAll value: " + values, e); } } public Vector getContents() { Vector result = new Vector(); for (Iterator e = internalQueue.iterator(); e.hasNext();) result.add(e.next()); return result; } public int size() { return internalQueue.size(); } /** * returns the first object on the queue, without removing it. * If the queue is empty this object blocks until the first queue object has * been added * @return the first object on the queue */ public Object peek() { Object retval = null; try { retval = internalQueue.getFirst(); } catch (NoSuchElementException e) { } return retval; } public void reset() { try { disp.callRemoteMethods(null, "_reset", null, reset_signature, GroupRequest.GET_ALL, 0); } catch (Exception e) { logger.error("DistributedQueue.reset(" + groupname + ')', e); } } protected void checkResult(RspList rsp, Object retval) { if (logger.isDebugEnabled()) { logger.debug("Value updated from " + groupname + " :" + retval); } Vector results = rsp.getResults(); for (int i = 0; i < results.size(); i++) { Object data = results.elementAt(i); if (!data.equals(retval)) { logger.error("Reference value differs from returned value " + retval + " != " + data); } } } /** * Try to return the first objet in the queue.It does not wait for an object. * @return the first object in the queue or null if none were found. */ public Object remove() { Object retval = null; RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout); Vector results = rsp.getResults(); if (results.size() > 0) { retval = results.elementAt(0); if (logger.isDebugEnabled()) { checkResult(rsp, retval); } } return retval; } /** * @param timeout The time to wait until an entry is retrieved in milliseconds. A value of 0 means wait forever. * @return the first object in the queue or null if none were found */ public Object remove(long timeout) { Object retval = null; long start = System.currentTimeMillis(); if (timeout <= 0) { while (!stopped && (retval == null)) { RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout); Vector results = rsp.getResults(); if (results.size() > 0) { retval = results.elementAt(0); if (logger.isDebugEnabled()) { checkResult(rsp, retval); }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?