📄 ackmcastsenderwindow.java
字号:
synchronized(msgs) { if (msgs.get(new Long(seqno)) != null) return; e = new Entry(seqno, msg, receivers, retransmit_intervals); msgs.put(new Long(seqno), e); retransmitter.add(e); } } /** * An ACK has been received from <code>sender</code>. Tag the sender in * the hash table as 'received'. If all ACKs have been received, remove * the entry all together. * * @param seqno The sequence number of the message for which an ACK has * been received. * @param sender The sender which sent the ACK */ public void ack(long seqno, Address sender) { Entry entry; Boolean received; synchronized(msgs) { entry = (Entry)msgs.get(new Long(seqno)); if (entry == null) return; synchronized(entry) { received = (Boolean)entry.senders.get(sender); if (received == null || received.booleanValue()) return; // If not yet received entry.senders.put(sender, Boolean.TRUE); entry.num_received++; if (!entry.allReceived()) return; } synchronized(stable_msgs) { entry.cancel(); msgs.remove(new Long(seqno)); stable_msgs.add(new Long(seqno)); } // wake up waitUntilAllAcksReceived() method msgs.notifyAll(); } } /** * Remove <code>obj</code> from all receiver sets and wake up * retransmission thread. * * @param obj the sender to remove */ public void remove(Address obj) { Long key; Entry entry; synchronized(msgs) { for (Enumeration e = msgs.keys(); e.hasMoreElements();) { key = (Long)e.nextElement(); entry = (Entry)msgs.get(key); synchronized(entry) { //if (((Boolean)entry.senders.remove(obj)).booleanValue()) entry.num_received--; //if (!entry.allReceived()) continue; Boolean received = (Boolean)entry.senders.remove(obj); if(received == null) continue; // suspected member not in entry.senders ? if (received.booleanValue()) entry.num_received--; if (!entry.allReceived()) continue; } synchronized(stable_msgs) { entry.cancel(); msgs.remove(key); stable_msgs.add(key); } // wake up waitUntilAllAcksReceived() method msgs.notifyAll(); } } } /** * Process with address <code>suspected</code> is suspected: remove it * from all receiver sets. This means that no ACKs are expected from this * process anymore. * * @param suspected The suspected process */ public void suspect(Address suspected) { if(log.isInfoEnabled()) log.info("suspect is " + suspected); remove(suspected); suspects.add(suspected); if(suspects.size() >= max_suspects) suspects.removeFirst(); } /** * @return a copy of stable messages, or null (if non available). Removes * all stable messages afterwards */ public Vector getStableMessages() { Vector retval; synchronized(stable_msgs) { retval = (stable_msgs.size() > 0)? (Vector)stable_msgs.clone():null; if (stable_msgs.size() > 0) stable_msgs.clear(); } return(retval); } public void clearStableMessages() { synchronized(stable_msgs) { stable_msgs.clear(); } } /** * @return the number of currently pending msgs */ public long size() { synchronized(msgs) { return(msgs.size()); } } /** Returns the number of members for a given entry for which acks have to be received */ public long getNumberOfResponsesExpected(long seqno) { Entry entry=(Entry)msgs.get(new Long(seqno)); if(entry != null) return entry.senders.size(); else return -1; } /** Returns the number of members for a given entry for which acks have been received */ public long getNumberOfResponsesReceived(long seqno) { Entry entry=(Entry)msgs.get(new Long(seqno)); if(entry != null) return entry.num_received; else return -1; } /** Prints all members plus whether an ack has been received from those members for a given seqno */ public String printDetails(long seqno) { Entry entry=(Entry)msgs.get(new Long(seqno)); if(entry != null) return entry.toString(); else return null; } /** * Waits until all outstanding messages have been ACKed by all receivers. * Takes into account suspicions and view changes. Returns when there are * no entries left in the hashtable. <b>While waiting, no entries can be * added to the hashtable (they will be discarded).</b> * * @param timeout Miliseconds to wait. 0 means wait indefinitely. */ public void waitUntilAllAcksReceived(long timeout) { long time_to_wait, start_time, current_time; Address suspect; // remove all suspected members from retransmission for(Iterator it=suspects.iterator(); it.hasNext();) { suspect=(Address)it.next(); remove(suspect); } time_to_wait = timeout; waiting = true; if (timeout <= 0) { synchronized(msgs) { while(msgs.size() > 0) try { msgs.wait(); } catch(InterruptedException ex) {} } } else { start_time = System.currentTimeMillis(); synchronized(msgs) { while(msgs.size() > 0) { current_time = System.currentTimeMillis(); time_to_wait = timeout - (current_time - start_time); if (time_to_wait <= 0) break; try { msgs.wait(time_to_wait); } catch(InterruptedException ex) { if(log.isWarnEnabled()) log.warn(ex.toString()); } } } } waiting = false; } /** * Start the retransmitter. This has no effect, if the retransmitter * was externally provided */ public void start() { if (retransmitter_owned) retransmitter.start(); } /** * Stop the rentransmition and clear all pending msgs. * <p> * If this retransmitter has been provided an externally managed * scheduler, then just clear all msgs and the associated tasks, else * stop the scheduler. In this case the method blocks until the * scheduler's thread is dead. Only the owner of the scheduler should * stop it. */ public void stop() { Entry entry; // i. If retransmitter is owned, stop it else cancel all tasks // ii. Clear all pending msgs and notify anyone waiting synchronized(msgs) { if (retransmitter_owned) { try { retransmitter.stop(); } catch(InterruptedException ex) { if(log.isErrorEnabled()) log.error(_toString(ex)); } } else { for (Enumeration e = msgs.elements(); e.hasMoreElements();) { entry = (Entry)e.nextElement(); entry.cancel(); } } msgs.clear(); // wake up waitUntilAllAcksReceived() method msgs.notifyAll(); } } /** * Remove all pending msgs from the hashtable. Cancel all associated * tasks in the retransmission scheduler */ public void reset() { Entry entry; if (waiting) return; synchronized(msgs) { for (Enumeration e = msgs.elements(); e.hasMoreElements();) { entry = (Entry)e.nextElement(); entry.cancel(); } msgs.clear(); msgs.notifyAll(); } } public String toString() { StringBuffer ret; Entry entry; Long key; ret = new StringBuffer(); synchronized(msgs) { ret.append("msgs: (").append(msgs.size()).append(')'); for (Enumeration e = msgs.keys(); e.hasMoreElements();) { key = (Long)e.nextElement(); entry = (Entry)msgs.get(key); ret.append("key = ").append(key).append(", value = ").append(entry).append('\n'); } synchronized(stable_msgs) { ret.append("\nstable_msgs: ").append(stable_msgs); } } return(ret.toString()); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -