merge2.java

来自「JGRoups源码」· Java 代码 · 共 368 行

JAVA
368
字号
// $Id: MERGE2.java,v 1.29 2006/10/09 14:57:47 belaban Exp $package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.View;import org.jgroups.Global;import org.jgroups.stack.Protocol;import org.jgroups.util.Promise;import org.jgroups.util.Util;import java.util.Properties;import java.util.Vector;/** * Protocol to discover subgroups; e.g., existing due to a network partition (that healed). Example: group * {p,q,r,s,t,u,v,w} is split into 3 subgroups {p,q}, {r,s,t,u} and {v,w}. This protocol will eventually send * a MERGE event with the coordinators of each subgroup up the stack: {p,r,v}. Note that - depending on the time * of subgroup discovery - there could also be 2 MERGE events, which first join 2 of the subgroups, and then the * resulting group to the last subgroup. The real work of merging the subgroups into one larger group is done * somewhere above this protocol (typically in the GMS protocol).<p> * This protocol works as follows: * <ul> * <li>If coordinator: periodically retrieve the initial membership (using the FIND_INITIAL_MBRS event provided e.g. *     by PING or TCPPING protocols. This list contains {coord,addr} pairs. * <li>If there is more than 1 coordinator: *     <ol> *     <li>Get all coordinators *     <li>Create a MERGE event with the list of coordinators as argument *     <li>Send the event up the stack *     </ol> * </ul> * * <p> * * Requires: FIND_INITIAL_MBRS event from below<br> * Provides: sends MERGE event with list of coordinators up the stack<br> * @author Bela Ban, Oct 16 2001 */public class MERGE2 extends Protocol {    Address               local_addr=null;    String                group_name=null;    private FindSubgroups task=null;             // task periodically executing as long as we are coordinator    private final Object  task_lock=new Object();    long                  min_interval=5000;     // minimum time between executions of the FindSubgroups task    long                  max_interval=20000;    // maximum time between executions of the FindSubgroups task    boolean               is_coord=false;    final Promise         find_promise=new Promise(); // to synchronize FindSubgroups.findInitialMembers() on    /** Use a new thread to send the MERGE event up the stack */    boolean               use_separate_thread=false;    public String getName() {        return "MERGE2";    }    public long getMinInterval() {        return min_interval;    }    public void setMinInterval(long i) {        min_interval=i;    }    public long getMaxInterval() {        return max_interval;    }    public void setMaxInterval(long l) {        max_interval=l;    }    public boolean setProperties(Properties props) {        String str;        super.setProperties(props);        str=props.getProperty("min_interval");        if(str != null) {            min_interval=Long.parseLong(str);            props.remove("min_interval");        }        str=props.getProperty("max_interval");        if(str != null) {            max_interval=Long.parseLong(str);            props.remove("max_interval");        }        if(min_interval <= 0 || max_interval <= 0) {            if(log.isErrorEnabled()) log.error("min_interval and max_interval have to be > 0");            return false;        }        if(max_interval <= min_interval) {            if(log.isErrorEnabled()) log.error("max_interval has to be greater than min_interval");            return false;        }        str=props.getProperty("use_separate_thread");        if(str != null) {            use_separate_thread=Boolean.valueOf(str).booleanValue();            props.remove("use_separate_thread");        }        if(props.size() > 0) {            log.error("the following properties are not recognized: " + props);            return false;        }        return true;    }    public Vector requiredDownServices() {        Vector retval=new Vector(1);        retval.addElement(new Integer(Event.FIND_INITIAL_MBRS));        return retval;    }    public void stop() {        is_coord=false;        stopTask();    }    /**     * This prevents the up-handler thread to be created, which is not needed in the protocol.     * DON'T REMOVE !      */    public void startUpHandler() {    }    /**     * This prevents the down-handler thread to be created, which is not needed in the protocol.     * DON'T REMOVE !      */    public void startDownHandler() {    }    public void up(Event evt) {        switch(evt.getType()) {            case Event.SET_LOCAL_ADDRESS:                local_addr=(Address)evt.getArg();                passUp(evt);                break;            case Event.FIND_INITIAL_MBRS_OK:                find_promise.setResult(evt.getArg());                passUp(evt); // could be needed by GMS                break;            default:                passUp(evt);            // Pass up to the layer above us                break;        }    }    public void down(Event evt) {        Vector mbrs;        Address coord;        switch(evt.getType()) {            case Event.CONNECT:                group_name=(String)evt.getArg();                passDown(evt);                break;            case Event.DISCONNECT:                group_name=null;                passDown(evt);                break;            case Event.VIEW_CHANGE:                passDown(evt);                mbrs=((View)evt.getArg()).getMembers();                if(mbrs == null || mbrs.size() == 0 || local_addr == null) {                    stopTask();                    break;                }                coord=(Address)mbrs.elementAt(0);                if(coord.equals(local_addr)) {                    is_coord=true;                    startTask(); // start task if we became coordinator (doesn't start if already running)                }                else {                    // if we were coordinator, but are no longer, stop task. this happens e.g. when we merge and someone                    // else becomes the new coordinator of the merged group                    if(is_coord) {                        is_coord=false;                    }                    stopTask();                }                break;            default:                passDown(evt);          // Pass on to the layer below us                break;        }    }    /* -------------------------------------- Private Methods --------------------------------------- */    void startTask() {        synchronized(task_lock) {            if(task == null)                task=new FindSubgroups();            task.start();            if(group_name != null) {                String tmp, prefix=Global.THREAD_PREFIX;                tmp=task.getName();                if(tmp != null && tmp.indexOf(prefix) == -1) {                    tmp+=prefix + group_name + ")";                    task.setName(tmp);                }            }        }    }    void stopTask() {        synchronized(task_lock) {            if(task != null) {                task.stop();                task=null;            }        }    }    /* ---------------------------------- End of Private Methods ------------------------------------ */    /**     * Task periodically executing (if role is coordinator). Gets the initial membership and determines     * whether there are subgroups (multiple coordinators for the same group). If yes, it sends a MERGE event     * with the list of the coordinators up the stack     */    private class FindSubgroups implements Runnable {        Thread thread=null;        String getName() {            return thread != null? thread.getName() : null;        }        void setName(String thread_name) {            if(thread != null)                thread.setName(thread_name);        }        public void start() {            if(thread == null || !thread.isAlive()) {                thread=new Thread(Util.getGlobalThreadGroup(), this, "MERGE2.FindSubgroups thread");                thread.setDaemon(true);                thread.start();            }        }        public void stop() {            if(thread != null) {                Thread tmp=thread;                thread=null;                tmp.interrupt(); // wakes up sleeping thread                find_promise.reset();            }            thread=null;        }        public void run() {            long interval;            Vector coords;            Vector initial_mbrs;            // if(log.isDebugEnabled()) log.debug("merge task started as I'm the coordinator");            while(thread != null && Thread.currentThread().equals(thread)) {                interval=computeInterval();                Util.sleep(interval);                if(thread == null) break;                initial_mbrs=findInitialMembers();                if(thread == null) break;                if(log.isDebugEnabled()) log.debug("initial_mbrs=" + initial_mbrs);                coords=detectMultipleCoordinators(initial_mbrs);                if(coords != null && coords.size() > 1) {                    if(log.isDebugEnabled())                        log.debug("found multiple coordinators: " + coords + "; sending up MERGE event");                    final Event evt=new Event(Event.MERGE, coords);                    if(use_separate_thread) {                        Thread merge_notifier=new Thread() {                            public void run() {                                passUp(evt);                            }                        };                        merge_notifier.setDaemon(true);                        merge_notifier.setName("merge notifier thread");                        merge_notifier.start();                    }                    else {                        passUp(evt);                    }                }            }            if(trace)                log.trace("MERGE2.FindSubgroups thread terminated (local_addr=" + local_addr + ")");        }        /**         * Returns a random value within [min_interval - max_interval]         */        long computeInterval() {            return min_interval + Util.random(max_interval - min_interval);        }        /**         * Returns a list of PingRsp pairs.         */        Vector findInitialMembers() {            PingRsp tmp=new PingRsp(local_addr, local_addr, true);            find_promise.reset();            passDown(Event.FIND_INITIAL_MBRS_EVT);            Vector retval=(Vector)find_promise.getResult(0); // wait indefinitely until response is received            if(retval != null && is_coord && local_addr != null && !retval.contains(tmp))                retval.add(tmp);            return retval;        }        /**         * Finds out if there is more than 1 coordinator in the initial_mbrs vector (contains PingRsp elements).         * @param initial_mbrs A list of PingRsp pairs         * @return Vector A list of the coordinators (Addresses) found. Will contain just 1 element for a correct         *         membership, and more than 1 for multiple coordinators         */        Vector detectMultipleCoordinators(Vector initial_mbrs) {            Vector ret=new Vector(11);            PingRsp rsp;            Address coord;            if(initial_mbrs == null) return null;            for(int i=0; i < initial_mbrs.size(); i++) {                rsp=(PingRsp)initial_mbrs.elementAt(i);                if(!rsp.is_server)                    continue;                coord=rsp.getCoordAddress();                if(!ret.contains(coord))                    ret.addElement(coord);            }            return ret;        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?