📄 distributedtree.java
字号:
// $Id: DistributedTree.java,v 1.15 2006/03/27 08:34:24 belaban Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.util.Util;import java.io.Serializable;import java.util.StringTokenizer;import java.util.Vector;/** * A tree-like structure that is replicated across several members. Updates will be multicast to all group * members reliably and in the same order. * @author Bela Ban * @author <a href="mailto:aolias@yahoo.com">Alfonso Olias-Sanz</a> */public class DistributedTree implements MessageListener, MembershipListener { Node root=null; final Vector listeners=new Vector(); final Vector view_listeners=new Vector(); final Vector members=new Vector(); protected Channel channel=null; protected RpcDispatcher disp=null; String groupname="DistributedTreeGroup"; String channel_properties="UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=0):" + "PING(timeout=5000;num_initial_members=6):" + "FD_SOCK:" + "VERIFY_SUSPECT(timeout=1500):" + "pbcast.STABLE(desired_avg_gossip=10000):" + "pbcast.NAKACK(gc_lag=5;retransmit_timeout=3000;trace=true):" + "UNICAST(timeout=5000):" + "FRAG(down_thread=false;up_thread=false):" + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" + "shun=false;print_local_addr=true):" + "pbcast.STATE_TRANSFER(trace=true)"; final long state_timeout=5000; // wait 5 secs max to obtain state /** Determines when the updates have to be sent across the network, avoids sending unnecessary * messages when there are no member in the group */ private boolean send_message = false; protected static final Log log=LogFactory.getLog(DistributedTree.class); public interface DistributedTreeListener { void nodeAdded(String fqn, Serializable element); void nodeRemoved(String fqn); void nodeModified(String fqn, Serializable old_element, Serializable new_element); } public interface ViewListener { void viewChange(Vector new_mbrs, Vector old_mbrs); } public DistributedTree() { } public DistributedTree(String groupname, String channel_properties) { this.groupname=groupname; if(channel_properties != null) this.channel_properties=channel_properties; } /* * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be * used to register under that id. This is typically used when another building block is already using * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the * first block created on PullPushAdapter. * @param adapter The PullPushAdapter which to use as underlying transport * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between * requests/responses for different building blocks on top of PullPushAdapter. * @param state_timeout Max number of milliseconds to wait until state is * retrieved */ public DistributedTree(PullPushAdapter adapter, Serializable id, long state_timeout) throws ChannelException { channel = (Channel)adapter.getTransport(); disp=new RpcDispatcher(adapter, id, this, this, this); boolean rc = channel.getState(null, state_timeout); if(rc) { if(log.isInfoEnabled()) log.info("state was retrieved successfully"); } else if(log.isInfoEnabled()) log.info("state could not be retrieved (must be first member in group)"); } public Object getLocalAddress() { return channel != null? channel.getLocalAddress() : null; } public void setDeadlockDetection(boolean flag) { if(disp != null) disp.setDeadlockDetection(flag); } public void start() throws Exception { start(8000); } public void start(long timeout) throws Exception { if(channel != null) // already started return; channel=new JChannel(channel_properties); disp=new RpcDispatcher(channel, this, this, this); channel.connect(groupname); boolean rc=channel.getState(null, timeout); if(rc) { if(log.isInfoEnabled()) log.info("state was retrieved successfully"); } else if(log.isInfoEnabled()) log.info("state could not be retrieved (must be first member in group)"); } public void stop() { if(channel != null) { channel.close(); disp.stop(); } channel=null; disp=null; } public void addDistributedTreeListener(DistributedTreeListener listener) { if(!listeners.contains(listener)) listeners.addElement(listener); } public void removeDistributedTreeListener(DistributedTreeListener listener) { listeners.removeElement(listener); } public void addViewListener(ViewListener listener) { if(!view_listeners.contains(listener)) view_listeners.addElement(listener); } public void removeViewListener(ViewListener listener) { view_listeners.removeElement(listener); } public void add(String fqn) { //Changes done by <aos> //if true, propagate action to the group if(send_message == true) { try { MethodCall call = new MethodCall("_add", new Object[] {fqn}, new String[] {String.class.getName()}); disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, 0); } catch(Exception ex) { if(log.isErrorEnabled()) log.error("exception=" + ex); } } else { _add(fqn); } } public void add(String fqn, Serializable element) { add(fqn, element, 0); } /** resets an existing node, useful after a merge when you want to tell other * members of your state, but do not wish to remove and then add as two separate calls */ public void reset(String fqn, Serializable element) { reset(fqn, element, 0); } public void remove(String fqn) { remove(fqn, 0); } public void add(String fqn, Serializable element, int timeout) { //Changes done by <aos> //if true, propagate action to the group if(send_message == true) { try { MethodCall call = new MethodCall("_add", new Object[] {fqn, element}, new String[] {String.class.getName(), Serializable.class.getName()}); disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout); } catch(Exception ex) { if(log.isErrorEnabled()) log.error("exception=" + ex); } } else { _add(fqn, element); } } /** resets an existing node, useful after a merge when you want to tell other * members of your state, but do not wish to remove and then add as two separate calls */ public void reset(String fqn, Serializable element, int timeout) { //Changes done by <aos> //if true, propagate action to the group if(send_message == true) { try { MethodCall call = new MethodCall("_reset", new Object[] {fqn, element}, new String[] {String.class.getName(), Serializable.class.getName()}); disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout); } catch(Exception ex) { if(log.isErrorEnabled()) log.error("exception=" + ex); } } else { _add(fqn, element); } } public void remove(String fqn, int timeout) { //Changes done by <aos> //if true, propagate action to the group if(send_message == true) { try { MethodCall call = new MethodCall("_remove", new Object[] {fqn}, new String[] {String.class.getName()}); disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout); } catch(Exception ex) { if(log.isErrorEnabled()) log.error("exception=" + ex); } } else { _remove(fqn); } } public boolean exists(String fqn) { if(fqn == null) return false; return findNode(fqn) == null? false : true; } public Serializable get(String fqn) { Node n=null; if(fqn == null) return null; n=findNode(fqn); if(n != null) { return n.element; } return null; } public void set(String fqn, Serializable element) { set(fqn, element, 0); } public void set(String fqn, Serializable element, int timeout) { //Changes done by <aos> //if true, propagate action to the group if(send_message == true) { try { MethodCall call = new MethodCall("_set", new Object[] {fqn, element}, new String[] {String.class.getName(), Serializable.class.getName()}); disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout); } catch(Exception ex) { if(log.isErrorEnabled()) log.error("exception=" + ex); } } else { _set(fqn, element); } } /** Returns all children of a Node as strings */ public Vector getChildrenNames(String fqn) { Vector ret=new Vector(); Node n; if(fqn == null) return ret; n=findNode(fqn); if(n == null || n.children == null) return ret; for(int i=0; i < n.children.size(); i++) ret.addElement(((Node)n.children.elementAt(i)).name); return ret; } public String print() { StringBuffer sb=new StringBuffer(); int indent=0; if(root == null) return "/"; sb.append(root.print(indent)); return sb.toString(); } /** Returns all children of a Node as Nodes */ Vector getChildren(String fqn) { Node n; if(fqn == null) return null; n=findNode(fqn); if(n == null) return null; return n.children; } /** * Returns the name of the group that the DistributedTree is connected to * @return String */ public String getGroupName() {return groupname;} /** * Returns the Channel the DistributedTree is connected to * @return Channel */ public Channel getChannel() {return channel;} /** * Returns the number of current members joined to the group * @return int */ public int getGroupMembersNumber() {return members.size();} /*--------------------- Callbacks --------------------------*/ public void _add(String fqn) { _add(fqn, null); } public void _add(String fqn, Serializable element) { Node curr, n; StringTokenizer tok; String child_name; String tmp_fqn=""; if(root == null) { root=new Node("/", null); notifyNodeAdded("/", null); } if(fqn == null) return; curr=root; tok=new StringTokenizer(fqn, "/");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -