replicatedtree.java
来自「JGRoups源码」· Java 代码 · 共 1,098 行 · 第 1/3 页
JAVA
1,098 行
// $Id: ReplicatedTree.java,v 1.15 2006/07/31 09:21:58 belaban Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.jmx.JmxConfigurator;import org.jgroups.util.Queue;import org.jgroups.util.QueueClosedException;import org.jgroups.util.Util;import javax.management.MBeanServer;import java.io.Serializable;import java.util.*;/** * A tree-like structure that is replicated across several members. Updates will be multicast to all group * members reliably and in the same order. * @author Bela Ban Jan 17 2002 * @author <a href="mailto:aolias@yahoo.com">Alfonso Olias-Sanz</a> */public class ReplicatedTree implements Runnable, MessageListener, MembershipListener { public static final String SEPARATOR="/"; final static int INDENT=4; Node root=new Node(SEPARATOR, SEPARATOR, null, null); final Vector listeners=new Vector(); final Queue request_queue=new Queue(); Thread request_handler=null; JChannel channel=null; PullPushAdapter adapter=null; String groupname="ReplicatedTree-Group"; final Vector members=new Vector(); long state_fetch_timeout=10000; boolean jmx=false; protected final Log log=LogFactory.getLog(this.getClass()); /** Whether or not to use remote calls. If false, all methods will be invoked directly on this instance rather than sending a message to all replicas and only then invoking the method. Useful for testing */ boolean remote_calls=true; String props="UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + "PING(timeout=2000;num_initial_members=3):" + "MERGE2(min_interval=5000;max_interval=10000):" + "FD_SOCK:" + "VERIFY_SUSPECT(timeout=1500):" + "pbcast.STABLE(desired_avg_gossip=20000):" + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):" + "UNICAST(timeout=5000):" + "FRAG(frag_size=16000;down_thread=false;up_thread=false):" + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" + "shun=false;print_local_addr=true):" + "pbcast.STATE_TRANSFER"; // "PERF(details=true)"; /** Determines when the updates have to be sent across the network, avoids sending unnecessary * messages when there are no member in the group */ private boolean send_message = false; public interface ReplicatedTreeListener { void nodeAdded(String fqn); void nodeRemoved(String fqn); void nodeModified(String fqn); void viewChange(View new_view); // might be MergeView after merging } /** * Creates a channel with the given properties. Connects to the channel, then creates a PullPushAdapter * and starts it */ public ReplicatedTree(String groupname, String props, long state_fetch_timeout) throws Exception { if(groupname != null) this.groupname=groupname; if(props != null) this.props=props; this.state_fetch_timeout=state_fetch_timeout; channel=new JChannel(this.props); channel.connect(this.groupname); start(); } public ReplicatedTree(String groupname, String props, long state_fetch_timeout, boolean jmx) throws Exception { if(groupname != null) this.groupname=groupname; if(props != null) this.props=props; this.jmx=jmx; this.state_fetch_timeout=state_fetch_timeout; channel=new JChannel(this.props); channel.connect(this.groupname); if(jmx) { MBeanServer server=Util.getMBeanServer(); if(server == null) throw new Exception("No MBeanServers found; need to run with an MBeanServer present, or inside JDK 5"); JmxConfigurator.registerChannel(channel, server, "jgroups", channel.getClusterName() , true); } start(); } public ReplicatedTree() { } /** * Expects an already connected channel. Creates a PullPushAdapter and starts it */ public ReplicatedTree(JChannel channel) throws Exception { this.channel=channel; start(); } public void setRemoteCalls(boolean flag) { remote_calls=flag; } public void setRootNode(Node n) { root=n; } public Address getLocalAddress() { return channel != null? channel.getLocalAddress() : null; } public Vector getMembers() { return members; } /** * Fetch the group state from the current coordinator. If successful, this will trigger setState(). */ public void fetchState(long timeout) throws ChannelClosedException, ChannelNotConnectedException { boolean rc=channel.getState(null, timeout); if(log.isInfoEnabled()) { if(rc) log.info("state was retrieved successfully"); else log.info("state could not be retrieved (first member)"); } } public void addReplicatedTreeListener(ReplicatedTreeListener listener) { if(!listeners.contains(listener)) listeners.addElement(listener); } public void removeReplicatedTreeListener(ReplicatedTreeListener listener) { listeners.removeElement(listener); } public final void start() throws Exception { if(request_handler == null) { request_handler=new Thread(this, "ReplicatedTree.RequestHandler thread"); request_handler.setDaemon(true); request_handler.start(); } adapter=new PullPushAdapter(channel, this, this); adapter.setListener(this); boolean rc=channel.getState(null, state_fetch_timeout); if(log.isInfoEnabled()) { if(rc) log.info("state was retrieved successfully"); else log.info("state could not be retrieved (first member)"); } } public void stop() { if(request_handler != null && request_handler.isAlive()) { request_queue.close(true); request_handler=null; } request_handler=null; if(channel != null) { channel.close(); } if(adapter != null) { adapter.stop(); adapter=null; } channel=null; } /** * Adds a new node to the tree and sets its data. If the node doesn not yet exist, it will be created. * Also, parent nodes will be created if not existent. If the node already has data, then the new data * will override the old one. If the node already existed, a nodeModified() notification will be generated. * Otherwise a nodeCreated() motification will be emitted. * @param fqn The fully qualified name of the new node * @param data The new data. May be null if no data should be set in the node. */ public void put(String fqn, HashMap data) { if(!remote_calls) { _put(fqn, data); return; } //Changes done by <aos> //if true, propagate action to the group if(send_message == true) { if(channel == null) { if(log.isErrorEnabled()) log.error("channel is null, cannot broadcast PUT request"); return; } try { channel.send( new Message( null, null, new Request(Request.PUT, fqn, data))); } catch(Exception ex) { if(log.isErrorEnabled()) log.error("failure bcasting PUT request: " + ex); } } else { _put(fqn, data); } } /** * Adds a key and value to a given node. If the node doesn't exist, it will be created. If the node * already existed, a nodeModified() notification will be generated. Otherwise a * nodeCreated() motification will be emitted. * @param fqn The fully qualified name of the node * @param key The key * @param value The value */ public void put(String fqn, String key, Object value) { if(!remote_calls) { _put(fqn, key, value); return; } //Changes done by <aos> //if true, propagate action to the group if(send_message == true) { if(channel == null) { if(log.isErrorEnabled()) log.error("channel is null, cannot broadcast PUT request"); return; } try { channel.send( new Message( null, null, new Request(Request.PUT, fqn, key, value))); } catch(Exception ex) { if(log.isErrorEnabled()) log.error("failure bcasting PUT request: " + ex); } } else { _put(fqn, key, value); } } /** * Removes the node from the tree. * @param fqn The fully qualified name of the node. */ public void remove(String fqn) { if(!remote_calls) { _remove(fqn); return; } //Changes done by <aos> //if true, propagate action to the group if(send_message == true) { if(channel == null) { if(log.isErrorEnabled()) log.error("channel is null, cannot broadcast REMOVE request"); return; } try { channel.send( new Message(null, null, new Request(Request.REMOVE, fqn))); } catch(Exception ex) { if(log.isErrorEnabled()) log.error("failure bcasting REMOVE request: " + ex); } } else { _remove(fqn); } } /** * Removes <code>key</code> from the node's hashmap * @param fqn The fullly qualified name of the node * @param key The key to be removed */ public void remove(String fqn, String key) { if(!remote_calls) { _remove(fqn, key); return; } //Changes done by <aos> //if true, propagate action to the group if(send_message == true) { if(channel == null) { if(log.isErrorEnabled()) log.error("channel is null, cannot broadcast REMOVE request"); return; } try { channel.send( new Message( null, null, new Request(Request.REMOVE, fqn, key))); } catch(Exception ex) { if(log.isErrorEnabled()) log.error("failure bcasting REMOVE request: " + ex); } } else { _remove(fqn, key); } } /** * Checks whether a given node exists in the tree * @param fqn The fully qualified name of the node * @return boolean Whether or not the node exists */ public boolean exists(String fqn) { if(fqn == null) return false; return findNode(fqn) != null; } /** * Gets the keys of the <code>data</code> map. Returns all keys as Strings. Returns null if node * does not exist. * @param fqn The fully qualified name of the node * @return Set A set of keys (as Strings) */ public Set getKeys(String fqn) { Node n=findNode(fqn); Map data;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?