📄 nakreceiverwindow.java
字号:
* @see NakReceiverWindow#getHighestReceived */ public long getHighestSeen() { try { lock.readLock().acquire(); try { return (highest_seen); } finally { lock.readLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring read lock", e); return -1; } } /** * Find all messages between 'low' and 'high' (including 'low' and * 'high') that have a null msg. * Return them as a list of longs * * @return List<Long>. A list of seqnos, sorted in ascending order. * E.g. [1, 4, 7, 8] */ public List getMissingMessages(long low, long high) { List retval=new List(); // long my_high; if(low > high) { if(log.isErrorEnabled()) log.error("invalid range: low (" + low + ") is higher than high (" + high + ')'); return null; } try { lock.readLock().acquire(); try { // my_high=Math.max(head - 1, 0); // check only received messages, because delivered messages *must* have a non-null msg SortedMap m=received_msgs.subMap(new Long(low), new Long(high+1)); for(Iterator it=m.keySet().iterator(); it.hasNext();) { retval.add(it.next()); }// if(received_msgs.size() > 0) {// entry=(Entry)received_msgs.peek();// if(entry != null) my_high=entry.seqno;// }// for(long i=my_high + 1; i <= high; i++)// retval.add(new Long(i)); return retval; } finally { lock.readLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring read lock", e); return null; } } /** * Returns the highest sequence number received so far (which may be * higher than the highest seqno <em>delivered</em> so far; e.g., for * 1,2,3,5,6 it would be 6. * * @see NakReceiverWindow#getHighestSeen */ public long getHighestReceived() { try { lock.readLock().acquire(); try { return Math.max(tail - 1, -1); } finally { lock.readLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring read lock", e); return -1; } } /** * Return messages that are higher than <code>seqno</code> (excluding * <code>seqno</code>). Check both received <em>and</em> delivered * messages. 
* @return List<Message>. All messages that have a seqno greater than <code>seqno</code> */ public List getMessagesHigherThan(long seqno) { List retval=new List(); try { lock.readLock().acquire(); try { // check received messages SortedMap m=received_msgs.tailMap(new Long(seqno+1)); for(Iterator it=m.values().iterator(); it.hasNext();) { retval.add((it.next())); } // we retrieve all msgs whose seqno is strictly greater than seqno (tailMap() *includes* seqno, // but we need to exclude seqno, that's why we increment it m=delivered_msgs.tailMap(new Long(seqno +1)); for(Iterator it=m.values().iterator(); it.hasNext();) { retval.add(((Message)it.next()).copy()); } return (retval); } finally { lock.readLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring read lock", e); return null; } } /** * Return all messages m for which the following holds: * m > lower && m <= upper (excluding lower, including upper). Check both * <code>received_msgs</code> and <code>delivered_msgs</code>. */ public List getMessagesInRange(long lower, long upper) { List retval=new List(); try { lock.readLock().acquire(); try { // check received messages SortedMap m=received_msgs.subMap(new Long(lower +1), new Long(upper +1)); for(Iterator it=m.values().iterator(); it.hasNext();) { retval.add(it.next()); } m=delivered_msgs.subMap(new Long(lower +1), new Long(upper +1)); for(Iterator it=m.values().iterator(); it.hasNext();) { retval.add(((Message)it.next()).copy()); } return retval; } finally { lock.readLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring read lock", e); return null; } } /** * Return a list of all messages for which there is a seqno in * <code>missing_msgs</code>. 
The seqnos of the argument list are * supposed to be in ascending order * @param missing_msgs A List<Long> of seqnos * @return List<Message> */ public List getMessagesInList(List missing_msgs) { List ret=new List(); if(missing_msgs == null) { if(log.isErrorEnabled()) log.error("argument list is null"); return ret; } try { lock.readLock().acquire(); try { Long seqno; Message msg; for(Enumeration en=missing_msgs.elements(); en.hasMoreElements();) { seqno=(Long)en.nextElement(); msg=(Message)delivered_msgs.get(seqno); if(msg != null) ret.add(msg.copy()); msg=(Message)received_msgs.get(seqno); if(msg != null) ret.add(msg.copy()); } return ret; } finally { lock.readLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring read lock", e); return null; } } /** * Returns the message from received_msgs or delivered_msgs. * @param sequence_num * @return Message from received_msgs or delivered_msgs. */ public Message get(long sequence_num) { Message msg; Long seqno=new Long(sequence_num); try { lock.readLock().acquire(); try { msg=(Message)delivered_msgs.get(seqno); if(msg != null) return msg; msg=(Message)received_msgs.get(seqno); if(msg != null) return msg; } finally { lock.readLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring read lock", e); } return null; } public int size() { boolean acquired=false; try { lock.readLock().acquire(); acquired=true; } catch(InterruptedException e) {} try { return received_msgs.size(); } finally { if(acquired) lock.readLock().release(); } } public String toString() { StringBuffer sb=new StringBuffer(); try { lock.readLock().acquire(); try { sb.append("received_msgs: ").append(printReceivedMessages()); sb.append(", delivered_msgs: ").append(printDeliveredMessages()); } finally { lock.readLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring read lock", e); return ""; } return sb.toString(); } /** * Prints delivered_msgs. Requires read lock present. 
* @return String */ String printDeliveredMessages() { StringBuffer sb=new StringBuffer(); Long min=null, max=null; if(delivered_msgs.size() > 0) { try {min=(Long)delivered_msgs.firstKey();} catch(NoSuchElementException ex) {} try {max=(Long)delivered_msgs.lastKey();} catch(NoSuchElementException ex) {} } sb.append('[').append(min).append(" - ").append(max).append(']'); if(min != null && max != null) sb.append(" (size=").append(max.longValue() - min.longValue()).append(")"); return sb.toString(); } /** * Prints received_msgs. Requires read lock to be present * @return String */ String printReceivedMessages() { StringBuffer sb=new StringBuffer(); sb.append('['); if(received_msgs.size() > 0) { Long first=null, last=null; try {first=(Long)received_msgs.firstKey();} catch(NoSuchElementException ex) {} try {last=(Long)received_msgs.lastKey();} catch(NoSuchElementException ex) {} sb.append(first).append(" - ").append(last); int non_received=0; Map.Entry entry; for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); if(entry.getValue() == null) non_received++; } sb.append(" (size=").append(received_msgs.size()).append(", missing=").append(non_received).append(')'); } sb.append(']'); return sb.toString(); } /* ------------------------------- Private Methods -------------------------------------- */ /** * Sets the value of lowest_seen to the lowest seqno of the delivered messages (if available), otherwise * to the lowest seqno of received messages. 
*/
    private void updateLowestSeen() {
        // If both delivered and received messages are empty, lowest_seen is deliberately left
        // untouched: we may have initial values, but both lists are cleaned after some time of
        // inactivity (bela April 19 2004). Resetting lowest_seen to 0 in that case was incorrect.
        try {
            // The lowest seqno is the first seqno of the delivered messages
            if(delivered_msgs.size() > 0) {
                Long first_delivered=(Long)delivered_msgs.firstKey();
                if(first_delivered != null)
                    lowest_seen=first_delivered.longValue();
                return;
            }

            // No delivered messages (e.g. due to message garbage collection): use the received messages
            if(received_msgs.size() > 0) {
                Long first_received=(Long)received_msgs.firstKey();
                // only set lowest_seen if we *have* a msg for that seqno
                if(received_msgs.get(first_received) != null)
                    lowest_seen=first_received.longValue();
            }
        }
        catch(NoSuchElementException ex) {
            // map had no first key after all: leave lowest_seen unchanged
        }
    }


    /**
     * Find the highest seqno that is deliverable or was actually delivered.
     * Returns seqno-1 if there are no messages in the queues (the first
     * message to be expected is always seqno).
     */
//    private void updateHighestSeen() {
//        long      ret=0;
//        Map.Entry entry=null;
//
//        // If both delivered and received messages are empty, let the highest
//        // seen seqno be the one *before* the one which is expected to be
//        // received next by the NakReceiverWindow (head-1)
//
//        // changed by bela (April 19 2004): we don't change the value if received and delivered msgs are empty
//        /*if((delivered_msgs.size() == 0) && (msgs.size() == 0)) {
//            highest_seen=0;
//            return;
//        }*/
//
//
//        // The highest seqno is the last of the delivered messages, to start with,
//        // or again the one before the first seqno expected (if no delivered
//        // msgs). Then iterate through the received messages, and find the highest seqno *before* a gap
//        Long highest_seqno=null;
//        if(delivered_msgs.size() > 0) {
//            try {
//                highest_seqno=(Long)delivered_msgs.lastKey();
//                ret=highest_seqno.longValue();
//            }
//            catch(NoSuchElementException ex) {
//            }
//        }
//        else {
//            ret=Math.max(head - 1, 0);
//        }
//
//        // Now check the received msgs head to tail. if there is an entry
//        // with a non-null msg, increment ret until we find an entry with
//        // a null msg
//        for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) {
//            entry=(Map.Entry)it.next();
//            if(entry.getValue() != null)
//                ret=((Long)entry.getKey()).longValue();
//            else
//                break;
//        }
//        highest_seen=Math.max(ret, 0);
//    }


    /**
     * Reset the Nak window. Should be called from within a writeLock() context.
     * <p>
     * i. Delete all received entries<br>
     * ii. Delete all delivered entries<br>
     * iii. Reset all indices (head, tail, lowest_seen, highest_seen) to 0<br>
     */
    private void _reset() {
        // drop all stored messages
        received_msgs.clear();
        delivered_msgs.clear();

        // rewind the window indices
        head=0;
        tail=0;

        // forget the seqnos seen so far
        lowest_seen=0;
        highest_seen=0;
    }

    /* --------------------------- End of Private Methods ----------------------------------- */
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -