⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 engine.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
   * Causes this engine to begin execution.   *   * @see stop   */  public void start() {    if (isRunning) return;    thread = new EngineThread(this);    thread.setDaemon(false);    logmon.log(BasicLevel.DEBUG, getName() + " starting.");    String rp = AgentServer.getProperty("Engine.recoveryPolicy");    if (rp != null) {      for (int i = rpStrings.length; i-- > 0;) {	if (rp.equals(rpStrings[i])) {	  recoveryPolicy = i;	  break;	}      }    }    isRunning = true;    canStop = true;    thread.start();    logmon.log(BasicLevel.DEBUG, getName() + " started.");  }    /**   * Forces the engine to stop executing.   *   * @see start   */  public void stop() {    logmon.log(BasicLevel.DEBUG, getName() + ", stops.");    isRunning = false;    if (thread != null) {      while (thread.isAlive()) {        if (canStop) {          if (thread.isAlive())            thread.interrupt();        }        try {          thread.join(1000L);        } catch (InterruptedException exc) {          continue;        }      }      thread = null;    }  }  /**   * Get this engine's <code>MessageQueue</code> qin.   *   * @return this <code>Engine</code>'s queue.   */  public MessageQueue getQueue() {    return qin;  }  /**   * Tests if the engine is alive.   *   * @return	true if this <code>MessageConsumer</code> is alive; false   * 		otherwise.   */  public boolean isRunning() {    return isRunning;  }  /**   * Saves logical clock information to persistent storage.   */  public void save() throws IOException {    if (modified) {      stampBuf[0] = (byte)((stamp >>> 24) & 0xFF);      stampBuf[1] = (byte)((stamp >>> 16) & 0xFF);      stampBuf[2] = (byte)((stamp >>>  8) & 0xFF);      stampBuf[3] = (byte)(stamp & 0xFF);      AgentServer.getTransaction().saveByteArray(stampBuf, getName());      modified = false;    }  }  /**   * Restores logical clock information from persistent storage.   */  public void restore() throws Exception {    stampBuf = AgentServer.getTransaction().loadByteArray(getName());    if (stampBuf == null) {      stamp = 0;      stampBuf = new byte[4];      modified = true;    } else {      stamp = ((stampBuf[0] & 0xFF) << 24) +        ((stampBuf[1] & 0xFF) << 16) +        ((stampBuf[2] & 0xFF) <<  8) +        (stampBuf[3] & 0xFF);      modified = false;    }  }  /**   * This operation always throws an IllegalStateException.   */  public void delete() throws IllegalStateException {    throw new IllegalStateException();  }  protected final int getStamp() {    return stamp;  }  protected final void setStamp(int stamp) {    modified = true;    this.stamp = stamp;  }  protected final void stamp(Message msg) {    modified = true;    msg.source = AgentServer.getServerId();    msg.dest = AgentServer.getServerId();    msg.stamp = ++stamp;  }  /**   *  Adds a message in "ready to deliver" list. This method allocates a   * new time stamp to the message ; be Careful, changing the stamp imply   * the filename change too.   */  public void post(Message msg) throws Exception {    if ((msg.not.expiration > 0) &&        (msg.not.expiration < System.currentTimeMillis())) {      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG,                   getName() + ": removes expired notification " +                   msg.from + ", " + msg.not);      return;    }          if (msg.isPersistent()) {      stamp(msg);      msg.save();    }    qin.push(msg);  }  protected boolean needToBeCommited = false;  protected long timeout = Long.MAX_VALUE;  protected void onTimeOut() {}  /**   * Main loop of agent server <code>Engine</code>.   */  public void run() {    try {      main_loop:      while (isRunning) {	agent = null;	canStop = true;	// Get a notification, then execute the right reaction.	try {	  msg = (Message) qin.get(timeout);          if (msg == null) {            onTimeOut();            continue;          }	} catch (InterruptedException exc) {	  continue;	}		canStop = false;	if (! isRunning) break;        if ((msg.not.expiration <= 0) ||            (msg.not.expiration >= System.currentTimeMillis())) {          // The message is valid, try to load the destination agent          try {            agent = load(msg.to);          } catch (UnknownAgentException exc) {            //  The destination agent don't exists, send an error            // notification to sending agent.            logmon.log(BasicLevel.ERROR,                       getName() + ": Unknown agent, " + msg.to + ".react(" +                       msg.from + ", " + msg.not + ")");            agent = null;            push(AgentId.localId,                 msg.from,                 new UnknownAgent(msg.to, msg.not));          } catch (Exception exc) {            //  Can't load agent then send an error notification            // to sending agent.            logmon.log(BasicLevel.ERROR,                       getName() + ": Can't load agent, " + msg.to + ".react(" +                       msg.from + ", " + msg.not + ")",                       exc);            agent = null;            // Stop the AgentServer            AgentServer.stop(false);            break main_loop;          }        } else {          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG,                       getName() + ": removes expired notification " +                       msg.from + ", " + msg.not);        }	if (agent != null) {          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG,                       getName() + ": " + agent + ".react(" +                       msg.from + ", " + msg.not + ")");	  try {            agent.react(msg.from, msg.not);          } catch (Exception exc) {            logmon.log(BasicLevel.ERROR,                       getName() + ": Uncaught exception during react, " +                       agent + ".react(" + msg.from + ", " + msg.not + ")",                       exc);	    switch (recoveryPolicy) {	    case RP_EXC_NOT:	    default:	      // In case of unrecoverable error during the reaction we have	      // to rollback.	      abort(exc);	      // then continue.	      continue;	    case RP_EXIT:              // Stop the AgentServer              AgentServer.stop(false);	      break main_loop;	    }	  }	}	// Commit all changes then continue.	commit();      }    } catch (Throwable exc) {      //  There is an unrecoverable exception during the transaction      // we must exit from server.      logmon.log(BasicLevel.FATAL,                 getName() + ": Fatal error",                 exc);      canStop = false;      // Stop the AgentServer      AgentServer.stop(false);    } finally {      terminate();      logmon.log(BasicLevel.DEBUG, getName() + " stopped.");    }  }  /**   * Commit the agent reaction in case of rigth termination:<ul>   * <li>suppress the processed notification from message queue,   * then deletes it ;   * <li>push all new notifications in qin and qout, and saves them ;   * <li>saves the agent state ;   * <li>then commit the transaction to validate all changes.   * </ul>   */  void commit() throws Exception {    AgentServer.getTransaction().begin();    // Suppress the processed notification from message queue ..    qin.pop();    // .. then deletes it ..    msg.delete();    // .. and frees it.    msg.free();    // Post all notifications temporary keeped in mq in the rigth consumers,    // then saves changes.    dispatch();    // Saves the agent state then commit the transaction.    if (agent != null) agent.save();    AgentServer.getTransaction().commit();    // The transaction has commited, then validate all messages.    Channel.validate();    AgentServer.getTransaction().release();  }  /**   * Abort the agent reaction in case of error during execution. In case   * of unrecoverable error during the reaction we have to rollback:<ul>   * <li>reload the previous state of agent ;   * <li>remove the failed notification ;   * <li>clean the Channel queue of all pushed notifications ;   * <li>send an error notification to the sender ;   * <li>then commit the transaction to validate all changes.   * </ul>   */  void abort(Exception exc) throws Exception {    AgentServer.getTransaction().begin();    // Reload the state of agent.    try {      agent = reload(msg.to);    } catch (Exception exc2) {      logmon.log(BasicLevel.ERROR,                 getName() + ", can't reload Agent" + msg.to, exc2);      throw new Exception("Can't reload Agent" + msg.to);    }    // Remove the failed notification ..    qin.pop();    // .. then deletes it ..    msg.delete();    // .. and frees it.    msg.free();    // Clean the Channel queue of all pushed notifications.    clean();    // Send an error notification to client agent.    push(AgentId.localId,         msg.from,         new ExceptionNotification(msg.to, msg.not, exc));    dispatch();    AgentServer.getTransaction().commit();    // The transaction has commited, then validate all messages.    Channel.validate();    AgentServer.getTransaction().release();  }  /**   * Returns a string representation of this engine.    *   * @return	A string representation of this engine.   */  public String toString() {    StringBuffer strbuf = new StringBuffer();    strbuf.append('(').append(super.toString());    strbuf.append(",name=").append(getName());    strbuf.append(",running=").append(isRunning());    strbuf.append(",agent=").append(agent).append(')');    return strbuf.toString();  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -