⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 stable.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
     * passed down by default by the superclass after this method returns !</b>     *     * @return boolean Defaults to true. If false, event will not be passed     * down the stack.     */    public boolean handleDownEvent(Event evt) {        switch(evt.getType()) {            case Event.VIEW_CHANGE:                if(!downViewChange(evt))                    return (false);                break;                // does anyone else below needs this msg except STABLE?            case Event.GET_MSGS_RECEIVED_OK:                if(!downGetMsgsReceived(evt))                    return (false);                break;        }        return (true);    }    /**     * The gossip task that runs periodically     */    private void gossipRun() {        num_msgs=max_msgs;        sendGossip();    }    /**     * <pre>     * Reset the state of msg garbage-collection:     * i. Reset the table of highest seqnos seen by each member     * ii. Reset the tbl of mbrs from which highest seqnos have been recorded     * </pre>     */    private void initialize() {        synchronized(this) {            seqnos=new long[mbrs.size()];            for(int i=0; i < seqnos.length; i++)                seqnos[i]=-1;            heard_from=new boolean[mbrs.size()];            for(int i=0; i < heard_from.length; i++)                heard_from[i]=false;        }    }    /**     * (1)<br>     * Merge this member's table of highest seqnos seen by a each member     * with the one received from a gossip by another member. The result is     * the element-wise minimum of the input arrays. For each entry:<br>     *     * <tt>seqno[mbr_i] = min(seqno[mbr_i], gossip_seqno[mbr_i])</tt>     * <p>     *     * (2)<br>     * Merge the <tt>heard from</tt> tables of this member and the sender of     * the gossip. The resulting table is:<br>     *     * <tt>heard_from[mbr_i] = heard_from[mbr_i] | sender_heard[mbr_i]</tt>     *     * @param sender the sender of the gossip     * @param gossip_seqnos the highest deliverable seqnos of the sender     * @param gossip_heard_from the table of members sender has heard from     *     */    private void update(Object sender, long[] gossip_seqnos,                        boolean[] gossip_heard_from) {        int index;        synchronized(this) {            index=mbrs.indexOf(sender);            if(index < 0) {                 if(warn) log.warn("sender " + sender + " not found in mbrs !");                return;            }            for(int i=0; i < gossip_seqnos.length; i++)                seqnos[i]=Math.min(seqnos[i], gossip_seqnos[i]);            heard_from[index]=true;            for(int i=0; i < heard_from.length; i++)                heard_from[i]=heard_from[i] | gossip_heard_from[i];        }    }    /**     * Set the seqnos and heard_from arrays to those of the sender. The     * method is called when the sender seems to know more than this member.     * The situation occurs if either:     * <ul>     * <li>     * sender.heard_from > this.heard_from, i.e. the sender has heard     * from more members than we have</li>     * <li>     * sender.round > this.round, i.e. the sender is in a more recent round     * than we are</li>     * </ul>     *     * In both cases, this member is assigned the state of the sender     */    private void set(Object sender, long[] gossip_seqnos,                     boolean[] gossip_heard_from) {        int index;        synchronized(this) {            index=mbrs.indexOf(sender);            if(index < 0) {                 if(warn) log.warn("sender " + sender + " not found in mbrs !");                return;            }            seqnos=gossip_seqnos;            heard_from=gossip_heard_from;        }    }    /**     * @return true, if we have received the highest deliverable seqnos     * directly or indirectly from all members     */    private boolean heardFromAll() {        synchronized(this) {            if(heard_from == null) return false;            for(int i=0; i < heard_from.length; i++)                if(!heard_from[i])                    return false;        }        return true;    }    /**     * Send our <code>seqnos</code> array to a subset of the membership     */    private void sendGossip() {        Vector gossip_subset;        Object[] params;        MethodCall call;        synchronized(this) {            gossip_subset=Util.pickSubset(mbrs, subset);            if(gossip_subset == null || gossip_subset.size() < 1) {                 if(warn) log.warn("picked empty subset !");                return;            }                if(log.isInfoEnabled()) log.info("subset=" + gossip_subset + ", round=" + round + ", seqnos=" +                        Util.array2String(seqnos));            params=new Object[]{                vid.clone(),                new Long(round),                seqnos.clone(),                heard_from.clone(),                local_addr};        }        call=new MethodCall("gossip", params,             new String[] {ViewId.class.getName(), long.class.getName(), long[].class.getName(), boolean[].class.getName(), Object.class.getName()});        for(int i=0; i < gossip_subset.size(); i++) {            try {                callRemoteMethod((Address)gossip_subset.get(i), call, GroupRequest.GET_NONE, 0);            }            catch(Exception e) {                 if(log.isDebugEnabled()) log.debug("exception=" + e);            }        }    }    /**     * Sends GET_MSGS_RECEIVED to NAKACK layer (above us !) and stores result     * in <code>seqnos</code>. In case <code>seqnos</code> does not yet exist     * it creates and initializes it.     */    private void getHighestSeqnos() {        synchronized(highest_seqnos_mutex) {            passUp(new Event(Event.GET_MSGS_RECEIVED));            try {                highest_seqnos_mutex.wait(highest_seqnos_timeout);            }            catch(InterruptedException e) {                if(log.isErrorEnabled()) log.error("Interrupted while waiting for highest seqnos from NAKACK");            }        }    }    /**     * Start scheduling the gossip task     */    private void startGossip() {        synchronized(this) {            if(gossip_task != null)                gossip_task.cancel();            gossip_task=new Task(new Times(new long[]{GOSSIP_INTERVAL}));            sched.add(gossip_task);        }    }    /**     * Received a <tt>MSG</tt> event from a layer below     *     * A msg received:     * If unicast ignore; if multicast and time for gossiping has been     * reached, send out a gossip to a subset of the mbrs     *     * @return true if the event should be forwarded to the layer above     */    private boolean upMsg(Event e) {        Message msg=(Message)e.getArg();        if(msg.getDest() != null && (!msg.getDest().isMulticastAddress()))            return (true);        synchronized(this) {            --num_msgs;            if(num_msgs > 0)                return (true);            num_msgs=max_msgs;            gossip_task.cancel();            gossip_task=new Task(new Times(new long[]{0, GOSSIP_INTERVAL}));            sched.add(gossip_task);        }        return (true);    }    /**     * Received a <tt>VIEW_CHANGE</tt> event from a layer above     *     * A new view:     * i. Set the new mbrs list and the new view ID.     * ii. Reset the highest deliverable seqnos seen     *     * @return true if the event should be forwarded to the layer below     */    private boolean downViewChange(Event e) {        View v=(View)e.getArg();        Vector new_mbrs=v.getMembers();        /*          // Could this ever happen? GMS is always sending non-null value          if(new_mbrs == null) {          / Trace.println(          "STABLE.handleDownEvent()", Trace.ERROR,          "Received VIEW_CHANGE event with null mbrs list");          break;          }        */        synchronized(this) {            vid=v.getVid();            mbrs.clear();            mbrs.addAll(new_mbrs);            initialize();        }        return (true);    }    /**     * Received a <tt>GET_MSGS__RECEIVED_OK</tt> event from a layer above     *     * Updated list of highest deliverable seqnos:     * i. Update the local copy of highest deliverable seqnos     *     * @return true if the event should be forwarded to the layer below     */    private boolean downGetMsgsReceived(Event e) {        long[] new_seqnos=(long[])e.getArg();        try {            synchronized(this) {                if(new_seqnos == null)                    return (true);                if(new_seqnos.length != seqnos.length) {                        if(log.isInfoEnabled()) log.info("GET_MSGS_RECEIVED: array of highest " +                                "seqnos seen so far (received from NAKACK layer) " +                                "has a different length (" + new_seqnos.length +                                ") from 'seqnos' array (" + seqnos.length + ')');                    return (true);                }                System.arraycopy(new_seqnos, 0, seqnos, 0, seqnos.length);            }        }        finally {            synchronized(highest_seqnos_mutex) {                highest_seqnos_mutex.notifyAll();            }        }        return (true);    }    /**     * Select next interval from list. Once the end of the list is reached,     * keep returning the last value. It would be sensible that list of     * times is in increasing order     */    private static class Times {        private int next=0;        private long[] times;        Times(long[] times) {            if(times.length == 0)                throw new IllegalArgumentException("times");            this.times=times;        }        public synchronized long next() {            if(next >= times.length)                return (times[times.length - 1]);            else                return (times[next++]);        }        public long[] times() {            return (times);        }        public synchronized void reset() {            next=0;        }    }    /**     * The gossiping task     */    private class Task implements TimeScheduler.Task {        private final Times intervals;        private boolean cancelled=false;        Task(Times intervals) {            this.intervals=intervals;        }        public long nextInterval() {            return (intervals.next());        }        public boolean cancelled() {            return (cancelled);        }        public void cancel() {            cancelled=true;        }        public void run() {            gossipRun();        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -