📄 pbcast.java
字号:
if(missing_msgs != null) { if(log.isInfoEnabled()) log.info("asking " + gossip.sender + " for retransmission of " + sender + ", missing messages: " + missing_msgs + "\nwin for " + sender + ":\n" + win + '\n'); if(ht == null) ht=new Hashtable(); ht.put(sender, missing_msgs); } } } } /* 6. Send a XMIT_REQ to the sender of the gossip. The sender will then resend those messages as an XMIT_RSP unicast message (the messages are in its buffer, as a List) */ if(ht == null || ht.size() == 0) { } else { hdr=new PbcastHeader(PbcastHeader.XMIT_REQ); hdr.xmit_reqs=ht; if(log.isInfoEnabled()) log.info("sending XMIT_REQ to " + gossip.sender); msg=new Message(gossip.sender, null, null); msg.putHeader(getName(), hdr); passDown(new Event(Event.MSG, msg)); } /* 7. Remove myself from 'not_seen' list. If not_seen list is empty, we can garbage-collect messages smaller than the digest. Since all the members have seen the gossip, it will not be re-sent */ gossip.removeFromNotSeenList(local_addr); if(gossip.sizeOfNotSeenList() == 0) { garbageCollect(gossip.digest); return; } /* 8. 
If we make it to this point, re-send to subset of remaining members in 'not_seen' list */ new_dests=Util.pickSubset(gossip.getNotSeenList(), subset); if(log.isInfoEnabled()) log.info("(from " + local_addr + ") forwarding gossip " + gossip.shortForm() + " to " + new_dests); gossip.addToSeenList(local_addr); for(int i=0; i < new_dests.size(); i++) { dest=(Address) new_dests.elementAt(i); msg=new Message(dest, null, null); hdr=new PbcastHeader(gossip.copy(), PbcastHeader.GOSSIP); msg.putHeader(getName(), hdr); passDown(new Event(Event.MSG, msg)); } } /** * Find the messages indicated in <code>xmit_reqs</code> and re-send them to * <code>requester</code> */ void handleXmitRequest(Address requester, Hashtable xmit_reqs) { NakReceiverWindow win; Address sender; List msgs, missing_msgs, xmit_msgs; Message msg; if(requester == null) { if(log.isErrorEnabled()) log.error("requester is null"); return; } if(log.isInfoEnabled()) log.info("retransmission requests are " + printXmitReqs(xmit_reqs)); for(Enumeration e=xmit_reqs.keys(); e.hasMoreElements();) { sender=(Address) e.nextElement(); win=(NakReceiverWindow) digest.get(sender); if(win == null) { if(warn) log.warn("sender " + sender + " not found in my digest; skipping retransmit request !"); continue; } missing_msgs=(List) xmit_reqs.get(sender); msgs=win.getMessagesInList(missing_msgs); // msgs to be sent back to requester // re-send the messages to requester. 
don't add a header since they already have headers // (when added to the NakReceiverWindow, the headers were not removed) xmit_msgs=new List(); for(Enumeration en=msgs.elements(); en.hasMoreElements();) { msg=((Message) en.nextElement()).copy(); xmit_msgs.add(msg); } // create a msg with the List of xmit_msgs as contents, add header msg=new Message(requester, null, xmit_msgs); msg.putHeader(getName(), new PbcastHeader(PbcastHeader.XMIT_RSP)); passDown(new Event(Event.MSG, msg)); } } void handleXmitRsp(List xmit_msgs) { Message m; PbcastHeader hdr; for(Enumeration e=xmit_msgs.elements(); e.hasMoreElements();) { m=(Message) e.nextElement(); hdr=(PbcastHeader) m.removeHeader(getName()); if(hdr == null) { log.warn("header is null, ignoring message"); } else { if(log.isInfoEnabled()) log.info("received #" + hdr.seqno + ", type=" + PbcastHeader.type2String(hdr.type) + ", msg=" + m); handleUpMessage(m, hdr); } } } String printXmitReqs(Hashtable xmit_reqs) { StringBuffer sb=new StringBuffer(); Address key; boolean first=true; if(xmit_reqs == null) return "<null>"; for(Enumeration e=xmit_reqs.keys(); e.hasMoreElements();) { key=(Address) e.nextElement(); if(!first) { sb.append(", "); } else first=false; sb.append(key + ": " + xmit_reqs.get(key)); } return sb.toString(); } void garbageCollect(Digest d) { Address sender; long tmp_seqno; NakReceiverWindow win; Map.Entry entry; org.jgroups.protocols.pbcast.Digest.Entry val; for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); sender=(Address)entry.getKey(); val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue(); win=(NakReceiverWindow)digest.get(sender); if(win == null) { if(log.isDebugEnabled()) log.debug("sender " + sender + " not found in our message digest, skipping"); continue; } tmp_seqno=val.high_seqno; tmp_seqno=Math.max(tmp_seqno - gc_lag, 0); if(tmp_seqno <= 0) { continue; } if(trace) log.trace("(from " + local_addr + ") GC: deleting messages < " + tmp_seqno + " from " 
+ sender); win.stable(tmp_seqno); } } /** * If sender of gossip is not a member, send a NOT_MEMBER to sender (after n gossips received). * This will cause that member to leave the group and possibly re-join. */ void shunInvalidGossiper(Address invalid_gossiper) { int num_pings=0; Message shun_msg; if(invalid_gossipers.containsKey(invalid_gossiper)) { num_pings=((Integer) invalid_gossipers.get(invalid_gossiper)).intValue(); if(num_pings >= max_invalid_gossips) { if(log.isInfoEnabled()) log.info("sender " + invalid_gossiper + " is not member of " + members + " ! Telling it to leave group"); shun_msg=new Message(invalid_gossiper, null, null); shun_msg.putHeader(getName(), new PbcastHeader(PbcastHeader.NOT_MEMBER)); passDown(new Event(Event.MSG, shun_msg)); invalid_gossipers.remove(invalid_gossiper); } else { num_pings++; invalid_gossipers.put(invalid_gossiper, new Integer(num_pings)); } } else { num_pings++; invalid_gossipers.put(invalid_gossiper, new Integer(num_pings)); } } /** Computes the gossip_interval. See DESIGN for details */ long computeGossipInterval(int num_mbrs, double desired_avg_gossip) { return getRandom((long) (num_mbrs * desired_avg_gossip * 2)); } long getRandom(long range) { return (long) ((Math.random() * range) % range); } /* ------------------------------- End of Private Methods ---------------------------------------- */ private static class GossipEntry { PbcastHeader hdr=null; Address sender=null; byte[] data=null; GossipEntry(PbcastHeader hdr, Address sender, byte[] data) { this.hdr=hdr; this.sender=sender; this.data=data; } public String toString() { return "hdr=" + hdr + ", sender=" + sender + ", data=" + data; } } /** Handles gossip and retransmission requests. Removes requests from a (bounded) queue. 
*/
    private class GossipHandler implements Runnable {
        // written by start()/stop() on the caller's thread and polled by run() on the handler thread
        volatile Thread t=null;
        final Queue queue;


        GossipHandler(Queue q) {
            queue=q;
        }


        /** Creates and starts the daemon handler thread unless one is already set */
        void start() {
            if(t == null) {
                t=new Thread(this, "PBCAST.GossipHandlerThread");
                t.setDaemon(true);
                t.start();
            }
        }


        /** Clears the thread reference, closes the queue without flushing it and interrupts the thread */
        void stop() {
            Thread tmp=t;
            if(tmp != null && tmp.isAlive()) {
                t=null;
                if(queue != null)
                    queue.close(false); // don't flush elements
                tmp.interrupt();
            }
            t=null;
        }


        /**
         * Removes entries from the queue and dispatches them on the type of their PbcastHeader until
         * the thread reference is cleared or the queue is closed.
         */
        public void run() {
            GossipEntry entry;
            PbcastHeader hdr;
            List xmit_msgs;
            byte[] data;

            while(t != null && queue != null) {
                try {
                    entry=(GossipEntry) queue.remove();
                    hdr=entry.hdr;
                    if(hdr == null) {
                        if(log.isErrorEnabled()) log.error("gossip entry has no PbcastHeader");
                        continue;
                    }

                    switch(hdr.type) {

                        case PbcastHeader.GOSSIP:
                            handleGossip(hdr.gossip);
                            break;

                        case PbcastHeader.XMIT_REQ:
                            if(hdr.xmit_reqs == null) {
                                if(warn) log.warn("request is null !");
                                break;
                            }
                            handleXmitRequest(entry.sender, hdr.xmit_reqs);
                            break;

                        case PbcastHeader.XMIT_RSP:
                            data=entry.data;
                            if(data == null) {
                                if(warn) log.warn("buffer is null (no xmitted msgs)");
                                break;
                            }
                            try {
                                xmit_msgs=(List) Util.objectFromByteBuffer(data);
                            }
                            catch(Exception ex) {
                                if(log.isErrorEnabled())
                                    log.error("failed creating retransmitted messages from buffer", ex);
                                break;
                            }
                            handleXmitRsp(xmit_msgs);
                            break;

                        case PbcastHeader.NOT_MEMBER:  // we are shunned
                            if(shun) {
                                if(log.isInfoEnabled()) log.info("I am being shunned. Will leave and re-join");
                                passUp(new Event(Event.EXIT));
                            }
                            break;

                        default:
                            if(log.isErrorEnabled())
                                log.error("type (" + hdr.type + ") of PbcastHeader not known !");
                            // Skip the entry and keep serving the queue. Returning here would end the
                            // handler thread for good while 't' stays set, so start() could not revive it.
                            break;
                    }
                }
                catch(QueueClosedException closed) {
                    break;
                }
            }
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -