total_old.java
来自「JGroups源码」· Java 代码 · 共 1,369 行 · 第 1/4 页
JAVA
1,369 行
// $Id: TOTAL_OLD.java,v 1.12 2006/01/19 09:53:37 belaban Exp $package org.jgroups.protocols;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.util.Util;import org.jgroups.stack.Protocol;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.util.Vector;/** * class SavedMessages * <p/> * Stores a set of messages along with their sequence id (assigned by the sequencer). */class SavedMessages { final Log log=LogFactory.getLog(SavedMessages.class); /** * class Entry (inner class) * <p/> * object type to store in the messages Vector (need to store sequence id in addition to message) */ class Entry { private final Message msg; private final long seq; Entry(Message msg, long seq) { this.msg=msg; this.seq=seq; } public Message getMsg() { return msg; } public long getSeq() { return seq; } } // class Entry private final Vector messages; // vector of "Entry"s to store "Message"s, sorted by sequence id /** * Constructor - creates an empty space to store messages */ SavedMessages() { messages=new Vector(); } /** * inserts the specified message and sequence id into the "list" of stored messages * if the sequence id given is already stored, then nothing is stored */ public void insertMessage(Message msg, long seq) { synchronized(messages) { int size=messages.size(); int index=0; long this_seq=-1; // used to prevent duplicate messages being stored // find the index where this message should be inserted try { while((index < size) && ((this_seq=((Entry)(messages.elementAt(index))).getSeq()) < seq)) { index++; } } catch(java.lang.ClassCastException e) { log.error("Error: (TOTAL_OLD) SavedMessages.insertMessage() - ClassCastException: could not cast element of \"messages\" to an Entry (index " + index + ')'); return; } // check that the sequences aren't the same (don't want duplicates) if(this_seq == seq) { log.error("SavedMessages.insertMessage() - sequence " + seq + " 
already exists in saved messages. Message NOT saved."); return; } messages.insertElementAt(new Entry(msg, seq), index); } // synchronized( messages ) } /** * returns a copy of the stored message with the given sequence id * if delete_msg is true, then the message is removed from the * the list of stored messages, otherwise the message is not * removed from the list * if no message is stored with this sequence id, null is returned */ private Message getMessage(long seq, boolean delete_msg) { synchronized(messages) { int size=messages.size(); int index=0; long this_seq=-1; try { while((index < size) && ((this_seq=(((Entry)(messages.elementAt(index))).getSeq())) < seq)) { index++; } } catch(java.lang.ClassCastException e) { log.error("Error: (TOTAL_OLD) SavedMessages.getMessage() - ClassCastException: could not cast element of \"messages\" to an Entry (index " + index + ')'); return null; } // determine if we found the specified sequence if(this_seq == seq) { // we found the message at index Object temp_obj=messages.elementAt(index); if(temp_obj instanceof Entry) { Message ret_val=((Entry)temp_obj).getMsg().copy(); // should we delete if(delete_msg) { messages.removeElementAt(index); } return ret_val; } else { log.error("Error: (TOTAL_OLD) SavedMessages.getMessage() - could not cast element of \"messages\" to an Entry (index " + index + ')'); return null; } // if ( temp_obj instanceof Entry ) } else { // we didn't find this sequence number in the messages return null; } } // synchronized( messages ) } /** * returns a stored message with the given sequence id * the message is then removed from the list of stored messages * if no message is stored with this sequence id, null is returned */ public Message getMessage(long seq) { return getMessage(seq, true); } /** * similar to GetMessage, except a copy of the message is returned * and the message is not removed from the list */ public Message peekMessage(long seq) { return getMessage(seq, false); } /** * returns a copy of 
the stored message with the lowest sequence id * if delete_msg is true, then the message is removed from the * the list of stored messages, otherwise the message is not * removed from the list * if their are no messages stored, null is returned */ private Message getFirstMessage(boolean delete_msg) { synchronized(messages) { if(isEmpty()) { return null; } else { Object temp_obj=messages.firstElement(); if(temp_obj instanceof Entry) { Message ret_val=((Entry)temp_obj).getMsg().copy(); messages.removeElementAt(0); return ret_val; } else { log.error("Error: (TOTAL_OLD) SavedMessages.getFirstMessage() - could not cast element of \"messages\" to an Entry"); return null; } // if ( temp_obj instanceof Entry ) } } // synchronized( messages ) } /** * returns the stored message with the lowest sequence id; * the message is then removed from the list of stored messages * if their are no messages stored, null is returned */ public synchronized Message getFirstMessage() { return getFirstMessage(true); } /** * similar to GetFirstMessage, except a copy of the message is returned * and the message is not removed from the list */ public Message peekFirstMessage() { return getFirstMessage(false); } /** * returns the lowest sequence id of the messages stored * if no messages are stored, -1 is returned */ public long getFirstSeq() { synchronized(messages) { if(isEmpty()) { return -1; } else { Object temp_obj=messages.firstElement(); if(temp_obj instanceof Entry) { return ((Entry)temp_obj).getSeq(); } else { log.error("Error: (TOTAL_OLD) SavedMessages.getFirstSeq() - could not cast element of \"messages\" to an Entry "); return -1; } } } // synchronized( messages ) } /** * returns true if there are messages stored * returns false if there are no messages stored */ public boolean isEmpty() { return messages.isEmpty(); } /** * returns the number of messages stored */ public int getSize() { return messages.size(); } /** * clears all of the stored messages */ public void clearMessages() { 
synchronized(messages) { messages.removeAllElements(); } }} // class SavedMessages/** * class MessageAcks * <p/> * Used by sequencer to store cumulative acknowledgements of broadcast messages * sent to the group in this view */class MessageAcks { final Log log=LogFactory.getLog(MessageAcks.class); // TODO: may also want to store some sort of timestamp in each Entry (maybe) /** * class Entry (inner class) * <p/> * object type to store cumulative acknowledgements using a member's Address * and the sequence id of a message */ class Entry { public final Address addr; public long seq; Entry(Address addr, long seq) { this.addr=addr; this.seq=seq; } Entry(Address addr) { this.addr=addr; this.seq=-1; // means that no acknowledgements have been made yet } } // class Entry // Vector of "Entry"s representing cumulative acknowledgements for each member of the group private final Vector acks; private final SavedMessages message_history; // history of broadcast messages sent /** * Constructor - creates a Vector of "Entry"s given a Vector of "Address"es for the members */ MessageAcks(Vector members) { acks=new Vector(); // initialize the message history to contain no messages message_history=new SavedMessages(); // insert slots for each member in the acknowledgement Vector reset(members); } /** * resets acknowledgement Vector with "Entry"s using the given Vector of "Address"es * also clears the message history */ public synchronized void reset(Vector members) { clear(); // initialize Vector of acknowledgements (no acks for any member) int num_members=members.size(); for(int i=0; i < num_members; i++) { Object temp_obj=members.elementAt(i); if(temp_obj instanceof Address) { acks.addElement(new Entry((Address)temp_obj)); } else { log.error("Error: (TOTAL_OLD) MessageAcks.reset() - could not cast element of \"members\" to an Address object"); return; } } } /** * clear all acknowledgements and the message history */ private void clear() { acks.removeAllElements(); 
message_history.clearMessages(); } /** * returns the Entry from the acknowledgement Vector with the given Address * returns null if an Entry with the given Address is not found */ private Entry getEntry(Address addr) { synchronized(acks) { // look for this addreess in the acknowledgement Vector int size=acks.size(); for(int i=0; i < size; i++) { Object temp_obj=acks.elementAt(i); if(temp_obj instanceof Entry) { Entry this_entry=(Entry)temp_obj; if((this_entry.addr).equals(addr)) { // the given Address matches this entry
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?