📄 pbcast.java
字号:
// $Id: PBCAST.java,v 1.16 2006/04/23 12:52:53 belaban Exp $

package org.jgroups.protocols.pbcast;

import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.stack.NakReceiverWindow;
import org.jgroups.stack.Protocol;
import org.jgroups.util.List;
import org.jgroups.util.Queue;
import org.jgroups.util.QueueClosedException;
import org.jgroups.util.Util;

import java.util.*;


/**
 * Implementation of probabilistic broadcast. Sends group messages via unreliable multicast. Gossips regularly to
 * a random subset of group members to retransmit missing messages. Gossiping is used both for bringing all
 * members to the same state (having received the same messages) and to garbage-collect messages seen by all members
 * (gc is piggybacked in gossip messages). See DESIGN for more details.
 * @author Bela Ban
 */
public class PBCAST extends Protocol implements Runnable {
    boolean operational=false;
    long seqno=1; // seqno for messages. 1 for the first message
    long gossip_round=1; // identifies the gossip (together with sender)
    Address local_addr=null;
    final Hashtable digest=new Hashtable(); // stores all messages from members (key: member, val: NakReceiverWindow)
    Thread gossip_thread=null;
    GossipHandler gossip_handler=null; // removes gossips and other requests from queue and handles them
    final Queue gossip_queue=new Queue(); // (bounded) queue for incoming gossip requests
    int max_queue=100; // max elements in gossip_queue (bounded buffer)
    long gossip_interval=5000; // gossip every 5 seconds
    double subset=0.1; // send gossip messages to a subset consisting of 10% of the mbrship
    long desired_avg_gossip=30000; // receive a gossip every 30 secs on average
    final Vector members=new Vector();
    final List gossip_list=new List(); // list of gossips received, we periodically purge it (FIFO)
    int max_gossip_cache=100; // number of gossips to keep until gossip list is purged
    int gc_lag=30; // how many seqnos should we lag behind (see DESIGN)
    final Hashtable invalid_gossipers=new Hashtable(); // keys=Address, val=Integer (number of gossips from suspected mbrs)
    final int max_invalid_gossips=2; // max number of gossip from non-member before that member is shunned
    Vector seen_list=null;
    boolean shun=false; // whether invalid gossipers will be shunned or not
    boolean dynamic=true; // whether to use dynamic or static gossip_interval (overrides gossip_interval)
    boolean skip_sleep=true;
    boolean mcast_gossip=true; // use multicast for gossips (subset will be ignored, send to all members)


    /** Returns the name under which this protocol registers its headers: <code>"PBCAST"</code>. */
    public String getName() {
        return "PBCAST";
    }


    /**
     * Lists the event types this protocol answers for the layers above it.
     * @return a new Vector holding GET_DIGEST, SET_DIGEST and GET_DIGEST_STATE as Integers
     */
    public Vector providedUpServices() {
        Vector retval=new Vector();
        retval.addElement(new Integer(Event.GET_DIGEST));
        retval.addElement(new Integer(Event.SET_DIGEST));
        retval.addElement(new Integer(Event.GET_DIGEST_STATE));
        return retval;
    }


    /** Stops the gossip thread and the gossip handler, then marks this protocol as not operational. */
    public void stop() {
        stopGossipThread();
        stopGossipHandler();
        operational=false;
    }


    /**
     * Handles an event travelling up the stack.
     * <ul>
     * <li>MSG, unicast destination and no PbcastHeader: passed up unchanged.</li>
     * <li>MSG while not operational: discarded (not passed up).</li>
     * <li>MSG without a PbcastHeader (including no header at all): an error is logged and the event is
     *     passed up unchanged.</li>
     * <li>MSG with header type MCAST_MSG: handed to <code>handleUpMessage()</code>, not passed up here.</li>
     * <li>MSG with header type GOSSIP, XMIT_REQ, XMIT_RSP or NOT_MEMBER: added to <code>gossip_queue</code>,
     *     or dropped with a warning if the queue already holds <code>max_queue</code> elements.</li>
     * <li>SET_LOCAL_ADDRESS: the address is remembered and the event is passed up.</li>
     * <li>Everything else: passed up.</li>
     * </ul>
     * @param evt the event received from the layer below
     */
    public void up(Event evt) {
        Message m;
        PbcastHeader hdr;
        Address sender=null;

        switch(evt.getType()) {

            case Event.MSG:
                m=(Message) evt.getArg();
                if(m.getDest() != null && !m.getDest().isMulticastAddress()) {
                    if(!(m.getHeader(getName()) instanceof PbcastHeader))
                        break; // unicast address: not null and not mcast, pass up unchanged
                }

                // discard all multicast messages until we become operational (transition from joiner to member)
                if(!operational) {
                    if(log.isInfoEnabled())
                        log.info("event was discarded as I'm not yet operational. Event: " + Util.printEvent(evt));
                    return; // don't pass up
                }

                if(m.getHeader(getName()) instanceof PbcastHeader)
                    hdr=(PbcastHeader) m.removeHeader(getName());
                else {
                    sender=m.getSrc();
                    if(log.isErrorEnabled()) {
                        // The header may be absent altogether: instanceof is false for null, so guard the
                        // getClass() call instead of throwing a NullPointerException from the logging path.
                        Object unexpected=m.getHeader(getName());
                        String unexpected_type=unexpected == null? "null" : unexpected.getClass().getName();
                        log.error("PbcastHeader expected, but received header of type " +
                                  unexpected_type + " from " + sender + ". Passing event up unchanged");
                    }
                    break;
                }

                switch(hdr.type) {
                    case PbcastHeader.MCAST_MSG: // messages are handled directly (high priority)
                        handleUpMessage(m, hdr);
                        return;

                    // all other requests are put in the bounded gossip queue (discarded if full). this helps to ensure
                    // that no 'gossip storms' will occur (overflowing the buffers and the network)
                    case PbcastHeader.GOSSIP:
                    case PbcastHeader.XMIT_REQ:
                    case PbcastHeader.XMIT_RSP:
                    case PbcastHeader.NOT_MEMBER:
                        try {
                            if(gossip_queue.size() >= max_queue) {
                                if(warn)
                                    log.warn("gossip request " + PbcastHeader.type2String(hdr.type) + " discarded because " +
                                             "gossip_queue is full (number of elements=" + gossip_queue.size() + ')');
                                return;
                            }
                            gossip_queue.add(new GossipEntry(hdr, m.getSrc(), m.getBuffer()));
                        }
                        catch(Exception ex) {
                            if(warn) log.warn("exception adding request to gossip_queue, details=" + ex);
                        }
                        return;

                    default:
                        if(log.isErrorEnabled()) log.error("type (" + hdr.type + ") of PbcastHeader not known !");
                        return;
                }
                // not reached: every branch of the inner switch returns

            case Event.SET_LOCAL_ADDRESS:
                local_addr=(Address) evt.getArg();
                break; // pass up
        }

        passUp(evt); // pass up by default
    }


    /**
     * Handles an event travelling down the stack.
     * <ul>
     * <li>MSG with a unicast destination: passed down unchanged.</li>
     * <li>MSG with a null or multicast destination: a PbcastHeader (MCAST_MSG, current <code>seqno</code>) is
     *     attached, a copy of the message is stored in the local member's NakReceiverWindow (created on demand),
     *     <code>seqno</code> is incremented and the message is passed down.</li>
     * <li>SET_DIGEST: applied via <code>setDigest()</code>; not passed down.</li>
     * <li>GET_DIGEST / GET_DIGEST_STATE: answered by passing GET_DIGEST_OK / GET_DIGEST_STATE_OK up with
     *     <code>getDigest()</code>; not passed down.</li>
     * <li>VIEW_CHANGE: the member list is replaced, windows of departed members are reset and removed, windows
     *     (starting at seqno 1) are created for new members, the gossip interval is recomputed when
     *     <code>dynamic</code> is set, and the gossip thread and handler are started; then passed down.
     *     A null view is logged as an error and passed down.</li>
     * <li>BECOME_SERVER: marks this protocol operational; passed down.</li>
     * </ul>
     * @param evt the event received from the layer above
     */
    public void down(Event evt) {
        PbcastHeader hdr;
        Message m, copy;
        View v;
        Vector mbrs;
        Address key;
        NakReceiverWindow win;

        switch(evt.getType()) {

            case Event.MSG:
                m=(Message) evt.getArg();
                if(m.getDest() != null && !m.getDest().isMulticastAddress()) {
                    break; // unicast address: not null and not mcast, pass down unchanged
                }
                else { // multicast address
                    hdr=new PbcastHeader(PbcastHeader.MCAST_MSG, seqno);
                    m.putHeader(getName(), hdr);

                    // put message in NakReceiverWindow (to be on the safe side if we don't receive it ...)
                    synchronized(digest) {
                        win=(NakReceiverWindow) digest.get(local_addr);
                        if(win == null) {
                            if(log.isInfoEnabled())
                                log.info("NakReceiverWindow for sender " + local_addr +
                                         " not found. Creating new NakReceiverWindow starting at seqno=" + seqno);
                            win=new NakReceiverWindow(local_addr, seqno);
                            digest.put(local_addr, win);
                        }
                        copy=m.copy();
                        copy.setSrc(local_addr);
                        win.add(seqno, copy);
                    }
                    seqno++;
                    break;
                }

            case Event.SET_DIGEST:
                setDigest((Digest) evt.getArg());
                return; // don't pass down

            case Event.GET_DIGEST: // don't pass down
                passUp(new Event(Event.GET_DIGEST_OK, getDigest()));
                return;

            case Event.GET_DIGEST_STATE: // don't pass down
                passUp(new Event(Event.GET_DIGEST_STATE_OK, getDigest()));
                return;

            case Event.VIEW_CHANGE:
                v=(View) evt.getArg();
                if(v == null) {
                    if(log.isErrorEnabled()) log.error("view is null !");
                    break;
                }
                mbrs=v.getMembers();

                // update internal membership list
                synchronized(members) {
                    members.removeAllElements();
                    for(int i=0; i < mbrs.size(); i++)
                        members.addElement(mbrs.elementAt(i));
                }

                // delete all members in digest that are not in new membership list
                if(mbrs.size() > 0) {
                    synchronized(digest) {
                        for(Enumeration e=digest.keys(); e.hasMoreElements();) {
                            key=(Address) e.nextElement();
                            if(!mbrs.contains(key)) {
                                win=(NakReceiverWindow) digest.get(key);
                                win.reset();
                                digest.remove(key);
                            }
                        }
                    }
                }

                // add all members from new membership list that are not yet in digest
                for(int i=0; i < mbrs.size(); i++) {
                    key=(Address) mbrs.elementAt(i);
                    if(!digest.containsKey(key)) {
                        digest.put(key, new NakReceiverWindow(key, 1));
                    }
                }

                if(dynamic) {
                    gossip_interval=computeGossipInterval(members.size(), desired_avg_gossip);
                    if(log.isInfoEnabled()) log.info("VIEW_CHANGE: gossip_interval=" + gossip_interval);
                    if(gossip_thread != null) {
                        skip_sleep=true;
                        gossip_thread.interrupt(); // wake up and sleep according to the new gossip_interval
                    }
                }

                startGossipThread(); // will only be started if not yet running
                startGossipHandler();
                break;

            case Event.BECOME_SERVER:
                operational=true;
                break;
        }

        passDown(evt);
    }


    /**
     * Gossip thread. Sends gossips containing a message digest every <code>gossip_interval</code> msecs.
     * Loops for as long as <code>gossip_thread</code> is non-null. When <code>dynamic</code> is set the interval
     * is recomputed from the current membership size before each sleep. After the sleep, a pending
     * <code>skip_sleep</code> flag is cleared without gossiping; otherwise <code>sendGossip()</code> is called.
     */
    public void run() {
        while(gossip_thread != null) { // stopGossipThread() sets gossip_thread to null
            if(dynamic) {
                gossip_interval=computeGossipInterval(members.size(), desired_avg_gossip);
                if(log.isInfoEnabled()) log.info("gossip_interval=" + gossip_interval);
            }

            Util.sleep(gossip_interval);
            if(skip_sleep)
                skip_sleep=false;
            else
                sendGossip();
        }
    }


    /**
     * Sets up the Protocol instance according to the configuration string.
     * Recognized keys (each is removed from <code>props</code> once read): <code>dynamic</code>,
     * <code>shun</code>, <code>gossip_interval</code>, <code>mcast_gossip</code>, <code>subset</code>,
     * <code>desired_avg_gossip</code>, <code>max_queue</code>, <code>max_gossip_cache</code>, <code>gc_lag</code>.
     * @param props the configuration properties; consumed entries are removed from it
     * @return true if no properties are left over after parsing, false (after logging an error) otherwise
     * @throws NumberFormatException if a numeric property cannot be parsed
     */
    public boolean setProperties(Properties props) {
        super.setProperties(props);
        String str;

        str=props.getProperty("dynamic");
        if(str != null) {
            dynamic=Boolean.valueOf(str).booleanValue();
            props.remove("dynamic");
        }

        str=props.getProperty("shun");
        if(str != null) {
            shun=Boolean.valueOf(str).booleanValue();
            props.remove("shun");
        }

        str=props.getProperty("gossip_interval");
        if(str != null) {
            gossip_interval=Long.parseLong(str);
            props.remove("gossip_interval");
        }

        str=props.getProperty("mcast_gossip");
        if(str != null) {
            mcast_gossip=Boolean.valueOf(str).booleanValue();
            props.remove("mcast_gossip");
        }

        str=props.getProperty("subset");
        if(str != null) {
            subset=Double.parseDouble(str);
            props.remove("subset");
        }

        str=props.getProperty("desired_avg_gossip");
        if(str != null) {
            desired_avg_gossip=Long.parseLong(str);
            props.remove("desired_avg_gossip");
        }

        str=props.getProperty("max_queue");
        if(str != null) {
            max_queue=Integer.parseInt(str);
            props.remove("max_queue");
        }

        str=props.getProperty("max_gossip_cache");
        if(str != null) {
            max_gossip_cache=Integer.parseInt(str);
            props.remove("max_gossip_cache");
        }

        str=props.getProperty("gc_lag");
        if(str != null) {
            gc_lag=Integer.parseInt(str);
            props.remove("gc_lag");
        }

        // anything still present was not understood by this protocol (or its superclass)
        if(props.size() > 0) {
            log.error("PBCAST.setProperties(): the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -