⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 distributedtree.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
// $Id: DistributedTree.java,v 1.15 2006/03/27 08:34:24 belaban Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.util.Util;import java.io.Serializable;import java.util.StringTokenizer;import java.util.Vector;/** * A tree-like structure that is replicated across several members. Updates will be multicast to all group * members reliably and in the same order. * @author Bela Ban * @author <a href="mailto:aolias@yahoo.com">Alfonso Olias-Sanz</a> */public class DistributedTree implements MessageListener, MembershipListener {    Node root=null;    final Vector listeners=new Vector();    final Vector view_listeners=new Vector();    final Vector members=new Vector();    protected Channel channel=null;    protected RpcDispatcher disp=null;    String groupname="DistributedTreeGroup";    String channel_properties="UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=0):" +            "PING(timeout=5000;num_initial_members=6):" +            "FD_SOCK:" +            "VERIFY_SUSPECT(timeout=1500):" +            "pbcast.STABLE(desired_avg_gossip=10000):" +            "pbcast.NAKACK(gc_lag=5;retransmit_timeout=3000;trace=true):" +            "UNICAST(timeout=5000):" +            "FRAG(down_thread=false;up_thread=false):" +            "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +            "shun=false;print_local_addr=true):" +            "pbcast.STATE_TRANSFER(trace=true)";    final long state_timeout=5000;   // wait 5 secs max to obtain state	/** Determines when the updates have to be sent across the network, avoids sending unnecessary     * messages when there are no member in the group */	private boolean send_message = false;    protected static final Log log=LogFactory.getLog(DistributedTree.class);    public interface DistributedTreeListener {        void nodeAdded(String fqn, Serializable element);        void nodeRemoved(String fqn);        void nodeModified(String fqn, Serializable old_element, Serializable new_element);    }    public interface ViewListener {        void viewChange(Vector new_mbrs, Vector old_mbrs);    }    public DistributedTree() {    }    public DistributedTree(String groupname, String channel_properties) {        this.groupname=groupname;        if(channel_properties != null)            this.channel_properties=channel_properties;    }    /*     * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be     * used to register under that id. This is typically used when another building block is already using     * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate     * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the     * first block created on PullPushAdapter.     * @param adapter The PullPushAdapter which to use as underlying transport     * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between     *           requests/responses for different building blocks on top of PullPushAdapter.     * @param state_timeout Max number of milliseconds to wait until state is     * retrieved     */    public DistributedTree(PullPushAdapter adapter, Serializable id, long state_timeout)         throws ChannelException {        channel = (Channel)adapter.getTransport();        disp=new RpcDispatcher(adapter, id, this, this, this);        boolean rc = channel.getState(null, state_timeout);        if(rc) {            if(log.isInfoEnabled()) log.info("state was retrieved successfully");        }        else            if(log.isInfoEnabled()) log.info("state could not be retrieved (must be first member in group)");    }    public Object getLocalAddress() {        return channel != null? channel.getLocalAddress() : null;    }    public void setDeadlockDetection(boolean flag) {        if(disp != null)            disp.setDeadlockDetection(flag);    }    public void start() throws Exception {        start(8000);    }    public void start(long timeout) throws Exception {        if(channel != null) // already started            return;        channel=new JChannel(channel_properties);        disp=new RpcDispatcher(channel, this, this, this);        channel.connect(groupname);        boolean rc=channel.getState(null, timeout);        if(rc) {            if(log.isInfoEnabled()) log.info("state was retrieved successfully");        }        else            if(log.isInfoEnabled()) log.info("state could not be retrieved (must be first member in group)");    }    public void stop() {        if(channel != null) {            channel.close();            disp.stop();        }        channel=null;        disp=null;    }    public void addDistributedTreeListener(DistributedTreeListener listener) {        if(!listeners.contains(listener))            listeners.addElement(listener);    }    public void removeDistributedTreeListener(DistributedTreeListener listener) {        listeners.removeElement(listener);    }    public void addViewListener(ViewListener listener) {        if(!view_listeners.contains(listener))            view_listeners.addElement(listener);    }    public void removeViewListener(ViewListener listener) {        view_listeners.removeElement(listener);    }    public void add(String fqn) {        //Changes done by <aos>        //if true, propagate action to the group        if(send_message == true) {            try {				MethodCall call = new MethodCall("_add", new Object[] {fqn}, new String[] {String.class.getName()});                disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, 0);            }            catch(Exception ex) {                if(log.isErrorEnabled()) log.error("exception=" + ex);            }        }        else {            _add(fqn);        }    }    public void add(String fqn, Serializable element) {        add(fqn, element, 0);    }    /** resets an existing node, useful after a merge when you want to tell other      *  members of your state, but do not wish to remove and then add as two separate calls */    public void reset(String fqn, Serializable element)     {        reset(fqn, element, 0);    }    public void remove(String fqn) {        remove(fqn, 0);    }    public void add(String fqn, Serializable element, int timeout) {        //Changes done by <aos>        //if true, propagate action to the group        if(send_message == true) {            try {				MethodCall call = new MethodCall("_add", new Object[] {fqn, element},                     new String[] {String.class.getName(), Serializable.class.getName()});                disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout);            }            catch(Exception ex) {                if(log.isErrorEnabled()) log.error("exception=" + ex);            }        }        else {            _add(fqn, element);        }    }    /** resets an existing node, useful after a merge when you want to tell other      *  members of your state, but do not wish to remove and then add as two separate calls */    public void reset(String fqn, Serializable element, int timeout)     {        //Changes done by <aos>        //if true, propagate action to the group        if(send_message == true) {            try {				MethodCall call = new MethodCall("_reset", new Object[] {fqn, element},                     new String[] {String.class.getName(), Serializable.class.getName()});                disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout);            }            catch(Exception ex) {                if(log.isErrorEnabled()) log.error("exception=" + ex);            }        }        else {            _add(fqn, element);        }    }    public void remove(String fqn, int timeout) {        //Changes done by <aos>        //if true, propagate action to the group        if(send_message == true) {            try {            	MethodCall call = new MethodCall("_remove", new Object[] {fqn}, new String[] {String.class.getName()});                disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout);            }            catch(Exception ex) {                if(log.isErrorEnabled()) log.error("exception=" + ex);            }        }        else {            _remove(fqn);        }    }    public boolean exists(String fqn) {        if(fqn == null)            return false;        return findNode(fqn) == null? false : true;    }    public Serializable get(String fqn) {        Node n=null;        if(fqn == null) return null;        n=findNode(fqn);        if(n != null) {            return n.element;        }        return null;    }    public void set(String fqn, Serializable element) {		set(fqn, element, 0);    }    public void set(String fqn, Serializable element, int timeout) {		//Changes done by <aos>		//if true, propagate action to the group        if(send_message == true) {            try {				MethodCall call = new MethodCall("_set", new Object[] {fqn, element},                     new String[] {String.class.getName(), Serializable.class.getName()});                disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout);            }            catch(Exception ex) {                if(log.isErrorEnabled()) log.error("exception=" + ex);            }        }        else {            _set(fqn, element);        }    }    /** Returns all children of a Node as strings */    public Vector getChildrenNames(String fqn) {        Vector ret=new Vector();        Node n;        if(fqn == null) return ret;        n=findNode(fqn);        if(n == null || n.children == null) return ret;        for(int i=0; i < n.children.size(); i++)            ret.addElement(((Node)n.children.elementAt(i)).name);        return ret;    }    public String print() {        StringBuffer sb=new StringBuffer();        int indent=0;        if(root == null)            return "/";        sb.append(root.print(indent));        return sb.toString();    }    /** Returns all children of a Node as Nodes */    Vector getChildren(String fqn) {        Node n;        if(fqn == null) return null;        n=findNode(fqn);        if(n == null) return null;        return n.children;    }    /**     * Returns the name of the group that the DistributedTree is connected to     * @return String     */    public String  getGroupName()           {return groupname;}	 	    /**     * Returns the Channel the DistributedTree is connected to      * @return Channel     */    public Channel getChannel()             {return channel;}   /**     * Returns the number of current members joined to the group     * @return int     */    public int getGroupMembersNumber()			{return members.size();}    /*--------------------- Callbacks --------------------------*/    public void _add(String fqn) {        _add(fqn, null);    }    public void _add(String fqn, Serializable element) {        Node curr, n;        StringTokenizer tok;        String child_name;        String tmp_fqn="";        if(root == null) {            root=new Node("/", null);            notifyNodeAdded("/", null);        }        if(fqn == null)            return;        curr=root;        tok=new StringTokenizer(fqn, "/");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -