📄 engine.java
字号:
/*
 * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
 * Copyright (C) 1996 - 2000 BULL
 * Copyright (C) 1996 - 2000 INRIA
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 */
package fr.dyade.aaa.agent;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Enumeration;
import java.util.Vector;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import fr.dyade.aaa.util.*;

/**
 * Thread that executes an <code>Engine</code>. The engine is handed to
 * <code>Thread</code> as its <code>Runnable</code> target; the thread is
 * created in the group returned by <code>AgentServer.getThreadGroup()</code>
 * and carries the engine's name.
 */
class EngineThread extends Thread {
  /** The engine run by this thread. */
  Engine engine;

  /**
   * Creates the thread for the given engine.
   *
   * @param engine the engine to run; must not be null since its name is
   *               read to name the thread.
   */
  EngineThread(Engine engine) {
    super(AgentServer.getThreadGroup(), engine, engine.getName());
    this.engine = engine;
  }
}

/**
 * The <code>Engine</code> class provides multiprogramming of agents. It
 * realizes the program loop which successively gets the notifications from
 * the message queue and calls the relevant reaction function member of the
 * target agent. The engine's basic behaviour is:
 * <p><blockquote><pre>
 * While (true) {
 *   // get next message in channel
 *   Message msg = qin.get();
 *   // get the agent to process event
 *   Agent agent = load(msg.to);
 *   // execute relevant reaction, all notification sent during this
 *   // reaction is inserted into persistent queue in order to be processed
 *   // by the channel.
 *   agent.react(msg.from, msg.not);
 *   // save changes, then commit.
*   <BEGIN TRANSACTION>
 *   qin.pop();
 *   channel.dispatch();
 *   agent.save();
 *   <COMMIT TRANSACTION>
 * }
 * </pre></blockquote>
 * <p>
 * The <code>Engine</code> class ensures the atomic handling of an agent
 * reacting to a notification:
 * <ul>
 * <li>if the reaction completes, a COMMIT ensures all changes related to
 * the reaction are committed (state change of the agent, notifications
 * signaled during the reaction, deletion of the handled notification);
 * <li>if anything goes wrong during the reaction, a ROLLBACK undoes the
 * changes; depending on the error kind it may be necessary to execute
 * additional operations to resynchronize the database and the memory
 * objects, and to allow the main program to continue.
 * </ul>
 * <p><hr>
 * <b>Handling errors.</b><p>
 * Two types of errors may occur: errors of first type are detected in the
 * source code and signaled by an <code>Exception</code>; serious errors lead
 * to an <code>Error</code> being raised then the engine exits. In the first
 * case the exception may be handled at any level, even partially. Most of
 * them are signaled up to the engine loop. Two cases are then distinguished
 * depending on the recovery policy:<ul>
 * <li>if <code>recoveryPolicy</code> is set to <code>RP_EXC_NOT</code>
 * (default value) then the agent state and the message queue are restored
 * (ROLLBACK); an <code>ExceptionNotification</code> notification is sent
 * to the sender and the engine may then proceed with next notification;
 * <li>if <code>recoveryPolicy</code> is set to <code>RP_EXIT</code> the engine
 * stops the agent server.
 * </ul>
 */
class Engine implements Runnable, MessageConsumer, EngineMBean {
  /**
   * Queue of messages to be delivered to local agents.
   */
  protected MessageQueue qin;

  /**
   * Boolean variable used to stop the engine properly. The engine tests
   * this variable between each reaction, and stops if it is false.
   */
  protected volatile boolean isRunning;

  /**
   * Boolean variable used to stop the engine properly. If this variable
   * is true then the engine is waiting and it can be interrupted, else it
   * handles a notification and it will exit after (the engine tests the
   * <code><a href="#isRunning">isRunning</a></code> variable between
   * each reaction).
   */
  protected volatile boolean canStop;

  /** Logical timestamp information for messages in "local" domain. */
  private int stamp;

  /**
   * Buffer used to optimise.
   * NOTE(review): what is optimised is not visible in this part of the
   * file (presumably the saving of <code>stamp</code>) -- confirm.
   */
  private byte[] stampBuf = null;

  /** True if the timestamp is modified since last save. */
  private boolean modified = false;

  /**
   * This table is used to maintain a list of agents already in memory
   * using the AgentId as primary key.
   */
  Hashtable agents;

  /** Virtual time counter used in FIFO swap-in/swap-out mechanisms. */
  long now = 0;

  /** Maximum number of memory loaded agents. */
  int NbMaxAgents = 100;

  /**
   * Returns the number of agent's reactions since last boot. The value
   * returned is the current <code>now</code> counter.
   *
   * @return the number of agent's reactions since last boot
   */
  public long getNbReactions() {
    return now;
  }

  /**
   * Returns the maximum number of agents loaded in memory.
   *
   * @return the maximum number of agents loaded in memory
   */
  public int getNbMaxAgents() {
    return NbMaxAgents;
  }

  /**
   * Sets the maximum number of agents that can be loaded simultaneously
   * in memory.
   *
   * @param NbMaxAgents the maximum number of agents
   */
  public void setNbMaxAgents(int NbMaxAgents) {
    this.NbMaxAgents = NbMaxAgents;
  }

  /**
   * Returns the number of agents actually loaded in memory, i.e. the size
   * of the <code>agents</code> table.
   *
   * @return the number of agents actually loaded in memory
   */
  public int getNbAgents() {
    return agents.size();
  }

  /**
   * Gets the number of messages posted to this engine since creation.
   * The value returned is the current <code>stamp</code>.
   *
   * @return the number of messages.
   */
  public int getNbMessages() {
    return stamp;
  }

  /**
   * Gets the number of waiting messages in this engine, i.e. the size of
   * the <code>qin</code> queue.
   *
   * @return the number of waiting messages.
   */
  public int getNbWaitingMessages() {
    return qin.size();
  }

  /** Vector containing id's of all fixed agents. */
  Vector fixedAgentIdList = null;

  /**
   * Returns the number of fixed agents.
* * @return the number of fixed agents */ public int getNbFixedAgents() { return fixedAgentIdList.size(); } /** * The current agent running. */ Agent agent = null; /** * The message in progress. */ Message msg = null; /** * The active component of this engine. */ EngineThread thread = null; /** * Send <code>ExceptionNotification</code> notification in case of exception * in agent specific code. * Constant value for the <code>recoveryPolicy</code> variable. */ static final int RP_EXC_NOT = 0; /** * Stop agent server in case of exception in agent specific code. * Constant value for the <code>recoveryPolicy</code> variable. */ static final int RP_EXIT = 1; /** * String representations of <code>RP_*</code> constant values * for the <code>recoveryPolicy</code> variable. */ static final String[] rpStrings = { "notification", "exit" }; /** * recovery policy in case of exception in agent specific code. * Default value is <code>RP_EXC_NOT</code>. */ int recoveryPolicy = RP_EXC_NOT; private String name; /** * Returns this <code>Engine</code>'s name. * * @return this <code>Engine</code>'s name. */ public final String getName() { return name; } /** * Returns the corresponding domain's name. * * @return this domain's name. */ public final String getDomainName() { return "engine"; } /** * Creates a new instance of Engine (real class depends of server type). * * @return the corresponding <code>engine</code>'s instance. */ static Engine newInstance() throws Exception { String cname = "fr.dyade.aaa.agent.Engine"; cname = AgentServer.getProperty("Engine", cname); Class eclass = Class.forName(cname); return (Engine) eclass.newInstance(); } protected Queue mq; /** * Push a new message in temporary queue until the end of current reaction. * As this method is only called by engine's thread it does not need to be * synchronized. 
*/
  final void push(AgentId from, AgentId to, Notification not) {
    if (logmon.isLoggable(BasicLevel.DEBUG)) {
      logmon.log(BasicLevel.DEBUG,
                 getName() + ", push(" + from + ", " + to + ", " + not + ")");
    }

    // A message without a usable target is silently dropped.
    boolean hasTarget = (to != null) && !to.isNullId();
    if (hasTarget) {
      mq.push(Message.alloc(from, to, not));
    }
  }

  /**
   * Dispatch messages between the <a href="MessageConsumer.html">
   * <code>MessageConsumer</code></a>: <a href="Engine.html">
   * <code>Engine</code></a> component and <a href="Network.html">
   * <code>Network</code></a> components.<p>
   * Every message waiting in the temporary queue is handed to
   * <code>Channel.post</code> and then removed from that queue; a message
   * without sender gets <code>AgentId.localId</code> as sender. Once the
   * queue is empty <code>Channel.save</code> is called.
   * <p><hr>
   * Be careful, this method must only be used during a transaction in
   * order to ensure the mutual exclusion.
   *
   * @exception Exception error raised while posting the messages or while
   *                      saving the channel.
   */
  final void dispatch() throws Exception {
    while (!mq.isEmpty()) {
      Message pending;
      try {
        pending = (Message) mq.get();
      } catch (InterruptedException exc) {
        // Interrupted while fetching: test the queue again and retry.
        continue;
      }

      if (pending.from == null) {
        pending.from = AgentId.localId;
      }
      Channel.post(pending);
      mq.pop();
    }
    Channel.save();
  }

  /**
   * Discards all notifications pushed in the temporary queue.
   * <p><hr>
   * Be careful, this method must only be used during a transaction in
   * order to ensure the mutual exclusion.
   */
  final void clean() {
    mq.removeAllElements();
  }

  /** Logging monitor used by this engine. */
  protected Logger logmon = null;

  // NOTE(review): the constructor below is cut off in this copy of the file.
  // It sits on one physical line, so its "//" comment hides every statement
  // that follows it on that line. Its code is reproduced unchanged; the
  // original line breaks have to be restored from a pristine copy.
  /**
   * Initializes a new <code>Engine</code> object (can only be used by
   * subclasses).
   */
  protected Engine() throws Exception { name = "Engine#" + AgentServer.getServerId(); // Get the logging monitor from current server MonologLoggerFactory logmon = Debug.getLogger(Debug.A3Engine + ".#" + AgentServer.getServerId()); logmon.log(BasicLevel.DEBUG, getName() + " created [" + getClass().getName() + "]."); NbMaxAgents = Integer.getInteger("NbMaxAgents", NbMaxAgents).intValue(); qin = new MessageVector(name, AgentServer.getTransaction().isPersistent());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -