📄 nakack.java
字号:
win.setDiscardDeliveredMessages(discard_delivered_msgs); win.setMaxXmitBufSize(this.max_xmit_buf_size); if(stats) win.setListener(this); return win; } /** * Returns the lowest seqno still in cache (so it can be retransmitted) and the highest seqno received so far. * * @param sender The address for which the highest and lowest seqnos are to be retrieved * @param stop_at_gaps If true, the highest seqno *deliverable* will be returned. If false, the highest seqno * *received* will be returned. E.g. for [+3 +4 +5 -6 +7 +8], the highest_seqno_received is 8, * whereas the higheset_seqno_seen (deliverable) is 5. */ private Range getLowestAndHighestSeqno(Address sender, boolean stop_at_gaps) { Range r=null; NakReceiverWindow win; if(sender == null) { if(log.isErrorEnabled()) { log.error("sender is null"); } return r; } synchronized(received_msgs) { win=(NakReceiverWindow)received_msgs.get(sender); } if(win == null) { if(log.isErrorEnabled()) { log.error("sender " + sender + " not found in received_msgs"); } return r; } if(stop_at_gaps) { r=new Range(win.getLowestSeen(), win.getHighestSeen()); // deliverable messages (no gaps) } else { r=new Range(win.getLowestSeen(), win.getHighestReceived() + 1); // received messages } return r; } /** * Returns the highest seqno seen from sender. E.g. if we received 1, 2, 4, 5 from P, then 5 will be returned * (doesn't take gaps into account). If we are the sender, we will return the highest seqno <em>sent</em> rather * then <em>received</em> */ private long getHighSeqnoSeen(Address sender) { NakReceiverWindow win; long ret=0; if(sender == null) { if(log.isErrorEnabled()) { log.error("sender is null"); } return ret; } if(sender.equals(local_addr)) { return seqno - 1; } synchronized(received_msgs) { win=(NakReceiverWindow)received_msgs.get(sender); } if(win == null) { if(log.isErrorEnabled()) { log.error("sender " + sender + " not found in received_msgs"); } return ret; } ret=win.getHighestReceived(); return ret; } /** * Garbage collect messages that have been seen by all members. Update sent_msgs: for the sender P in the digest * which is equal to the local address, garbage collect all messages <= seqno at digest[P]. Update received_msgs: * for each sender P in the digest and its highest seqno seen SEQ, garbage collect all delivered_msgs in the * NakReceiverWindow corresponding to P which are <= seqno at digest[P]. */ private void stable(Digest d) { NakReceiverWindow recv_win; long my_highest_rcvd; // highest seqno received in my digest for a sender P long stability_highest_rcvd; // highest seqno received in the stability vector for a sender P if(members == null || local_addr == null || d == null) { if(warn) log.warn("members, local_addr or digest are null !"); return; } if(trace) { log.trace("received stable digest " + d); } Map.Entry entry; Address sender; org.jgroups.protocols.pbcast.Digest.Entry val; long high_seqno_delivered, high_seqno_received; for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); sender=(Address)entry.getKey(); if(sender == null) continue; val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue(); high_seqno_delivered=val.high_seqno; high_seqno_received=val.high_seqno_seen; // check whether the last seqno received for a sender P in the stability vector is > last seqno // received for P in my digest. if yes, request retransmission (see "Last Message Dropped" topic // in DESIGN) synchronized(received_msgs) { recv_win=(NakReceiverWindow)received_msgs.get(sender); } if(recv_win != null) { my_highest_rcvd=recv_win.getHighestReceived(); stability_highest_rcvd=high_seqno_received; if(stability_highest_rcvd >= 0 && stability_highest_rcvd > my_highest_rcvd) { if(trace) { log.trace("my_highest_rcvd (" + my_highest_rcvd + ") < stability_highest_rcvd (" + stability_highest_rcvd + "): requesting retransmission of " + sender + '#' + stability_highest_rcvd); } retransmit(stability_highest_rcvd, stability_highest_rcvd, sender); } } high_seqno_delivered-=gc_lag; if(high_seqno_delivered < 0) { continue; } if(trace) log.trace("deleting msgs <= " + high_seqno_delivered + " from " + sender); // garbage collect from sent_msgs if sender was myself if(sender.equals(local_addr)) { synchronized(sent_msgs) { // gets us a subset from [lowest seqno - seqno] SortedMap stable_keys=sent_msgs.headMap(new Long(high_seqno_delivered)); if(stable_keys != null) { stable_keys.clear(); // this will modify sent_msgs directly } } } // delete *delivered* msgs that are stable // recv_win=(NakReceiverWindow)received_msgs.get(sender); if(recv_win != null) recv_win.stable(high_seqno_delivered); // delete all messages with seqnos <= seqno } } /* ---------------------- Interface Retransmitter.RetransmitCommand ---------------------- */ /** * Implementation of Retransmitter.RetransmitCommand. Called by retransmission thread when gap is detected. */ public void retransmit(long first_seqno, long last_seqno, Address sender) { NakAckHeader hdr; Message retransmit_msg; Address dest=sender; // to whom do we send the XMIT request ? if(xmit_from_random_member && !local_addr.equals(sender)) { Address random_member=(Address)Util.pickRandomElement(members); if(random_member != null && !local_addr.equals(random_member)) { dest=random_member; if(trace) log.trace("picked random member " + dest + " to send XMIT request to"); } } hdr=new NakAckHeader(NakAckHeader.XMIT_REQ, first_seqno, last_seqno, sender); retransmit_msg=new Message(dest, null, null); if(trace) log.trace(local_addr + ": sending XMIT_REQ ([" + first_seqno + ", " + last_seqno + "]) to " + dest); retransmit_msg.putHeader(name, hdr); passDown(new Event(Event.MSG, retransmit_msg)); if(stats) { xmit_reqs_sent+=last_seqno - first_seqno +1; updateStats(sent, dest, 1, 0, 0); for(long i=first_seqno; i <= last_seqno; i++) { XmitRequest req=new XmitRequest(sender, i, dest); send_history.add(req); } } } /* ------------------- End of Interface Retransmitter.RetransmitCommand -------------------- */ /* ----------------------- Interface NakReceiverWindow.Listener ---------------------- */ public void missingMessageReceived(long seqno, Message msg) { if(stats) { missing_msgs_received++; updateStats(received, msg.getSrc(), 0, 0, 1); MissingMessage missing=new MissingMessage(msg.getSrc(), seqno); receive_history.add(missing); } } /* ------------------- End of Interface NakReceiverWindow.Listener ------------------- */ private void clear() { NakReceiverWindow win; // changed April 21 2004 (bela): SourceForge bug# 938584. We cannot delete our own messages sent between // a join() and a getState(). Otherwise retransmission requests from members who missed those msgs might // fail. Not to worry though: those msgs will be cleared by STABLE (message garbage collection) // sent_msgs.clear(); synchronized(received_msgs) { for(Iterator it=received_msgs.values().iterator(); it.hasNext();) { win=(NakReceiverWindow)it.next(); win.reset(); } received_msgs.clear(); } } private void reset() { NakReceiverWindow win; synchronized(sent_msgs) { sent_msgs.clear(); seqno=-1; } synchronized(received_msgs) { for(Iterator it=received_msgs.values().iterator(); it.hasNext();) { win=(NakReceiverWindow)it.next(); win.destroy(); } received_msgs.clear(); } } public String printMessages() { StringBuffer ret=new StringBuffer(); Map.Entry entry; Address addr; Object w; ret.append("\nsent_msgs: ").append(printSentMsgs()); ret.append("\nreceived_msgs:\n"); synchronized(received_msgs) { for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); addr=(Address)entry.getKey(); w=entry.getValue(); ret.append(addr).append(": ").append(w.toString()).append('\n'); } } return ret.toString(); } public String printSentMsgs() { StringBuffer sb=new StringBuffer(); Long min_seqno, max_seqno; synchronized(sent_msgs) { min_seqno=sent_msgs.size() > 0 ? (Long)sent_msgs.firstKey() : new Long(0); max_seqno=sent_msgs.size() > 0 ? (Long)sent_msgs.lastKey() : new Long(0); } sb.append('[').append(min_seqno).append(" - ").append(max_seqno).append("] (").append(sent_msgs.size()).append(")"); return sb.toString(); } private void handleConfigEvent(HashMap map) { if(map == null) { return; } if(map.containsKey("frag_size")) { max_xmit_size=((Integer)map.get("frag_size")).intValue(); if(log.isInfoEnabled()) { log.info("max_xmit_size=" + max_xmit_size); } } } static class Entry { long xmit_reqs, xmit_rsps, missing_msgs_rcvd; public String toString() { StringBuffer sb=new StringBuffer(); sb.append(xmit_reqs).append(" xmit_reqs").append(", ").append(xmit_rsps).append(" xmit_rsps"); sb.append(", ").append(missing_msgs_rcvd).append(" missing msgs"); return sb.toString(); } } static class XmitRequest { Address original_sender; // original sender of message long seq, timestamp=System.currentTimeMillis(); Address xmit_dest; // destination to which XMIT_REQ is sent, usually the original sender XmitRequest(Address original_sender, long seqno, Address xmit_dest) { this.original_sender=original_sender; this.xmit_dest=xmit_dest; this.seq=seqno; } public String toString() { StringBuffer sb=new StringBuffer(); sb.append(new Date(timestamp)).append(": ").append(original_sender).append(" #").append(seq); sb.append(" (XMIT_REQ sent to ").append(xmit_dest).append(")"); return sb.toString(); } } static class MissingMessage { Address original_sender; long seq, timestamp=System.currentTimeMillis(); MissingMessage(Address original_sender, long seqno) { this.original_sender=original_sender; this.seq=seqno; } public String toString() { StringBuffer sb=new StringBuffer(); sb.append(new Date(timestamp)).append(": ").append(original_sender).append(" #").append(seq); return sb.toString(); } } /* ----------------------------- End of Private Methods ------------------------------------ */}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -