⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 engine.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies * Copyright (C) 1996 - 2000 BULL * Copyright (C) 1996 - 2000 INRIA * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. */package fr.dyade.aaa.agent;import java.io.IOException;import java.util.Hashtable;import java.util.Enumeration;import java.util.Vector;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;import fr.dyade.aaa.util.*;class EngineThread extends Thread {  Engine engine = null;  EngineThread(Engine engine) {    super(AgentServer.getThreadGroup(), engine, engine.getName());    this.engine = engine;  }}/** * The <code>Engine</code> class provides multiprogramming of agents. It * realizes the program loop which successively gets the notifications from * the message queue and calls the relevant reaction function member of the * target agent. The engine's basic behaviour is: * <p><blockquote><pre> * While (true) { *   // get next message in channel *   Message msg = qin.get(); *   // get the agent to process event *   Agent agent = load(msg.to); *   // execute relevant reaction, all notification sent during this *   // reaction is inserted into persistant queue in order to processed *   // by the channel. *   agent.react(msg.from, msg.not); *   // save changes, then commit. *   &lt;BEGIN TRANSACTION&gt; *   qin.pop(); *   channel.dispatch(); *   agent.save(); *   &lt;COMMIT TRANSACTION&gt; * } * </pre></blockquote> * <p> * The <code>Engine</code> class ensures the atomic handling of an agent * reacting to a notification: * <ul> * <li>if the reaction completes, a COMMIT ensures all changes related to * the reaction are committed (state change of the agent, notifications * signaled during the reaction, deletion of the handled notification); * <li>if anything goes wrong during the reaction, a ROLLBACK undoes the * changes; depending on the error kind it may be necessary to execute * additional operations to resynchronize the database and the memory * objects, and to allow the main program to continue. * </ul> * <p><hr> * <b>Handling errors.</b><p> * Two types of errors may occur: errors of first type are detected in the * source code and signaled by an <code>Exception</code>; serious errors lead * to an <code>Error</code> being raised then the engine exits. In the first * case the exception may be handled at any level, even partially. Most of * them are signaled up to the engine loop. Two cases are then distinguished * depending on the recovery policy:<ul> * <li>if <code>recoveryPolicy</code> is set to <code>RP_EXC_NOT</code> * (default value) then the agent state and the message queue are restored * (ROLLBACK); an <code>ExceptionNotification</code> notification is sent * to the sender and the engine may then proceed with next notification; * <li>if <code>recoveryPolicy</code> is set to <code>RP_EXIT</code> the engine * stops the agent server. * </ul> */class Engine implements Runnable, MessageConsumer, EngineMBean {  /**   * Queue of messages to be delivered to local agents.   */   protected MessageQueue qin;  /**   * Boolean variable used to stop the engine properly. The engine tests   * this variable between each reaction, and stops if it is false.   */  protected volatile boolean isRunning;  /**   * Boolean variable used to stop the engine properly. If this variable   * is true then the engine is waiting and it can interupted, else it   * handles a notification and it will exit after (the engine tests the   * <code><a href="#isRunning">isRunning</a></code> variable between   * each reaction)   */  protected volatile boolean canStop;  /** Logical timestamp information for messages in "local" domain. */  private int stamp;  /** Buffer used to optimise */  private byte[] stampBuf = null;  /** True if the timestamp is modified since last save. */  private boolean modified = false;  /** This table is used to maintain a list of agents already in memory   * using the AgentId as primary key.   */  Hashtable agents;  /** Virtual time counter use in FIFO swap-in/swap-out mechanisms. */  long now = 0;  /** Maximum number of memory loaded agents. */  int NbMaxAgents = 100;  /**   * Returns the number of agent's reaction since last boot.   *   * @return	the number of agent's reaction since last boot   */  public long getNbReactions() {    return now;  }  /**   * Returns the maximum number of agents loaded in memory.   *   * @return	the maximum number of agents loaded in memory   */  public int getNbMaxAgents() {    return NbMaxAgents;  }  /**   * Sets the maximum number of agents that can be loaded simultaneously   * in memory.   *   * @parama NbMaxAgents	the maximum number of agents   */  public void setNbMaxAgents(int NbMaxAgents) {    this.NbMaxAgents = NbMaxAgents;  }  /**   * Returns the number of agents actually loaded in memory.   *   * @return	the maximum number of agents actually loaded in memory   */  public int getNbAgents() {    return agents.size();  }  /**   * Gets the number of messages posted to this engine since creation.   *   *  return	the number of messages.   */  public int getNbMessages() {    return stamp;  }  /**   * Gets the number of waiting messages in this engine.   *   *  return	the number of waiting messages.   */  public int getNbWaitingMessages() {    return qin.size();  }  /** Vector containing id's of all fixed agents. */  Vector fixedAgentIdList = null;  /**   * Returns the number of fixed agents.   *   * @return	the number of fixed agents   */  public int getNbFixedAgents() {    return fixedAgentIdList.size();  }  /**   * The current agent running.   */   Agent agent = null;  /**   * The message in progress.   */   Message msg = null;  /**   * The active component of this engine.   */   EngineThread thread = null;  /**   * Send <code>ExceptionNotification</code> notification in case of exception   * in agent specific code.   * Constant value for the <code>recoveryPolicy</code> variable.   */  static final int RP_EXC_NOT = 0;  /**   * Stop agent server in case of exception in agent specific code.   * Constant value for the <code>recoveryPolicy</code> variable.   */  static final int RP_EXIT = 1;  /**   * String representations of <code>RP_*</code> constant values   * for the <code>recoveryPolicy</code> variable.   */  static final String[] rpStrings = {    "notification",    "exit"  };  /**   * recovery policy in case of exception in agent specific code.   * Default value is <code>RP_EXC_NOT</code>.   */  int recoveryPolicy = RP_EXC_NOT;  private String name;  /**   * Returns this <code>Engine</code>'s name.   *   * @return this <code>Engine</code>'s name.   */  public final String getName() {    return name;  }  /**   * Returns the corresponding domain's name.   *   * @return this domain's name.   */  public final String getDomainName() {    return "engine";  }  /**   * Creates a new instance of Engine (real class depends of server type).   *   * @return		the corresponding <code>engine</code>'s instance.   */  static Engine newInstance() throws Exception {    String cname = "fr.dyade.aaa.agent.Engine";    cname = AgentServer.getProperty("Engine", cname);    Class eclass = Class.forName(cname);    return (Engine) eclass.newInstance();  }  protected Queue mq;  /**   * Push a new message in temporary queue until the end of current reaction.   * As this method is only  called by engine's thread it does not need to be   * synchronized.   */  final void push(AgentId from,                  AgentId to,                  Notification not) {    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG,                 getName() + ", push(" + from + ", " + to + ", " + not + ")");    if ((to == null) || to.isNullId())      return;        mq.push(Message.alloc(from, to, not));  }  /**   * Dispatch messages between the <a href="MessageConsumer.html">   * <code>MessageConsumer</code></a>: <a href="Engine.html">   * <code>Engine</code></a> component and <a href="Network.html">   * <code>Network</code></a> components.<p>   * Handle persistent information in respect with engine transaction.   * <p><hr>   * Be careful, this method must only be used during a transaction in   * order to ensure the mutual exclusion.   *   * @exception IOException	error when accessing the local persistent   *				storage.   */  final void dispatch() throws Exception {    Message msg = null;    while (! mq.isEmpty()) {      try {	msg = (Message) mq.get();      } catch (InterruptedException exc) {	continue;      }      if (msg.from == null) msg.from = AgentId.localId;      Channel.post(msg);      mq.pop();    }    Channel.save();  }  /**   * Cleans the Channel queue of all pushed notifications.   * <p><hr>   * Be careful, this method must only be used during a transaction in   * order to ensure the mutual exclusion.   */  final void clean() {    mq.removeAllElements();  }  protected Logger logmon = null;  /**   * Initializes a new <code>Engine</code> object (can only be used by   * subclasses).   */  protected Engine() throws Exception {    name = "Engine#" + AgentServer.getServerId();    // Get the logging monitor from current server MonologLoggerFactory    logmon = Debug.getLogger(Debug.A3Engine +                             ".#" + AgentServer.getServerId());    logmon.log(BasicLevel.DEBUG,               getName() + " created [" + getClass().getName() + "].");    NbMaxAgents = Integer.getInteger("NbMaxAgents", NbMaxAgents).intValue();    qin = new MessageVector(name, AgentServer.getTransaction().isPersistent());

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -