📄 nakack.java
字号:
*
     * @param xmit_requester The sender of the XMIT_REQ, we have to send the requested copy of the message to this address
     * @param first_seqno The first sequence number to be retransmitted (<= last_seqno)
     * @param last_seqno The last sequence number to be retransmitted (>= first_seqno)
     * @param original_sender The member who originally sent the message. Guaranteed to be non-null
     */
    private void handleXmitReq(Address xmit_requester, long first_seqno, long last_seqno, Address original_sender) {
        Message m, tmp;
        LinkedList list;
        // size: accumulated size of the messages in the current batch;
        // marker: first seqno of the current batch; len: size of the current message
        long size=0, marker=first_seqno, len;
        NakReceiverWindow win=null;
        boolean amISender; // am I the original sender ?

        if(trace) {
            StringBuffer sb=new StringBuffer();
            sb.append(local_addr).append(": received xmit request from ").append(xmit_requester).append(" for ");
            sb.append(original_sender).append(" [").append(first_seqno).append(" - ").append(last_seqno).append("]");
            log.trace(sb.toString());
        }

        // An inverted range cannot be served: log and give up
        if(first_seqno > last_seqno) {
            if(log.isErrorEnabled())
                log.error("first_seqno (" + first_seqno + ") > last_seqno (" + last_seqno + "): not able to retransmit");
            return;
        }

        if(stats) {
            // counts every requested seqno, but only one request for the requester
            xmit_reqs_received+=last_seqno - first_seqno +1;
            updateStats(received, xmit_requester, 1, 0, 0);
        }

        // Messages come from sent_msgs when this node is the original sender, otherwise from the
        // receiver window kept for original_sender (win stays null if there is no such window)
        amISender=local_addr.equals(original_sender);
        if(!amISender)
            win=(NakReceiverWindow)received_msgs.get(original_sender);
        list=new LinkedList();
        for(long i=first_seqno; i <= last_seqno; i++) {
            if(amISender) {
                m=(Message)sent_msgs.get(new Long(i)); // no need to synchronize
            }
            else {
                m=win != null? win.get(i) : null;
            }
            if(m == null) {
                // Message not available: log diagnostics and skip this seqno
                if(log.isErrorEnabled()) {
                    StringBuffer sb=new StringBuffer();
                    sb.append("(requester=").append(xmit_requester).append(", local_addr=").append(this.local_addr);
                    sb.append(") message ").append(original_sender).append("::").append(i);
                    sb.append(" not found in ").append((amISender? "sent" : "received")).append(" msgs. ");
                    // NOTE(review): the branch is chosen by win, not by amISender, so when this node is
                    // not the sender and no window exists the sent-message table is printed
                    if(win != null) {
                        sb.append("Received messages from ").append(original_sender).append(": ").append(win.toString());
                    }
                    else {
                        sb.append("\nSent messages: ").append(printSentMsgs());
                    }
                    log.error(sb);
                }
                continue;
            }
            len=m.size();
            size+=len;
            if(size > max_xmit_size && list.size() > 0) { // changed from >= to > (yaron-r, bug #943709)
                // yaronr: added &&listSize()>0 since protocols between FRAG and NAKACK add headers, and message exceeds size.

                // size has reached max_xmit_size. go ahead and send message (excluding the current message)
                if(trace)
                    log.trace("xmitting msgs [" + marker + '-' + (i - 1) + "] to " + xmit_requester);
                // a copy is handed over because list is cleared and reused right below
                sendXmitRsp(xmit_requester, (LinkedList)list.clone(), marker, i - 1);
                marker=i;
                list.clear();
                // fixed Dec 15 2003 (bela, patch from Joel Dice (dicej)), see explanation under
                // bug report #854887
                // the current message opens the next batch, so its size is the new running total
                size=len;
            }
            if(Global.copy) {
                tmp=m.copy();
            }
            else {
                tmp=m;
            }
            // tmp.setDest(xmit_requester);
            // tmp.setSrc(local_addr);
            if(tmp.getSrc() == null)
                tmp.setSrc(local_addr);
            list.add(tmp);
        }

        // Send whatever is left in the last (possibly only) batch
        if(list.size() > 0) {
            if(trace)
                log.trace("xmitting msgs [" + marker + '-' + last_seqno + "] to " + xmit_requester);
            sendXmitRsp(xmit_requester, (LinkedList)list.clone(), marker, last_seqno);
            list.clear();
        }
    }

    /**
     * Adds the given counts to the statistics entry stored under <code>key</code> in <code>map</code>,
     * creating and registering a new entry first if none exists yet.
     *
     * @param map     The statistics table to update
     * @param key     The address whose entry is updated
     * @param req     Amount added to the entry's xmit_reqs counter
     * @param rsp     Amount added to the entry's xmit_rsps counter
     * @param missing Amount added to the entry's missing_msgs_rcvd counter
     */
    private static void updateStats(HashMap map, Address key, int req, int rsp, int missing) {
        Entry entry=(Entry)map.get(key);
        if(entry == null) {
            entry=new Entry();
            map.put(key, entry);
        }
        entry.xmit_reqs+=req;
        entry.xmit_rsps+=rsp;
        entry.missing_msgs_rcvd+=missing;
    }

    /**
     * Marshals <code>xmit_list</code> into a single buffer, wraps it in one message tagged with an XMIT_RSP
     * header carrying <code>first_seqno</code> and <code>last_seqno</code>, and passes that message down.
     * A null or empty list is logged as an error and nothing is sent.
     *
     * @param dest        The destination of the response; replaced by null when use_mcast_xmit is set
     * @param xmit_list   The messages to be retransmitted
     * @param first_seqno The first seqno placed in the XMIT_RSP header
     * @param last_seqno  The last seqno placed in the XMIT_RSP header
     */
    private void sendXmitRsp(Address dest, LinkedList xmit_list, long first_seqno, long last_seqno) {
        Buffer buf;
        if(xmit_list == null || xmit_list.size() == 0) {
            if(log.isErrorEnabled())
                log.error("xmit_list is empty");
            return;
        }
        // presumably a null destination makes the message a multicast -- confirm against Message
        if(use_mcast_xmit)
            dest=null;

        if(stats) {
            xmit_rsps_sent+=xmit_list.size();
            // NOTE(review): dest has already been nulled above when use_mcast_xmit is set, so in
            // that mode the entry is keyed by null
            updateStats(sent, dest, 0, 1, 0);
        }

        try {
            buf=Util.msgListToByteBuffer(xmit_list);
            Message msg=new Message(dest, null, buf.getBuf(), buf.getOffset(), buf.getLength());
            msg.putHeader(name, new NakAckHeader(NakAckHeader.XMIT_RSP, first_seqno, last_seqno));
            passDown(new Event(Event.MSG, msg));
        }
        catch(IOException ex) {
            log.error("failed marshalling xmit list", ex);
        }
    }

    /**
     * Unmarshals the list of messages contained in the buffer of <code>msg</code> and passes each of
     * them up as an individual Event.MSG event. Any exception raised while doing so is logged and
     * not propagated.
     *
     * @param msg The message carrying the marshalled list; a null message is only logged
     */
    private void handleXmitRsp(Message msg) {
        LinkedList list;
        Message m;

        if(msg == null) {
            if(warn)
                log.warn("message is null");
            return;
        }
        try {
            list=Util.byteBufferToMessageList(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
            if(list != null) {
                if(stats) {
                    xmit_rsps_received+=list.size();
                    updateStats(received, msg.getSrc(), 0, 1, 0);
                }
                for(Iterator it=list.iterator(); it.hasNext();) {
                    m=(Message)it.next();
                    up(new Event(Event.MSG, m));
                }
                list.clear();
            }
        }
        catch(Exception ex) {
            if(log.isErrorEnabled()) {
                log.error("failed reading list of retransmitted messages", ex);
            }
        }
    }

    /**
     * Remove old members from NakReceiverWindows and add new members (starting seqno=0). Essentially removes all
     * entries from received_msgs that are not in <code>members</code>
     *
     * @param remove If true, windows of senders that are no longer in <code>members</code> are reset and
     *               removed; if false, only the windows for new members are added
     */
    private void adjustReceivers(boolean remove) {
        Address sender;
        NakReceiverWindow win;

        // both steps run under the received_msgs lock
        synchronized(received_msgs) {
            if(remove) {
                // 1. Remove all senders in received_msgs that are not members anymore
                for(Iterator it=received_msgs.keySet().iterator(); it.hasNext();) {
                    sender=(Address)it.next();
                    if(!members.contains(sender)) {
                        win=(NakReceiverWindow)received_msgs.get(sender);
                        win.reset();
                        if(log.isDebugEnabled()) {
                            log.debug("removing " + sender + " from received_msgs (not member anymore)");
                        }
                        // removal goes through the iterator because the key set is being iterated
                        it.remove();
                    }
                }
            }

            // 2. Add newly joined members to received_msgs (starting seqno=0)
            for(int i=0; i < members.size(); i++) {
                sender=(Address)members.elementAt(i);
                if(!received_msgs.containsKey(sender)) {
                    win=createNakReceiverWindow(sender, 0);
                    received_msgs.put(sender, win);
                }
            }
        }
    }

    /**
     * Returns a message digest: for each member P the highest seqno received from P is added to the digest.
*/ private Digest getDigest() { Digest digest; Address sender; Range range; digest=new Digest(members.size()); for(int i=0; i < members.size(); i++) { sender=(Address)members.elementAt(i); range=getLowestAndHighestSeqno(sender, false); // get the highest received seqno if(range == null) { if(log.isErrorEnabled()) { log.error("range is null"); } continue; } digest.add(sender, range.low, range.high); // add another entry to the digest } return digest; } /** * Returns a message digest: for each member P the highest seqno received from P <em>without a gap</em> is added to * the digest. E.g. if the seqnos received from P are [+3 +4 +5 -6 +7 +8], then 5 will be returned. Also, the * highest seqno <em>seen</em> is added. The max of all highest seqnos seen will be used (in STABLE) to determine * whether the last seqno from a sender was received (see "Last Message Dropped" topic in DESIGN). */ private Digest getDigestHighestDeliveredMsgs() { Digest digest; Address sender; Range range; long high_seqno_seen; digest=new Digest(members.size()); for(int i=0; i < members.size(); i++) { sender=(Address)members.elementAt(i); range=getLowestAndHighestSeqno(sender, true); // get the highest deliverable seqno if(range == null) { if(log.isErrorEnabled()) { log.error("range is null"); } continue; } high_seqno_seen=getHighSeqnoSeen(sender); digest.add(sender, range.low, range.high, high_seqno_seen); // add another entry to the digest } return digest; } /** * Creates a NakReceiverWindow for each sender in the digest according to the sender's seqno. If NRW already exists, * reset it. 
*/ private void setDigest(Digest d) { if(d == null || d.senders == null) { if(log.isErrorEnabled()) { log.error("digest or digest.senders is null"); } return; } clear(); Map.Entry entry; Address sender; org.jgroups.protocols.pbcast.Digest.Entry val; long initial_seqno; NakReceiverWindow win; for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); sender=(Address)entry.getKey(); val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue(); if(sender == null || val == null) { if(warn) { log.warn("sender or value is null"); } continue; } initial_seqno=val.high_seqno; win=createNakReceiverWindow(sender, initial_seqno); synchronized(received_msgs) { received_msgs.put(sender, win); } } } /** * For all members of the digest, adjust the NakReceiverWindows in the received_msgs hashtable. If the member * already exists, sets its seqno to be the max of the seqno and the seqno of the member in the digest. If no entry * exists, create one with the initial seqno set to the seqno of the member in the digest. 
*/
    private void mergeDigest(Digest d) {
        final boolean usable=d != null && d.senders != null;
        if(!usable) {
            if(log.isErrorEnabled()) {
                log.error("digest or digest.senders is null");
            }
            return;
        }

        final Iterator entries=d.senders.entrySet().iterator();
        while(entries.hasNext()) {
            final Map.Entry mapping=(Map.Entry)entries.next();
            final Address member=(Address)mapping.getKey();
            final org.jgroups.protocols.pbcast.Digest.Entry seqnos=
                    (org.jgroups.protocols.pbcast.Digest.Entry)mapping.getValue();

            if(member == null || seqnos == null) {
                if(warn) {
                    log.warn("sender or value is null");
                }
                continue;
            }

            final long digestSeqno=seqnos.high_seqno;
            synchronized(received_msgs) {
                final NakReceiverWindow existing=(NakReceiverWindow)received_msgs.get(member);
                // an existing window is replaced only when it lags behind the digest
                final boolean behind=existing != null && existing.getHighestReceived() < digestSeqno;
                if(behind) {
                    existing.reset();
                    received_msgs.remove(member);
                }
                if(existing == null || behind) {
                    final NakReceiverWindow fresh=createNakReceiverWindow(member, digestSeqno);
                    received_msgs.put(member, fresh);
                }
            }
        }
    }

    private NakReceiverWindow createNakReceiverWindow(Address sender, long initial_seqno) {
        NakReceiverWindow win=new NakReceiverWindow(sender, this, initial_seqno, timer);
        win.setRetransmitTimeouts(retransmit_timeout);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -