replicatedtree.java

来自「JGRoups源码」· Java 代码 · 共 1,098 行 · 第 1/3 页

JAVA
1,098
字号
// $Id: ReplicatedTree.java,v 1.15 2006/07/31 09:21:58 belaban Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.jmx.JmxConfigurator;import org.jgroups.util.Queue;import org.jgroups.util.QueueClosedException;import org.jgroups.util.Util;import javax.management.MBeanServer;import java.io.Serializable;import java.util.*;/** * A tree-like structure that is replicated across several members. Updates will be multicast to all group * members reliably and in the same order. * @author Bela Ban Jan 17 2002 * @author <a href="mailto:aolias@yahoo.com">Alfonso Olias-Sanz</a> */public class ReplicatedTree implements Runnable, MessageListener, MembershipListener {    public static final String SEPARATOR="/";    final static int INDENT=4;    Node root=new Node(SEPARATOR, SEPARATOR, null, null);    final Vector listeners=new Vector();    final Queue request_queue=new Queue();    Thread request_handler=null;    JChannel channel=null;    PullPushAdapter adapter=null;    String groupname="ReplicatedTree-Group";    final Vector members=new Vector();    long state_fetch_timeout=10000;    boolean jmx=false;    protected final Log log=LogFactory.getLog(this.getClass());    /** Whether or not to use remote calls. If false, all methods will be invoked directly on this     instance rather than sending a message to all replicas and only then invoking the method.     
Useful for testing */    boolean remote_calls=true;    String props="UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" +            "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +            "PING(timeout=2000;num_initial_members=3):" +            "MERGE2(min_interval=5000;max_interval=10000):" +            "FD_SOCK:" +            "VERIFY_SUSPECT(timeout=1500):" +            "pbcast.STABLE(desired_avg_gossip=20000):" +            "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):" +            "UNICAST(timeout=5000):" +            "FRAG(frag_size=16000;down_thread=false;up_thread=false):" +            "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +            "shun=false;print_local_addr=true):" +            "pbcast.STATE_TRANSFER";    // "PERF(details=true)";	/** Determines when the updates have to be sent across the network, avoids sending unnecessary     * messages when there are no member in the group */	private boolean send_message = false;    public interface ReplicatedTreeListener {        void nodeAdded(String fqn);        void nodeRemoved(String fqn);        void nodeModified(String fqn);        void viewChange(View new_view);  // might be MergeView after merging    }    /**     * Creates a channel with the given properties. 
Connects to the channel, then creates a PullPushAdapter     * and starts it     */    public ReplicatedTree(String groupname, String props, long state_fetch_timeout) throws Exception {        if(groupname != null)            this.groupname=groupname;        if(props != null)            this.props=props;        this.state_fetch_timeout=state_fetch_timeout;        channel=new JChannel(this.props);        channel.connect(this.groupname);        start();    }    public ReplicatedTree(String groupname, String props, long state_fetch_timeout, boolean jmx) throws Exception {        if(groupname != null)            this.groupname=groupname;        if(props != null)            this.props=props;        this.jmx=jmx;        this.state_fetch_timeout=state_fetch_timeout;        channel=new JChannel(this.props);        channel.connect(this.groupname);        if(jmx) {            MBeanServer server=Util.getMBeanServer();            if(server == null)                throw new Exception("No MBeanServers found; need to run with an MBeanServer present, or inside JDK 5");            JmxConfigurator.registerChannel(channel, server, "jgroups", channel.getClusterName() , true);        }        start();    }    public ReplicatedTree() {    }    /**     * Expects an already connected channel. Creates a PullPushAdapter and starts it     */    public ReplicatedTree(JChannel channel) throws Exception {        this.channel=channel;        start();    }    public void setRemoteCalls(boolean flag) {        remote_calls=flag;    }    public void setRootNode(Node n) {        root=n;    }    public Address getLocalAddress() {        return channel != null? channel.getLocalAddress() : null;    }    public Vector getMembers() {        return members;    }    /**     * Fetch the group state from the current coordinator. If successful, this will trigger setState().     
*/    public void fetchState(long timeout) throws ChannelClosedException, ChannelNotConnectedException {        boolean rc=channel.getState(null, timeout);        if(log.isInfoEnabled()) {            if(rc)                log.info("state was retrieved successfully");            else                log.info("state could not be retrieved (first member)");        }    }    public void addReplicatedTreeListener(ReplicatedTreeListener listener) {        if(!listeners.contains(listener))            listeners.addElement(listener);    }    public void removeReplicatedTreeListener(ReplicatedTreeListener listener) {        listeners.removeElement(listener);    }    public final void start() throws Exception {        if(request_handler == null) {            request_handler=new Thread(this, "ReplicatedTree.RequestHandler thread");            request_handler.setDaemon(true);            request_handler.start();        }        adapter=new PullPushAdapter(channel, this, this);        adapter.setListener(this);        boolean rc=channel.getState(null, state_fetch_timeout);        if(log.isInfoEnabled()) {            if(rc)                log.info("state was retrieved successfully");            else                log.info("state could not be retrieved (first member)");        }    }    public void stop() {        if(request_handler != null && request_handler.isAlive()) {            request_queue.close(true);            request_handler=null;        }        request_handler=null;        if(channel != null) {            channel.close();        }        if(adapter != null) {            adapter.stop();            adapter=null;        }        channel=null;    }    /**     * Adds a new node to the tree and sets its data. If the node doesn not yet exist, it will be created.     * Also, parent nodes will be created if not existent. If the node already has data, then the new data     * will override the old one. If the node already existed, a nodeModified() notification will be generated.  
   * Otherwise a nodeCreated() motification will be emitted.     * @param fqn The fully qualified name of the new node     * @param data The new data. May be null if no data should be set in the node.     */    public void put(String fqn, HashMap data) {        if(!remote_calls) {            _put(fqn, data);            return;        }		//Changes done by <aos>		//if true, propagate action to the group        if(send_message == true) {            if(channel == null) {                if(log.isErrorEnabled()) log.error("channel is null, cannot broadcast PUT request");                return;            }            try {                channel.send(                        new Message(                                null,                                null,                                new Request(Request.PUT, fqn, data)));            }            catch(Exception ex) {                if(log.isErrorEnabled()) log.error("failure bcasting PUT request: " + ex);            }        }        else {            _put(fqn, data);        }    }    /**     * Adds a key and value to a given node. If the node doesn't exist, it will be created. If the node     * already existed, a nodeModified() notification will be generated. Otherwise a     * nodeCreated() motification will be emitted.     
* @param fqn The fully qualified name of the node     * @param key The key     * @param value The value     */    public void put(String fqn, String key, Object value) {        if(!remote_calls) {            _put(fqn, key, value);            return;        }        //Changes done by <aos>        //if true, propagate action to the group        if(send_message == true) {            if(channel == null) {                if(log.isErrorEnabled()) log.error("channel is null, cannot broadcast PUT request");                return;            }            try {                channel.send(                        new Message(                                null,                                null,                                new Request(Request.PUT, fqn, key, value)));            }            catch(Exception ex) {                if(log.isErrorEnabled()) log.error("failure bcasting PUT request: " + ex);            }        }        else {            _put(fqn, key, value);        }    }    /**     * Removes the node from the tree.     * @param fqn The fully qualified name of the node.     
*/
    public void remove(String fqn) {
        // Changes done by <aos>: the request is only propagated to the group when send_message is
        // set. With remote calls disabled, or nothing to send, the removal is applied locally.
        if(!remote_calls || !send_message) {
            _remove(fqn);
            return;
        }

        if(channel == null) {
            if(log.isErrorEnabled()) log.error("channel is null, cannot broadcast REMOVE request");
            return;
        }

        try {
            channel.send(new Message(null, null, new Request(Request.REMOVE, fqn)));
        }
        catch(Exception ex) {
            // Hand the exception to the logger as well so that its stack trace is not lost.
            if(log.isErrorEnabled()) log.error("failure bcasting REMOVE request: " + ex, ex);
        }
    }

    /**
     * Removes <code>key</code> from the node's hashmap
     * @param fqn The fully qualified name of the node
     * @param key The key to be removed
     */
    public void remove(String fqn, String key) {
        // Changes done by <aos>: the request is only propagated to the group when send_message is
        // set. With remote calls disabled, or nothing to send, the removal is applied locally.
        if(!remote_calls || !send_message) {
            _remove(fqn, key);
            return;
        }

        if(channel == null) {
            if(log.isErrorEnabled()) log.error("channel is null, cannot broadcast REMOVE request");
            return;
        }

        try {
            channel.send(new Message(null, null, new Request(Request.REMOVE, fqn, key)));
        }
        catch(Exception ex) {
            // Hand the exception to the logger as well so that its stack trace is not lost.
            if(log.isErrorEnabled()) log.error("failure bcasting REMOVE request: " + ex, ex);
        }
    }

    /**
     * Checks whether a given node exists in the tree
     * @param fqn The fully qualified name of the node. A null name never exists.
     * @return boolean Whether or not the node exists
     */
    public boolean exists(String fqn) {
        return fqn != null && findNode(fqn) != null;
    }

    /**
     * Gets the keys of the <code>data</code> map. Returns all keys as Strings. Returns null if node
     * does not exist.
     * @param fqn The fully qualified name of the node
     * @return Set A set of keys (as Strings)
     */
    public Set getKeys(String fqn) {
        Node n=findNode(fqn);
        Map data;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?