📄 messagevector.java
字号:
/*
 * Copyright (C) 2004 - 2005 ScalAgent Distributed Technologies
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): ScalAgent Distributed Technologies
 * Contributor(s):
 */
package fr.dyade.aaa.agent;

import java.io.*;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import fr.dyade.aaa.util.EmptyQueueException;

/**
 * Class <code>MessageVector</code> represents a persistent vector of
 * <tt>Message</tt> (source and target agent identifier, notification).
 * As messages have a relatively short life span, then the messages
 * are kept in main memory. If possible, the list is backed by a persistent
 * image on the disk for reliability needs. In this case, we can use
 * <tt>SoftReference</tt> to avoid memory overflow.<p><hr>
 * The stamp information in Message is used to restore the queue from
 * persistent storage at initialization time, so there is no longer need
 * to save <code>MessageVector</code> object state.
 */
final class MessageVector implements MessageQueue {
  /**
   * Logger receiving the debug traces of this queue. It is assigned in the
   * constructor from <code>Debug.getLogger</code>.
   */
  private Logger logmon = null;

  /**
   * Prefix prepended to every trace written through <code>logmon</code>. It
   * is built in the constructor from the name given to the queue.
   */
  private String logmsg = null;

  /**
   * Debug-only statistics, updated by the lookup methods when DEBUG traces
   * are enabled: <code>cpt1</code> counts the lookups and <code>cpt2</code>
   * accumulates the value of <code>validated</code> observed at each lookup,
   * so that <code>cpt2/cpt1</code> is the average reported in the traces.
   */
  private long cpt1, cpt2;

  /**
   * The array buffer into which the <code>Message</code> objects are stored
   * in memory.
The capacity of this array buffer is at least large enough to * contain all the messages of the <code>MessageVector</code>.<p> * Messages are stored in a circular way, first one in <tt>data[first]</tt> * through <tt>data[(first+count-1)%length]</tt>. Any other array elements * are null. */ private Object data[]; /** The index of the first message in the circular buffer. */ private int first; /** * The number of messages in this <tt>MessageVector</tt> object. Components * <tt>data[first]</tt> through <tt>data[(first+count)%length]</tt> are the * actual items. */ private int count; /** The number of validated message in this <tt>MessageQueue</tt>. */ private int validated; private boolean persistent; MessageVector(String name, boolean persistent) { logmon = Debug.getLogger(getClass().getName() + '.' + name); logmsg = name + ".MessageVector: "; this.persistent = persistent; data = new Object[50]; first = 0; count = 0; validated = 0; } /** * Insert a message in the queue, it should only be used during * initialization for restoring the queue state. * * @param item the message to be pushed onto this queue. */ public synchronized void insert(Message item) { if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, logmsg + "insert(" + item + ")"); int i = 0; for (; i<validated; i++) { Message msg = getMessageAt(i); if (item.getStamp() < msg.getStamp()) break; } insertMessageAt(item, i); validated += 1; } /** * Pushes a message onto the bottom of this queue. It should only * be used during a transaction. The item will be really available * after the transaction commit and the queue validate. * * @param item the message to be pushed onto this queue. */ public synchronized void push(Message item) { if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, logmsg + "push(" + item + ")"); addMessage(item); } /** * Removes the message at the top of this queue. * It must only be used during a transaction. 
* * @return The message at the top of this queue. * @exception EmptyQueueException if this queue is empty. */ public synchronized Message pop() throws EmptyQueueException { if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, logmsg + "pop()"); if (validated == 0) throw new EmptyQueueException(); Message item = getMessageAt(0); removeMessageAt(0); validated -= 1; return item; } /** * Atomicaly validates all messages pushed in queue during a reaction. * It must only be used during a transaction. */ public synchronized void validate() { if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, logmsg + "validate()"); validated = size(); notify(); } /** * Looks at the message at the top of this queue without removing * it from the queue. * It should never be used during a transaction to avoid dead-lock * problems. * * @return the message at the top of this queue. * @exception InterruptedException if another thread has interrupted the * current thread. */ public synchronized Message get() throws InterruptedException { if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) { logmon.log(BasicLevel.DEBUG, logmsg + "get()"); cpt1 += 1; cpt2 += validated; if ((cpt1 & 0xFFFFL) == 0L) { logmon.log(BasicLevel.DEBUG, logmsg + (cpt2/cpt1) + '/' + validated); } } while (validated == 0) { wait(); } Message item = getMessageAt(0); if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, logmsg + "get() -> " + item); return item; } /** * Looks at the message at the top of this queue without removing * it from the queue. It should never be used during a transaction * to avoid dead-lock problems. It waits until a message is available * or the specified amount of time has elapsed. * * @param timeout the maximum time to wait in milliseconds. * @return the message at the top of this queue. * @exception InterruptedException if another thread has interrupted the * current thread. 
* @exception IllegalArgumentException if the value of timeout is negative. */ public synchronized Message get(long timeout) throws InterruptedException { if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) { logmon.log(BasicLevel.DEBUG, logmsg + "get(" + timeout + ")"); cpt1 += 1; cpt2 += validated; if ((cpt1 & 0xFFFFL) == 0L) { logmon.log(BasicLevel.DEBUG, logmsg + (cpt2/cpt1) + '/' + validated); } } Message item = null; if ((validated == 0) && (timeout > 0)) wait(timeout); if (validated > 0) item = getMessageAt(0); if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, logmsg + "get() -> " + item); return item; } /** * Looks at the first message of this queue where the destination server * is the specified one. * The message is not removed from the queue. It should never be used during * a transaction to avoid dead-lock problems. * * @param to the unique server id. * @return the corresponding message or null if none . */ public synchronized Message getMessageTo(short to) { if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) { logmon.log(BasicLevel.DEBUG, logmsg + "getFrom(" + to + ")"); cpt1 += 1; cpt2 += validated; if ((cpt1 & 0xFFFFL) == 0L) { logmon.log(BasicLevel.DEBUG, logmsg + (cpt2/cpt1) + '/' + validated); } } Message item = null; for (int i=0; i<validated; i++) { Message msg = getMessageAt(i); if (msg.getDest() == to) { item = msg; break; } } if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, logmsg + "get() -> " + item); return item; } /** * Removes the specified message from the queue if exists. * * @param msg the message to remove. 
*/ synchronized void removeMessage(Message msg) { if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, logmsg + "removeMessage #" + msg.getStamp()); for (int i = 0; i<validated; i++) { if (getMessageAt(i) == msg) { if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, logmsg + "removeMessage #" + msg.getStamp() + " -> " + i);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -