📄 pbcast.java
字号:
/* --------------------------------- Private Methods --------------------------------------------- */ /** Ensures that FIFO is observed for all messages for a certain member. The NakReceiverWindow corresponding to a certain sender is looked up in a hashtable. Then, the message is added to the NakReceiverWindow. As many messages as possible are then removed from the table and passed up. */ void handleUpMessage(Message m, PbcastHeader hdr) { Address sender=m.getSrc(); NakReceiverWindow win=null; Message tmpmsg; long tmp_seqno=hdr.seqno; if(sender == null) { if(log.isErrorEnabled()) log.error("sender is null"); return; } synchronized(digest) { win=(NakReceiverWindow) digest.get(sender); if(win == null) { if(warn) log.warn("NakReceiverWindow for sender " + sender + " not found. Creating new NakReceiverWindow starting at seqno=" + tmp_seqno); win=new NakReceiverWindow(sender, tmp_seqno); digest.put(sender, win); } // ************************************* // The header was removed before, so we add it again for the NakReceiverWindow. When there is a // retransmission request, the header will already be attached to the message (both message and // header are *copied* into delivered_msgs when a message is removed from NakReceiverWindow). // ************************************* m.putHeader(getName(), hdr); win.add(tmp_seqno, m); if(log.isInfoEnabled()) log.info("receiver window for " + sender + " is " + win); // Try to remove as many message as possible and send them up the stack while((tmpmsg=win.remove()) != null) { tmpmsg.removeHeader(getName()); // need to remove header again, so upper protocols don't get confused passUp(new Event(Event.MSG, tmpmsg)); } // Garbage collect messages if singleton member (because then we won't receive any gossips, triggering // garbage collection) if(members.size() == 1) { tmp_seqno=Math.max(tmp_seqno - gc_lag, 0); if(tmp_seqno <= 0) { } else { if(trace) log.trace("deleting messages < " + tmp_seqno + " from " + sender); win.stable(tmp_seqno); } } } } /** * Returns for each sender the 'highest seen' seqno from the digest. Highest seen means the * highest seqno without any gaps, e.g. if for a sender P the messages 2 3 4 6 7 were received, * then only 2, 3 and 4 can be delivered, so 4 is the highest seen. 6 and 7 cannot because there * 5 is missing. If there are no message, the highest seen seqno is -1. */ Digest getDigest() { Digest ret=new Digest(digest.size()); long highest_seqno, lowest_seqno; Address key; NakReceiverWindow win; for(Enumeration e=digest.keys(); e.hasMoreElements();) { key=(Address) e.nextElement(); win=(NakReceiverWindow) digest.get(key); lowest_seqno=win.getLowestSeen(); highest_seqno=win.getHighestSeen(); ret.add(key, lowest_seqno, highest_seqno); } if(log.isInfoEnabled()) log.info("digest is " + ret); return ret; } /** * Sets (or resets) the contents of the 'digest' table. Its current messages will be deleted and the * NakReceiverTables reset. */ void setDigest(Digest d) { NakReceiverWindow win; long tmp_seqno=1; synchronized(digest) { for(Enumeration e=digest.elements(); e.hasMoreElements();) { win=(NakReceiverWindow) e.nextElement(); win.reset(); } digest.clear(); Map.Entry entry; Address sender; org.jgroups.protocols.pbcast.Digest.Entry val; for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); sender=(Address)entry.getKey(); if(sender == null) { if(log.isErrorEnabled()) log.error("cannot set item because sender is null"); continue; } val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue(); tmp_seqno=val.high_seqno; digest.put(sender, new NakReceiverWindow(sender, tmp_seqno + 1)); // next to expect, digest had *last* seen ! } } } String printDigest() { long highest_seqno; Address key; NakReceiverWindow win; StringBuffer sb=new StringBuffer(); for(Enumeration e=digest.keys(); e.hasMoreElements();) { key=(Address) e.nextElement(); win=(NakReceiverWindow) digest.get(key); highest_seqno=win.getHighestSeen(); sb.append(key + ": " + highest_seqno + '\n'); } return sb.toString(); } String printIncomingMessageQueue() { StringBuffer sb=new StringBuffer(); NakReceiverWindow win; win=(NakReceiverWindow) digest.get(local_addr); sb.append(win); return sb.toString(); } void startGossipThread() { if(gossip_thread == null) { gossip_thread=new Thread(this); gossip_thread.setDaemon(true); gossip_thread.start(); } } void stopGossipThread() { Thread tmp; if(gossip_thread != null) { if(gossip_thread.isAlive()) { tmp=gossip_thread; gossip_thread=null; tmp.interrupt(); tmp=null; } } gossip_thread=null; } void startGossipHandler() { if(gossip_handler == null) { gossip_handler=new GossipHandler(gossip_queue); gossip_handler.start(); } } void stopGossipHandler() { if(gossip_handler != null) { gossip_handler.stop(); gossip_handler=null; } } /** * Send a gossip message with a message digest of the highest seqnos seen per sender to a subset * of the current membership. Exclude self (I receive all mcasts sent by myself). */ void sendGossip() { Vector current_mbrs=(Vector) members.clone(); Vector subset_mbrs=null; Gossip gossip=null; Message msg; Address dest; PbcastHeader hdr; if(local_addr != null) current_mbrs.remove(local_addr); // don't pick myself if(mcast_gossip) { // send gossip to all members using a multicast gossip=new Gossip(local_addr, gossip_round, getDigest().copy(), null); // not_seen list is null, prevents forwarding for(int i=0; i < current_mbrs.size(); i++) // all members have seen this gossip. Used for garbage collection gossip.addToSeenList((Address) current_mbrs.elementAt(i)); hdr=new PbcastHeader(gossip, PbcastHeader.GOSSIP); msg=new Message(); // null dest == multicast to all members msg.putHeader(getName(), hdr); if(log.isInfoEnabled()) log.info("(from " + local_addr + ") multicasting gossip " + gossip.shortForm() + " to all members"); passDown(new Event(Event.MSG, msg)); } else { subset_mbrs=Util.pickSubset(current_mbrs, subset); for(int i=0; i < subset_mbrs.size(); i++) { gossip=new Gossip(local_addr, gossip_round, getDigest().copy(), (Vector) current_mbrs.clone()); gossip.addToSeenList(local_addr); hdr=new PbcastHeader(gossip, PbcastHeader.GOSSIP); dest=(Address) subset_mbrs.elementAt(i); msg=new Message(dest); msg.putHeader(getName(), hdr); if(log.isInfoEnabled()) log.info("(from " + local_addr + ") sending gossip " + gossip.shortForm() + " to " + subset_mbrs); passDown(new Event(Event.MSG, msg)); } } gossip_round++; } /** * MOST IMPORTANT METHOD IN THIS CLASS !! This guy really decides how a gossip reaches all members, * or whether it will flood the network !<p> * Scrutinize the gossip received and request retransmission of messages that we haven't received yet. * A gossip has a digest which carries for each sender the lowest and highest seqno seen. We check * this range against our own digest and request retransmission of missing messages if needed.<br> * <em>See DESIGN for a description of this method</em> */ void handleGossip(Gossip gossip) { long my_low=0, my_high=0, their_low, their_high; Hashtable ht=null; Digest their_digest; NakReceiverWindow win; Message msg; Address dest; Vector new_dests; PbcastHeader hdr; List missing_msgs; // list of missing messages (for retransmission) (List of Longs) if(trace) log.trace("(from " + local_addr + ") received gossip " + gossip.shortForm() + " from " + gossip.sender); if(gossip == null || gossip.digest == null) { if(warn) log.warn("gossip is null or digest is null"); return; } /* 1. If gossip sender is null, we cannot ask it for missing messages anyway, so discard gossip ! */ if(gossip.sender == null) { if(log.isErrorEnabled()) log.error("sender of gossip is null; " + "don't know where to send XMIT_REQ to. Discarding gossip"); return; } /* 2. Don't process the gossip if the sender of the gossip is not a member anymore. If it is a newly joined member, discard it as well (we can't tell the difference). When the new member will be added to the membership, then its gossips will be processed */ if(!members.contains(gossip.sender)) { if(warn) log.warn("sender " + gossip.sender + " is not a member. Gossip will not be processed"); if(shun) shunInvalidGossiper(gossip.sender); return; } /* 3. If this gossip was received before, just discard it and return (don't process the same gossip twice). This prevents flooding of the gossip sender with retransmission reqs */ while(gossip_list.size() >= max_gossip_cache) // first delete oldest gossips gossip_list.removeFromHead(); if(gossip_list.contains(gossip)) // already received, don't re-broadcast return; else gossip_list.add(gossip.copy()); // add to list of received gossips /* 4. Send a HEARD_FROM event containing all members in the gossip-chain down to the FD layer. This ensures that we don't suspect them */ seen_list=gossip.getSeenList(); if(seen_list.size() > 0) passDown(new Event(Event.HEARD_FROM, seen_list.clone())); /* 5. Compare their digest against ours. Find out if some messages in the their digest are not in our digest. If yes, put them in the 'ht' hashtable for retransmission */ their_digest=gossip.digest; Map.Entry entry; Address sender; org.jgroups.protocols.pbcast.Digest.Entry val; for(Iterator it=their_digest.senders.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); sender=(Address)entry.getKey(); val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue(); their_low=val.low_seqno; their_high=val.high_seqno; if(their_low == 0 && their_high == 0) continue; // won't have any messages for this sender, don't even re-send win=(NakReceiverWindow) digest.get(sender); if(win == null) { // this specific sender in this digest is probably not a member anymore, new digests // won't contain it. for now, just ignore it. if it is a new member, it will be in the next // gossips if(warn) log.warn("sender " + sender + " not found, skipping..."); continue; } my_low=win.getLowestSeen(); my_high=win.getHighestSeen(); if(my_high < their_high) { // changed by Bela (June 26 2003) - replaced my_high with my_low (not tested though !) if(my_low + 1 < their_low) { } else { missing_msgs=win.getMissingMessages(my_high, their_high);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -