⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 stable.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
// $Id: STABLE.java,v 1.46 2006/05/17 10:54:38 belaban Exp $

package org.jgroups.protocols.pbcast;

import org.jgroups.*;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Streamable;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

import java.io.*;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;


/**
 * Computes the broadcast messages that are stable; i.e., have been received by all members. Sends
 * STABLE events up the stack when this is the case. This allows NAKACK to garbage collect messages that
 * have been seen by all members.<p>
 * Works as follows: periodically we mcast our highest seqnos (seen for each member) to the group.
 * A stability vector, which maintains the highest seqno for each member and initially contains no data,
 * is updated when such a message is received. The entry for a member P is set to
 * min(entry[P], digest[P]). When messages from all members have been received, a stability
 * message is mcast, which causes all members to send a STABLE event up the stack (triggering garbage collection
 * in the NAKACK layer).<p>
 * The stable task now terminates after max_num_gossips if no messages or view changes have been sent or received
 * in the meantime. It will resume when messages are received. This effectively suspends sending superfluous
 * STABLE messages in the face of no activity.<br/>
 * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it to 0),
 * a STABLE task will be started (unless it is already running).
 * @author Bela Ban
 */
public class STABLE extends Protocol {
    /** Our own address; set when a SET_LOCAL_ADDRESS event travels up the stack (see {@link #up(Event)}) */
    Address             local_addr=null;

    /** Current membership; replaced on every view change (see handleViewChange()) */
    final Vector        mbrs=new Vector();
    final Digest        digest=new Digest(10);        // keeps track of the highest seqnos from all members
    final Digest        latest_local_digest=new Digest(10); // keeps track of the latest digests received from NAKACK
    final Vector        heard_from=new Vector();      // keeps track of who we already heard from (STABLE_GOSSIP msgs)

    /** Sends a STABLE gossip every 20 seconds on average. 0 disables gossipping of STABLE messages */
    long                desired_avg_gossip=20000;

    /** delay before we send STABILITY msg (give others a chance to send first). This should be set to a very
     * small number (> 0 !) if <code>max_bytes</code> is used */
    long                stability_delay=6000;
    private StabilitySendTask   stability_task=null;
    final Object        stability_mutex=new Object();   // to synchronize on stability_task
    private volatile StableTask  stable_task=null;               // bcasts periodic STABLE message (added to timer below)
    final Object        stable_task_mutex=new Object(); // to sync on stable_task
    TimeScheduler       timer=null;                     // to send periodic STABLE msgs (and STABILITY messages)

    /** Protocol name; also the key under which the StableHeader is looked up on incoming messages */
    static final String name="STABLE";

    /** Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE
     * message will be broadcast and <code>num_bytes_received</code> reset to 0 . If this is > 0, then ideally
     * <code>stability_delay</code> should be set to a low number as well */
    long                max_bytes=0;

    /** The number of bytes received since the counter was last reset. Only messages whose destination is
     * null or a multicast address are counted, and only while <code>max_bytes</code> is > 0
     * (see {@link #up(Event)}) */
    long                num_bytes_received=0;

    /** When true, don't take part in garbage collection protocol: neither send STABLE messages nor
     * handle STABILITY messages */
    boolean             suspended=false;

    /** Set to true by the first view change; until then received digests are ignored (see updateLocalDigest()) */
    boolean             initialized=false;

    private ResumeTask  resume_task=null;
    final Object        resume_task_mutex=new Object();

    /** Number of gossip messages */
    int                 num_gossips=0;

    private static final long MAX_SUSPEND_TIME=200000;


    /** @return the protocol name, "STABLE" */
    public String getName() {
        return name;
    }

    /** @return the current value of <code>desired_avg_gossip</code> */
    public long getDesiredAverageGossip() {
        return desired_avg_gossip;
    }

    /**
     * Sets <code>desired_avg_gossip</code>. Only the field is updated; an already scheduled task is not touched here.
     * @param gossip_interval the new value
     */
    public void setDesiredAverageGossip(long gossip_interval) {
        desired_avg_gossip=gossip_interval;
    }

    /** @return the current value of <code>max_bytes</code> (0 means byte counting is disabled) */
    public long getMaxBytes() {
        return max_bytes;
    }

    /**
     * Sets <code>max_bytes</code>. <code>num_bytes_received</code> is not reset by this call.
     * @param max_bytes the new threshold; 0 disables byte counting
     */
    public void setMaxBytes(long max_bytes) {
        this.max_bytes=max_bytes;
    }

    /** @return the value of the <code>num_gossips</code> counter */
    public int getNumberOfGossipMessages() {return num_gossips;}

    /** Resets the statistics of the superclass and sets <code>num_gossips</code> back to 0 */
    public void resetStats() {
        super.resetStats();
        num_gossips=0;
    }

    /**
     * Lists the events that a protocol below this one has to handle.
     * @return a Vector holding a single Integer, Event.GET_DIGEST_STABLE
     */
    public Vector requiredDownServices() {
        Vector retval=new Vector();
        retval.addElement(new Integer(Event.GET_DIGEST_STABLE));  // NAKACK layer
        return retval;
    }

    /**
     * Reads this protocol's configuration from <code>props</code>, removing every property it recognizes.
     * <ul>
     * <li><code>desired_avg_gossip</code>, <code>stability_delay</code>, <code>max_bytes</code>: parsed with
     *     Long.parseLong() and stored in the field of the same name
     * <li><code>digest_timeout</code>, <code>max_gossip_runs</code>, <code>max_suspend_time</code>: no longer
     *     used; an error is logged and the value is ignored
     * </ul>
     * @param props the properties; modified by this call (recognized keys are removed)
     * @return false if any property is left over after all recognized ones were removed, true otherwise
     * @throws NumberFormatException if one of the numeric properties cannot be parsed (propagates from
     *         Long.parseLong(), it is not caught here)
     */
    public boolean setProperties(Properties props) {
        String str;

        super.setProperties(props);

        str=props.getProperty("digest_timeout");
        if(str != null) {
            props.remove("digest_timeout");
            log.error("digest_timeout has been deprecated; it will be ignored");
        }

        str=props.getProperty("desired_avg_gossip");
        if(str != null) {
            desired_avg_gossip=Long.parseLong(str);
            props.remove("desired_avg_gossip");
        }

        str=props.getProperty("stability_delay");
        if(str != null) {
            stability_delay=Long.parseLong(str);
            props.remove("stability_delay");
        }

        str=props.getProperty("max_gossip_runs");
        if(str != null) {
            props.remove("max_gossip_runs");
            log.error("max_gossip_runs has been deprecated and will be ignored");
        }

        str=props.getProperty("max_bytes");
        if(str != null) {
            max_bytes=Long.parseLong(str);
            props.remove("max_bytes");
        }

        str=props.getProperty("max_suspend_time");
        if(str != null) {
            log.error("max_suspend_time is not supported any longer; please remove it (ignoring it)");
            props.remove("max_suspend_time");
        }

        // anything still present was not consumed by the superclass or by this protocol
        if(props.size() > 0) {
            log.error("these properties are not recognized: " + props);
            
            return false;
        }
        return true;
    }

    /**
     * Sets the <code>suspended</code> flag (logging only on the false -> true transition) and asks for a resume
     * task to be started. startResumeTask() is invoked on every call, also when already suspended.
     * @param timeout passed through unchanged to startResumeTask()
     */
    private void suspend(long timeout) {
        if(!suspended) {
            suspended=true;
            if(log.isDebugEnabled())
                log.debug("suspending message garbage collection");
        }
        startResumeTask(timeout); // will not start task if already running
    }

    /**
     * Clears the <code>suspended</code> flag, re-initializes the digest and heard_from list from the current
     * membership and stops the resume task. Note that the debug message is logged even if we were not suspended.
     */
    private void resume() {
        resetDigest(mbrs); // start from scratch
        suspended=false;
        if(log.isDebugEnabled())
            log.debug("resuming message garbage collection");
        stopResumeTask();
    }

    /**
     * Fetches the timer from the protocol stack and, if <code>desired_avg_gossip</code> is > 0, starts the
     * periodic stable task.
     * @throws Exception if the stack or its timer is null
     */
    public void start() throws Exception {
        if(stack != null && stack.timer != null)
            timer=stack.timer;
        else
            throw new Exception("timer cannot be retrieved from protocol stack");
        if(desired_avg_gossip > 0)
            startStableTask();
    }

    /** Stops the stable task and empties <code>digest</code> */
    public void stop() {
        stopStableTask();
        clearDigest();
    }

    /**
     * Handles events travelling up the stack.
     * <ul>
     * <li>MSG: if byte counting is enabled and the message is a multicast, adds its size to
     *     <code>num_bytes_received</code> and requests a digest from below once <code>max_bytes</code> is reached.
     *     Messages carrying a StableHeader are consumed here and <em>not</em> passed up; all others are passed up.
     * <li>GET_DIGEST_STABLE_OK: stores the digest in <code>latest_local_digest</code> and sends a STABLE message
     * <li>VIEW_CHANGE: updates the membership and digests
     * <li>SET_LOCAL_ADDRESS: remembers our own address
     * </ul>
     * Every event except a MSG with a StableHeader is passed up afterwards.
     * @param evt the event
     */
    public void up(Event evt) {
        Message msg;
        StableHeader hdr;
        int type=evt.getType();

        switch(type) {

        case Event.MSG:
            msg=(Message)evt.getArg();

            // only if message counting is enabled, and only for multicast messages
            // fixes http://jira.jboss.com/jira/browse/JGRP-233
            if(max_bytes > 0) {
                Address dest=msg.getDest();
                if(dest == null || dest.isMulticastAddress()) {
                    // every message is counted as at least 24 bytes
                    // NOTE(review): the counter is read and written without synchronization - confirm that up()
                    // is only ever invoked by one thread at a time
                    num_bytes_received+=(long)Math.max(msg.getLength(), 24);
                    if(num_bytes_received >= max_bytes) {
                        if(trace) {
                            log.trace(new StringBuffer("max_bytes has been reached (").append(max_bytes).
                                    append(", bytes received=").append(num_bytes_received).append("): triggers stable msg"));
                        }
                        num_bytes_received=0;
                        // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
                        passDown(new Event(Event.GET_DIGEST_STABLE));
                    }
                }
            }

            // removeHeader() strips our header from the message; no header means this is not one of our messages
            hdr=(StableHeader)msg.removeHeader(name);
            if(hdr == null)
                break;
            switch(hdr.type) {
            case StableHeader.STABLE_GOSSIP:
                handleStableMessage(msg.getSrc(), hdr.stableDigest);
                break;
            case StableHeader.STABILITY:
                handleStabilityMessage(hdr.stableDigest, msg.getSrc());
                break;
            default:
                if(log.isErrorEnabled()) log.error("StableHeader type " + hdr.type + " not known");
            }
            return;  // don't pass STABLE or STABILITY messages up the stack

        case Event.GET_DIGEST_STABLE_OK:
            Digest d=(Digest)evt.getArg();
            synchronized(latest_local_digest) {
                latest_local_digest.replace(d);
            }
            if(trace)
                log.trace("setting latest_local_digest from NAKACK: " + d.printHighSeqnos());
            sendStableMessage(d);
            // NOTE(review): the break below means this reply event is also passed further up the stack -
            // confirm that this is intended
            break;

        case Event.VIEW_CHANGE:
            View view=(View)evt.getArg();
            handleViewChange(view);
            break;

        case Event.SET_LOCAL_ADDRESS:
            local_addr=(Address)evt.getArg();
            break;
        }

        passUp(evt);
    }

    /**
     * Handles events travelling down the stack.
     * <ul>
     * <li>VIEW_CHANGE: updates the membership and digests
     * <li>SUSPEND_STABLE: suspends garbage collection; the timeout is taken from the event argument if it is a
     *     Long, otherwise 0 is used
     * <li>RESUME_STABLE: resumes garbage collection
     * </ul>
     * Every event, including the three above, is passed down afterwards.
     * @param evt the event
     */
    public void down(Event evt) {
        switch(evt.getType()) {
        case Event.VIEW_CHANGE:
            View v=(View)evt.getArg();
            handleViewChange(v);
            break;

        case Event.SUSPEND_STABLE:
            long timeout=0;
            Object t=evt.getArg();
            if(t != null && t instanceof Long)
                timeout=((Long)t).longValue();
            suspend(timeout);
            break;

        case Event.RESUME_STABLE:
            resume();
            break;
        }
        passDown(evt);
    }

    /**
     * Triggers a round of garbage collection on demand: takes a copy of <code>digest</code> (under its lock)
     * and sends it in a STABLE message. The send itself happens outside the lock.
     */
    public void runMessageGarbageCollection() {
        Digest copy;
        synchronized(digest) {
            copy=digest.copy();
        }
        sendStableMessage(copy);
    }


    /* --------------------------------------- Private Methods ---------------------------------------- */

    /**
     * Installs a new membership: replaces <code>mbrs</code> with the members of the view, aligns the senders of
     * <code>digest</code> and <code>latest_local_digest</code> with the view, resets the digest and heard_from
     * list and marks this protocol as initialized.
     * @param v the new view
     */
    private void handleViewChange(View v) {
        Vector tmp=v.getMembers();
        mbrs.clear();
        mbrs.addAll(tmp);
        adjustSenders(digest, tmp);
        adjustSenders(latest_local_digest, tmp);
        resetDigest(tmp);
        if(!initialized)
            initialized=true;
    }

    /**
     * Makes the set of senders in <code>d</code> equal to <code>members</code>, holding the lock on <code>d</code>
     * for the whole operation. Entries of members that remain are left untouched.
     * Digest and members are guaranteed to be non-null
     * @param d the digest to adjust (modified in place)
     * @param members the new membership
     */
    private static void adjustSenders(Digest d, Vector members) {
        synchronized(d) {
            // 1. remove all members from digest who are not in the view
            Iterator it=d.senders.keySet().iterator();
            Address mbr;
            while(it.hasNext()) {
                mbr=(Address)it.next();
                if(!members.contains(mbr))
                    it.remove();
            }
            // 2. add members to digest which are in the new view but not in the digest
            for(int i=0; i < members.size(); i++) {
                mbr=(Address)members.get(i);
                if(!d.contains(mbr))
                    d.add(mbr, -1, -1);
            }
        }
    }

    /** Empties <code>digest</code> while holding its lock */
    private void clearDigest() {
        synchronized(digest) {
            digest.clear();
        }
    }

    /** Update my own digest from a digest received by somebody else. Returns whether the update was successful.
     *  Needs to be called with a lock on digest.
     *  <p>
     *  For every sender in <code>d</code>, the highest delivered seqno in <code>digest</code> becomes the minimum
     *  of ours and the received one, and the highest seen seqno becomes the maximum of ours and the received one.
     *  <p>
     *  Nothing is updated (and false is returned) when <code>d</code> is null or empty, when no view has been
     *  installed yet, or when <code>d</code> does not have the same senders as <code>digest</code>. In the last
     *  case the digest and heard_from list are additionally reset from the current membership.
     *  @param d the received digest, may be null
     *  @param sender the member which sent <code>d</code>; only used for trace output
     *  @return true if <code>digest</code> was updated, false if <code>d</code> was discarded
     */
    private boolean updateLocalDigest(Digest d, Address sender) {
        if(d == null || d.size() == 0)
            return false;

        if(!initialized) {
            if(trace)
                log.trace("STABLE message will not be handled as I'm not yet initialized");
            return false;
        }

        if(!digest.sameSenders(d)) {
            if(trace)
                log.trace(new StringBuffer("received a digest ").append(d.printHighSeqnos()).append(" from ").
                          append(sender).append(" which has different members than mine (").
                          append(digest.printHighSeqnos()).append("), discarding it and resetting heard_from list"));
            // to avoid sending incorrect stability/stable msgs, we simply reset our heard_from list, see DESIGN
            resetDigest(mbrs);
            return false;
        }

        // sb is only allocated (and only dereferenced further down) when tracing is on
        StringBuffer sb=null;
        if(trace)
            sb=new StringBuffer("my [").append(local_addr).append("] digest before: ").append(digest).
                    append("\ndigest from ").append(sender).append(": ").append(d);
        Address mbr;
        long highest_seqno, my_highest_seqno, new_highest_seqno;
        long highest_seen_seqno, my_highest_seen_seqno, new_highest_seen_seqno;
        Map.Entry entry;
        org.jgroups.protocols.pbcast.Digest.Entry val;
        for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            mbr=(Address)entry.getKey();
            val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
            highest_seqno=val.high_seqno;
            highest_seen_seqno=val.high_seqno_seen;

            // compute the minimum of the highest seqnos deliverable (for garbage collection)
            my_highest_seqno=digest.highSeqnoAt(mbr);
            // compute the maximum of the highest seqnos seen (for retransmission of last missing message)
            my_highest_seen_seqno=digest.highSeqnoSeenAt(mbr);
            new_highest_seqno=Math.min(my_highest_seqno, highest_seqno);
            new_highest_seen_seqno=Math.max(my_highest_seen_seqno, highest_seen_seqno);
            digest.setHighestDeliveredAndSeenSeqnos(mbr, new_highest_seqno, new_highest_seen_seqno);
        }
        if(trace) {
            sb.append("\nmy [").append(local_addr).append("] digest after: ").append(digest).append("\n");
            log.trace(sb);
        }
        return true;
    }

    /**
     * Starts a new round: refills <code>heard_from</code> with <code>new_members</code> and replaces the
     * contents of <code>digest</code> with a copy of <code>latest_local_digest</code>. Does nothing if
     * <code>new_members</code> is null or empty.
     * <p>
     * The locks on <code>heard_from</code>, <code>latest_local_digest</code> and <code>digest</code> are taken
     * one after the other, not nested, so the three steps are not atomic as a whole.
     * @param new_members the members to put into <code>heard_from</code>
     */
    private void resetDigest(Vector new_members) {
        if(new_members == null || new_members.size() == 0)
            return;
        synchronized(heard_from) {
            heard_from.clear();
            heard_from.addAll(new_members);
        }

        Digest copy_of_latest;
        synchronized(latest_local_digest) {
            copy_of_latest=latest_local_digest.copy();
        }

        synchronized(digest) {
            digest.replace(copy_of_latest);
            if(trace)
                log.trace("resetting digest from NAKACK: " + copy_of_latest.printHighSeqnos());
        }
    }

    /**
     * Removes mbr from heard_from and returns true if this was the last member, otherwise false.
     * When the list becomes empty, the digest is reset and the heard_from list is repopulated with the
     * current membership before returning.
     * <p>
     * NOTE(review): resetDigest() is called while the lock on <code>heard_from</code> is held and acquires the
     * lock on <code>digest</code>, whereas updateLocalDigest() runs with the lock on <code>digest</code> held and
     * can reach resetDigest(), which acquires <code>heard_from</code>. Confirm that every caller of this method
     * already holds the lock on <code>digest</code>; otherwise the two lock orders can deadlock.
     * @param mbr the member we have heard from
     * @return true if <code>heard_from</code> was empty after removing <code>mbr</code>, false otherwise
     */
    private boolean removeFromHeardFromList(Address mbr) {
        synchronized(heard_from) {
            heard_from.remove(mbr);
            if(heard_from.size() == 0) {
                resetDigest(this.mbrs);
                return true;
            }
        }
        return false;
    }

    /**
     * Creates a StableTask and adds it to the timer with fixed-rate scheduling, unless a task already exists.
     */
    void startStableTask() {
        // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
        // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
        // 1 cycle: on the next message or view, we will start the task
        if(stable_task != null)
            return;
        synchronized(stable_task_mutex) {
            if(stable_task != null && stable_task.running()) {
                return;  // already running
            }
            stable_task=new StableTask();
            timer.add(stable_task, true); // fixed-rate scheduling
        }
        if(trace)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -