⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pbcast.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
// $Id: PBCAST.java,v 1.16 2006/04/23 12:52:53 belaban Exp $package org.jgroups.protocols.pbcast;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Message;import org.jgroups.View;import org.jgroups.stack.NakReceiverWindow;import org.jgroups.stack.Protocol;import org.jgroups.util.List;import org.jgroups.util.Queue;import org.jgroups.util.QueueClosedException;import org.jgroups.util.Util;import java.util.*;/** * Implementation of probabilistic broadcast. Sends group messages via unreliable multicast. Gossips regularly to * a random subset of group members to retransmit missing messages. Gossiping is used both for bringing all * members to the same state (having received the same messages) and to garbage-collect messages seen by all members * (gc is piggybacked in gossip messages). See DESIGN for more details. * @author Bela Ban */public class PBCAST extends Protocol implements Runnable {    boolean operational=false;    long seqno=1;                  // seqno for messages. 1 for the first message    long gossip_round=1;           // identifies the gossip (together with sender)    Address local_addr=null;    final Hashtable digest=new Hashtable();   // stores all messages from members (key: member, val: NakReceiverWindow)    Thread gossip_thread=null;    GossipHandler gossip_handler=null;      // removes gossips and other requests from queue and handles them    final Queue gossip_queue=new Queue(); // (bounded) queue for incoming gossip requests    int max_queue=100;            // max elements in gossip_queue (bounded buffer)    long gossip_interval=5000;     // gossip every 5 seconds    double subset=0.1;               // send gossip messages to a subset consisting of 10% of the mbrship    long desired_avg_gossip=30000; // receive a gossip every 30 secs on average    final Vector members=new Vector();    final List gossip_list=new List();   // list of gossips received, we periodically purge it (FIFO)    int max_gossip_cache=100;     // number of gossips to keep until gossip list is purged    int gc_lag=30;                // how many seqnos should we lag behind (see DESIGN)    final Hashtable invalid_gossipers=new Hashtable(); // keys=Address, val=Integer (number of gossips from suspected mbrs)    final int max_invalid_gossips=2;    // max number of gossip from non-member before that member is shunned    Vector seen_list=null;    boolean shun=false;               // whether invalid gossipers will be shunned or not    boolean dynamic=true;             // whether to use dynamic or static gosssip_interval (overrides gossip_interval)    boolean skip_sleep=true;    boolean mcast_gossip=true;        // use multicast for gossips (subset will be ignored, send to all members)    public String getName() {        return "PBCAST";    }    public Vector providedUpServices() {        Vector retval=new Vector();        retval.addElement(new Integer(Event.GET_DIGEST));        retval.addElement(new Integer(Event.SET_DIGEST));        retval.addElement(new Integer(Event.GET_DIGEST_STATE));        return retval;    }    public void stop() {        stopGossipThread();        stopGossipHandler();        operational=false;    }    public void up(Event evt) {        Message m;        PbcastHeader hdr;        Address sender=null;        switch(evt.getType()) {            case Event.MSG:                m=(Message) evt.getArg();                if(m.getDest() != null && !m.getDest().isMulticastAddress()) {                    if(!(m.getHeader(getName()) instanceof PbcastHeader))                        break; // unicast address: not null and not mcast, pass up unchanged                }                // discard all multicast messages until we become operational (transition from joiner to member)                if(!operational) {                        if(log.isInfoEnabled()) log.info("event was discarded as I'm not yet operational. Event: " +                                                  Util.printEvent(evt));                    return;  // don't pass up                }                if(m.getHeader(getName()) instanceof PbcastHeader)                    hdr=(PbcastHeader) m.removeHeader(getName());                else {                    sender=m.getSrc();                        if(log.isErrorEnabled()) log.error("PbcastHeader expected, but received header of type " +                                                   m.getHeader(getName()).getClass().getName() + " from " + sender +                                                   ". Passing event up unchanged");                    break;                }                switch(hdr.type) {                    case PbcastHeader.MCAST_MSG:  // messages are handled directly (high priority)                        handleUpMessage(m, hdr);                        return;                        // all other requests are put in the bounded gossip queue (discarded if full). this helps to ensure                        // that no 'gossip storms' will occur (overflowing the buffers and the network)                    case PbcastHeader.GOSSIP:                    case PbcastHeader.XMIT_REQ:                    case PbcastHeader.XMIT_RSP:                    case PbcastHeader.NOT_MEMBER:                        try {                            if(gossip_queue.size() >= max_queue) {                                    if(warn) log.warn("gossip request " +                                                              PbcastHeader.type2String(hdr.type) + " discarded because " +                                                              "gossip_queue is full (number of elements=" + gossip_queue.size() + ')');                                return;                            }                            gossip_queue.add(new GossipEntry(hdr, m.getSrc(), m.getBuffer()));                        }                        catch(Exception ex) {                            if(warn) log.warn("exception adding request to gossip_queue, details=" + ex);                        }                        return;                    default:                        if(log.isErrorEnabled()) log.error("type (" + hdr.type + ") of PbcastHeader not known !");                        return;                }            case Event.SET_LOCAL_ADDRESS:                local_addr=(Address) evt.getArg();                break;  // pass up        }        passUp(evt);  // pass up by default    }    public void down(Event evt) {        PbcastHeader hdr;        Message m, copy;        View v;        Vector mbrs;        Address key;        NakReceiverWindow win;        switch(evt.getType()) {            case Event.MSG:                m=(Message) evt.getArg();                if(m.getDest() != null && !m.getDest().isMulticastAddress()) {                    break; // unicast address: not null and not mcast, pass down unchanged                }                else {      // multicast address                    hdr=new PbcastHeader(PbcastHeader.MCAST_MSG, seqno);                    m.putHeader(getName(), hdr);                    // put message in NakReceiverWindow (to be on the safe side if we don't receive it ...)                    synchronized(digest) {                        win=(NakReceiverWindow) digest.get(local_addr);                        if(win == null) {                            if(log.isInfoEnabled()) log.info("NakReceiverWindow for sender " + local_addr +                                                        " not found. Creating new NakReceiverWindow starting at seqno=" + seqno);                            win=new NakReceiverWindow(local_addr, seqno);                            digest.put(local_addr, win);                        }                        copy=m.copy();                        copy.setSrc(local_addr);                        win.add(seqno, copy);                    }                    seqno++;                    break;                }            case Event.SET_DIGEST:                setDigest((Digest) evt.getArg());                return;  // don't pass down            case Event.GET_DIGEST:  // don't pass down                passUp(new Event(Event.GET_DIGEST_OK, getDigest()));                return;            case Event.GET_DIGEST_STATE:  // don't pass down                passUp(new Event(Event.GET_DIGEST_STATE_OK, getDigest()));                return;            case Event.VIEW_CHANGE:                v=(View) evt.getArg();                if(v == null) {                    if(log.isErrorEnabled()) log.error("view is null !");                    break;                }                mbrs=v.getMembers();                // update internal membership list                synchronized(members) {                    members.removeAllElements();                    for(int i=0; i < mbrs.size(); i++)                        members.addElement(mbrs.elementAt(i));                }                // delete all members in digest that are not in new membership list                if(mbrs.size() > 0) {                    synchronized(digest) {                        for(Enumeration e=digest.keys(); e.hasMoreElements();) {                            key=(Address) e.nextElement();                            if(!mbrs.contains(key)) {                                win=(NakReceiverWindow) digest.get(key);                                win.reset();                                digest.remove(key);                            }                        }                    }                }                // add all members from new membership list that are not yet in digest                for(int i=0; i < mbrs.size(); i++) {                    key=(Address) mbrs.elementAt(i);                    if(!digest.containsKey(key)) {                        digest.put(key, new NakReceiverWindow(key, 1));                    }                }                if(dynamic) {                    gossip_interval=computeGossipInterval(members.size(), desired_avg_gossip);                        if(log.isInfoEnabled()) log.info("VIEW_CHANGE: gossip_interval=" + gossip_interval);                    if(gossip_thread != null) {                        skip_sleep=true;                        gossip_thread.interrupt(); // wake up and sleep according to the new gossip_interval                    }                }                startGossipThread();  // will only be started if not yet running                startGossipHandler();                break;            case Event.BECOME_SERVER:                operational=true;                break;        }        passDown(evt);    }    /** Gossip thread. Sends gossips containing a message digest every <code>gossip_interval</code> msecs */    public void run() {        while(gossip_thread != null) {  // stopGossipThread() sets gossip_thread to null            if(dynamic) {                gossip_interval=computeGossipInterval(members.size(), desired_avg_gossip);                    if(log.isInfoEnabled()) log.info("gossip_interval=" + gossip_interval);            }            Util.sleep(gossip_interval);            if(skip_sleep)                skip_sleep=false;            else                sendGossip();        }    }    /** Setup the Protocol instance acording to the configuration string */    public boolean setProperties(Properties props) {super.setProperties(props);        String str;        str=props.getProperty("dynamic");        if(str != null) {            dynamic=Boolean.valueOf(str).booleanValue();            props.remove("dynamic");        }        str=props.getProperty("shun");        if(str != null) {            shun=Boolean.valueOf(str).booleanValue();            props.remove("shun");        }        str=props.getProperty("gossip_interval");        if(str != null) {            gossip_interval=Long.parseLong(str);            props.remove("gossip_interval");        }        str=props.getProperty("mcast_gossip");        if(str != null) {            mcast_gossip=Boolean.valueOf(str).booleanValue();            props.remove("mcast_gossip");        }        str=props.getProperty("subset");        if(str != null) {            subset=Double.parseDouble(str);            props.remove("subset");        }        str=props.getProperty("desired_avg_gossip");        if(str != null) {            desired_avg_gossip=Long.parseLong(str);            props.remove("desired_avg_gossip");        }        str=props.getProperty("max_queue");        if(str != null) {            max_queue=Integer.parseInt(str);            props.remove("max_queue");        }        str=props.getProperty("max_gossip_cache");        if(str != null) {            max_gossip_cache=Integer.parseInt(str);            props.remove("max_gossip_cache");        }        str=props.getProperty("gc_lag");        if(str != null) {            gc_lag=Integer.parseInt(str);            props.remove("gc_lag");        }        if(props.size() > 0) {            log.error("PBCAST.setProperties(): the following properties are not recognized: " + props);                        return false;        }        return true;    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -