⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ackmcastsenderwindow.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
	synchronized(msgs) {	    if (msgs.get(new Long(seqno)) != null) return;	    e = new Entry(seqno, msg, receivers, retransmit_intervals);	    msgs.put(new Long(seqno), e);	    retransmitter.add(e);	}    }    /**     * An ACK has been received from <code>sender</code>. Tag the sender in     * the hash table as 'received'. If all ACKs have been received, remove     * the entry all together.     *     * @param seqno  The sequence number of the message for which an ACK has     * been received.     * @param sender The sender which sent the ACK     */    public void ack(long seqno, Address sender) {        Entry   entry;        Boolean received;	synchronized(msgs) {	    entry = (Entry)msgs.get(new Long(seqno));	    if (entry == null) return;		            synchronized(entry) {            received = (Boolean)entry.senders.get(sender);            if (received == null || received.booleanValue()) return;			            // If not yet received            entry.senders.put(sender, Boolean.TRUE);            entry.num_received++;            if (!entry.allReceived()) return;        }		            synchronized(stable_msgs) {            entry.cancel();            msgs.remove(new Long(seqno));            stable_msgs.add(new Long(seqno));        }        // wake up waitUntilAllAcksReceived() method        msgs.notifyAll();    }    }        /**     * Remove <code>obj</code> from all receiver sets and wake up     * retransmission thread.     *     * @param obj the sender to remove     */    public void remove(Address obj) {	Long  key;	Entry entry;	synchronized(msgs) {	    for (Enumeration e = msgs.keys(); e.hasMoreElements();) {		key   = (Long)e.nextElement();		entry = (Entry)msgs.get(key);		synchronized(entry) {		    //if (((Boolean)entry.senders.remove(obj)).booleanValue()) entry.num_received--;		    //if (!entry.allReceived()) continue;		    Boolean received = (Boolean)entry.senders.remove(obj);		    if(received == null) continue; // suspected member not in entry.senders ?		    if (received.booleanValue()) entry.num_received--;		    if (!entry.allReceived()) continue;		}		synchronized(stable_msgs) {		    entry.cancel();		    msgs.remove(key);		    stable_msgs.add(key);		}		// wake up waitUntilAllAcksReceived() method		msgs.notifyAll();	    }	}    }    /**     * Process with address <code>suspected</code> is suspected: remove it     * from all receiver sets. This means that no ACKs are expected from this     * process anymore.     *     * @param suspected The suspected process     */    public void suspect(Address suspected) {	    if(log.isInfoEnabled()) log.info("suspect is " + suspected);	remove(suspected);	suspects.add(suspected);	if(suspects.size() >= max_suspects)	    suspects.removeFirst();    }    /**     * @return a copy of stable messages, or null (if non available). Removes     * all stable messages afterwards     */    public Vector getStableMessages() {	Vector retval;	synchronized(stable_msgs) {	    retval = (stable_msgs.size() > 0)? (Vector)stable_msgs.clone():null;	    if (stable_msgs.size() > 0) stable_msgs.clear();	}			return(retval);    }    public void clearStableMessages() {	synchronized(stable_msgs) {	    stable_msgs.clear();	}    }    /**     * @return the number of currently pending msgs     */    public long size() {	synchronized(msgs) {	    return(msgs.size());	}    }    /** Returns the number of members for a given entry for which acks have to be received */    public long getNumberOfResponsesExpected(long seqno) {	Entry entry=(Entry)msgs.get(new Long(seqno));	if(entry != null)	    return entry.senders.size();	else	    return -1;    }    /** Returns the number of members for a given entry for which acks have been received */    public long getNumberOfResponsesReceived(long seqno) {	Entry entry=(Entry)msgs.get(new Long(seqno));	if(entry != null)	    return entry.num_received;	else	    return -1;    }    /** Prints all members plus whether an ack has been received from those members for a given seqno */    public String printDetails(long seqno) {	Entry entry=(Entry)msgs.get(new Long(seqno));	if(entry != null)	    return entry.toString();	else	    return null;    }    /**     * Waits until all outstanding messages have been ACKed by all receivers.     * Takes into account suspicions and view changes. Returns when there are     * no entries left in the hashtable. <b>While waiting, no entries can be     * added to the hashtable (they will be discarded).</b>     *     * @param timeout Miliseconds to wait. 0 means wait indefinitely.     */    public void waitUntilAllAcksReceived(long timeout) {	long    time_to_wait, start_time, current_time;	Address suspect;	// remove all suspected members from retransmission	for(Iterator it=suspects.iterator(); it.hasNext();) {	    suspect=(Address)it.next();	    remove(suspect);	}		time_to_wait = timeout;	waiting     = true;	if (timeout <= 0) {	    synchronized(msgs) {		while(msgs.size() > 0)		    try { msgs.wait(); } catch(InterruptedException ex) {}	    }	} else {	    start_time = System.currentTimeMillis();	    synchronized(msgs) {		while(msgs.size() > 0) {		    current_time = System.currentTimeMillis();		    time_to_wait = timeout - (current_time - start_time);		    if (time_to_wait <= 0) break;		    		    try {			msgs.wait(time_to_wait);		    } catch(InterruptedException ex) {			if(log.isWarnEnabled()) log.warn(ex.toString());		    }		}	    }	}		waiting = false;    }    /**     * Start the retransmitter. This has no effect, if the retransmitter     * was externally provided     */    public void start() {	if (retransmitter_owned)	    retransmitter.start();    }    /**     * Stop the rentransmition and clear all pending msgs.     * <p>     * If this retransmitter has been provided an externally managed     * scheduler, then just clear all msgs and the associated tasks, else     * stop the scheduler. In this case the method blocks until the     * scheduler's thread is dead. Only the owner of the scheduler should     * stop it.     */    public void stop() {	Entry entry;	// i. If retransmitter is owned, stop it else cancel all tasks	// ii. Clear all pending msgs and notify anyone waiting	synchronized(msgs) {	    if (retransmitter_owned) {		try {		    retransmitter.stop();		} catch(InterruptedException ex) {		    if(log.isErrorEnabled()) log.error(_toString(ex));		}	    } else {		for (Enumeration e = msgs.elements(); e.hasMoreElements();) {		    entry = (Entry)e.nextElement();		    entry.cancel();		}	    }	    msgs.clear();	    // wake up waitUntilAllAcksReceived() method	    msgs.notifyAll();	}    }    /**     * Remove all pending msgs from the hashtable. Cancel all associated     * tasks in the retransmission scheduler     */    public void reset() {	Entry entry;	if (waiting) return;	synchronized(msgs) {	    for (Enumeration e = msgs.elements(); e.hasMoreElements();) {		entry = (Entry)e.nextElement();		entry.cancel();	    }	    msgs.clear();	    msgs.notifyAll();	}    }    public String toString() {	StringBuffer ret;	Entry        entry;	Long         key;	ret = new StringBuffer();	synchronized(msgs) {        ret.append("msgs: (").append(msgs.size()).append(')');	    for (Enumeration e = msgs.keys(); e.hasMoreElements();) {		key   = (Long)e.nextElement();		entry = (Entry)msgs.get(key);            ret.append("key = ").append(key).append(", value = ").append(entry).append('\n');	    }	    synchronized(stable_msgs) {            ret.append("\nstable_msgs: ").append(stable_msgs);	    }	}			return(ret.toString());    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -