📄 retransmitter.java
字号:
/* ---------------------------- End of Private Methods ------------------------------------ */ /** * The retransmit task executed by the scheduler in regular intervals */ private static abstract class Task implements TimeScheduler.Task { private final Interval intervals; private boolean cancelled; protected Task(long[] intervals) { this.intervals=new Interval(intervals); this.cancelled=false; } public long nextInterval() { return (intervals.next()); } public boolean cancelled() { return (cancelled); } public void cancel() { cancelled=true; } } /** * The entry associated with an initial group of missing messages * with contiguous sequence numbers and with all its subgroups.<br> * E.g. * - initial group: [5-34] * - msg 12 is acknowledged, now the groups are: [5-11], [13-34] * <p> * Groups are stored in a list as long[2] arrays of the each group's * bounds. For speed and convenience, the lowest & highest bounds of * all the groups in this entry are also stored separately */ private class Entry extends Task { private long low; private long high; /** List<long[2]> of ranges to be retransmitted */ final java.util.List list=new ArrayList(); public Entry(long low, long high, long[] intervals) { super(intervals); this.low=low; this.high=high; list.add(new long[]{low, high}); } /** * Remove the given seqno and resize or partition groups as * necessary. The algorithm is as follows:<br> * i. Find the group with low <= seqno <= high * ii. If seqno == low, * a. if low == high, then remove the group * Adjust global low. If global low was pointing to the group * deleted in the previous step, set it to point to the next group. * If there is no next group, set global low to be higher than * global high. This way the entry is invalidated and will be removed * all together from the pending msgs and the task scheduler * iii. If seqno == high, adjust high, adjust global high if this is * the group at the tail of the list * iv. Else low < seqno < high, break [low,high] into [low,seqno-1] * and [seqno+1,high] * * @param seqno the sequence number to remove */ public void remove(long seqno) { int i; long[] bounds=null, newBounds; synchronized(list) { for(i=0; i < list.size(); ++i) { bounds=(long[])list.get(i); if(seqno < bounds[0] || seqno > bounds[1]) continue; break; } if(i == list.size()) return; if(seqno == bounds[0]) { if(bounds[0] == bounds[1]) list.remove(i); else bounds[0]++; if(i == 0) low=list.size() == 0 ? high + 1 : ((long[])list.get(i))[0]; } else if(seqno == bounds[1]) { bounds[1]--; if(i == list.size() - 1) high=((long[])list.get(i))[1]; } else { newBounds=new long[2]; newBounds[0]=seqno + 1; newBounds[1]=bounds[1]; bounds[1]=seqno - 1; list.add(i + 1, newBounds); } } } /** * Retransmission task:<br> * For each interval, call the retransmission callback command */ public void run() { long[] bounds; List copy; synchronized(list) { copy=new LinkedList(list); } for(Iterator it=copy.iterator(); it.hasNext();) { bounds=(long[])it.next(); try { cmd.retransmit(bounds[0], bounds[1], sender); } catch(Throwable t) { log.error("failure asking " + cmd + " for retransmission", t); } } } int size() { int size=0; long diff; long[] tmp; synchronized(list) { for(Iterator it=list.iterator(); it.hasNext();) { tmp=(long[])it.next(); diff=tmp[1] - tmp[0] +1; size+=diff; } } return size; } public String toString() { StringBuffer sb=new StringBuffer(); synchronized(list) { long[] range; boolean first=true; for(Iterator it=list.iterator(); it.hasNext();) { range=(long[])it.next(); if(first) { first=false; } else { sb.append(", "); } sb.append(range[0]).append('-').append(range[1]); } } return sb.toString(); } } public static void main(String[] args) { Retransmitter xmitter; Address sender; try { sender=new org.jgroups.stack.IpAddress("localhost", 5555); xmitter=new Retransmitter(sender, new MyXmitter()); xmitter.setRetransmitTimeouts(new long[]{1000, 2000, 4000, 8000}); xmitter.add(1, 10); System.out.println("retransmitter: " + xmitter); xmitter.remove(1); System.out.println("retransmitter: " + xmitter); xmitter.remove(2); System.out.println("retransmitter: " + xmitter); xmitter.remove(4); System.out.println("retransmitter: " + xmitter); Util.sleep(3000); xmitter.remove(3); System.out.println("retransmitter: " + xmitter); Util.sleep(1000); xmitter.remove(10); System.out.println("retransmitter: " + xmitter); xmitter.remove(8); System.out.println("retransmitter: " + xmitter); xmitter.remove(6); System.out.println("retransmitter: " + xmitter); xmitter.remove(7); System.out.println("retransmitter: " + xmitter); xmitter.remove(9); System.out.println("retransmitter: " + xmitter); xmitter.remove(5); System.out.println("retransmitter: " + xmitter); } catch(Exception e) { log.error(e); } } static class MyXmitter implements Retransmitter.RetransmitCommand { public void retransmit(long first_seqno, long last_seqno, Address sender) { System.out.println("-- " + new java.util.Date() + ": retransmit(" + first_seqno + ", " + last_seqno + ", " + sender + ')'); } } static void sleep(long timeout) { Util.sleep(timeout); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -