📄 engine.java
字号:
* Causes this engine to begin execution. * * @see stop */ public void start() { if (isRunning) return; thread = new EngineThread(this); thread.setDaemon(false); logmon.log(BasicLevel.DEBUG, getName() + " starting."); String rp = AgentServer.getProperty("Engine.recoveryPolicy"); if (rp != null) { for (int i = rpStrings.length; i-- > 0;) { if (rp.equals(rpStrings[i])) { recoveryPolicy = i; break; } } } isRunning = true; canStop = true; thread.start(); logmon.log(BasicLevel.DEBUG, getName() + " started."); } /** * Forces the engine to stop executing. * * @see start */ public void stop() { logmon.log(BasicLevel.DEBUG, getName() + ", stops."); isRunning = false; if (thread != null) { while (thread.isAlive()) { if (canStop) { if (thread.isAlive()) thread.interrupt(); } try { thread.join(1000L); } catch (InterruptedException exc) { continue; } } thread = null; } } /** * Get this engine's <code>MessageQueue</code> qin. * * @return this <code>Engine</code>'s queue. */ public MessageQueue getQueue() { return qin; } /** * Tests if the engine is alive. * * @return true if this <code>MessageConsumer</code> is alive; false * otherwise. */ public boolean isRunning() { return isRunning; } /** * Saves logical clock information to persistent storage. */ public void save() throws IOException { if (modified) { stampBuf[0] = (byte)((stamp >>> 24) & 0xFF); stampBuf[1] = (byte)((stamp >>> 16) & 0xFF); stampBuf[2] = (byte)((stamp >>> 8) & 0xFF); stampBuf[3] = (byte)(stamp & 0xFF); AgentServer.getTransaction().saveByteArray(stampBuf, getName()); modified = false; } } /** * Restores logical clock information from persistent storage. 
*/
  public void restore() throws Exception {
    stampBuf = AgentServer.getTransaction().loadByteArray(getName());
    if (stampBuf == null) {
      // Nothing stored yet: start the clock at 0 and mark it as modified so
      // that the next save() writes it out.
      stamp = 0;
      stampBuf = new byte[4];
      modified = true;
    } else {
      // Decode the 4-byte stamp, most significant byte first (the layout
      // written by save()).
      stamp = ((stampBuf[0] & 0xFF) << 24) +
        ((stampBuf[1] & 0xFF) << 16) +
        ((stampBuf[2] & 0xFF) << 8) +
        (stampBuf[3] & 0xFF);
      modified = false;
    }
  }

  /**
   * This operation always throws an IllegalStateException.
   *
   * @throws IllegalStateException always.
   */
  public void delete() throws IllegalStateException {
    throw new IllegalStateException();
  }

  /**
   * Returns the current value of the logical clock.
   *
   * @return the current stamp.
   */
  protected final int getStamp() {
    return stamp;
  }

  /**
   * Sets the logical clock and marks it as modified so that the next
   * {@link #save()} writes it.
   *
   * @param stamp the new stamp value.
   */
  protected final void setStamp(int stamp) {
    modified = true;
    this.stamp = stamp;
  }

  /**
   * Stamps a message: sets its source and destination to the local server
   * id and assigns it the next value of the logical clock.
   *
   * @param msg the message to stamp.
   */
  protected final void stamp(Message msg) {
    modified = true;
    msg.source = AgentServer.getServerId();
    msg.dest = AgentServer.getServerId();
    msg.stamp = ++stamp;
  }

  /**
   * Adds a message in "ready to deliver" list. A persistent message is
   * given a new time stamp and saved before being queued; be careful,
   * changing the stamp implies the filename changes too.
   *
   * <p>A message whose notification has a positive expiration date that is
   * already in the past is dropped (with a debug trace) instead of being
   * queued.
   *
   * @param msg the message to post.
   * @throws Exception declared by this method; the stamping, saving and
   *         queueing calls are the only possible sources.
   */
  public void post(Message msg) throws Exception {
    if ((msg.not.expiration > 0) &&
        (msg.not.expiration < System.currentTimeMillis())) {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG,
                   getName() + ": removes expired notification " +
                   msg.from + ", " + msg.not);
      return;
    }

    if (msg.isPersistent()) {
      stamp(msg);
      msg.save();
    }

    qin.push(msg);
  }

  protected boolean needToBeCommited = false;

  /** Maximum time passed to <code>qin.get</code> in the main loop. */
  protected long timeout = Long.MAX_VALUE;

  /**
   * Hook called by the main loop when <code>qin.get(timeout)</code> returns
   * no message. Does nothing by default.
   */
  protected void onTimeOut() {}

  /**
   * Main loop of agent server <code>Engine</code>.
   *
   * <p>While the engine is running, each iteration takes the next message
   * from <code>qin</code>, loads the destination agent unless the
   * notification has expired, lets the agent react, then commits. A failure
   * during the reaction is handled according to <code>recoveryPolicy</code>:
   * either the reaction is aborted and the loop continues, or the whole
   * AgentServer is stopped. Any other throwable escaping the loop is logged
   * as fatal and stops the AgentServer. <code>terminate()</code> is always
   * called on exit.
   */
  public void run() {
    try {
      main_loop:
      while (isRunning) {
        agent = null;
        // The thread may be interrupted by stop() while it waits for a
        // message.
        canStop = true;

        // Get a notification, then execute the right reaction.
        try {
          msg = (Message) qin.get(timeout);
          if (msg == null) {
            onTimeOut();
            continue;
          }
        } catch (InterruptedException exc) {
          // Go back to the loop condition, which re-checks isRunning.
          continue;
        }

        // From here on stop() no longer interrupts the thread.
        canStop = false;
        if (! isRunning) break;

        if ((msg.not.expiration <= 0) ||
            (msg.not.expiration >= System.currentTimeMillis())) {
          // The message is valid, try to load the destination agent.
          try {
            agent = load(msg.to);
          } catch (UnknownAgentException exc) {
            // The destination agent does not exist, send an error
            // notification to the sending agent.
            logmon.log(BasicLevel.ERROR,
                       getName() + ": Unknown agent, " + msg.to + ".react(" +
                       msg.from + ", " + msg.not + ")");
            agent = null;
            push(AgentId.localId,
                 msg.from,
                 new UnknownAgent(msg.to, msg.not));
          } catch (Exception exc) {
            // The agent can't be loaded: log the failure, then stop the
            // whole server.
            logmon.log(BasicLevel.ERROR,
                       getName() + ": Can't load agent, " + msg.to + ".react(" +
                       msg.from + ", " + msg.not + ")",
                       exc);
            agent = null;
            // Stop the AgentServer
            AgentServer.stop(false);
            break main_loop;
          }
        } else {
          // Expired notification: no agent is loaded, the message is simply
          // removed by the commit below.
          if (logmon.isLoggable(BasicLevel.DEBUG))
            logmon.log(BasicLevel.DEBUG,
                       getName() + ": removes expired notification " +
                       msg.from + ", " + msg.not);
        }

        if (agent != null) {
          if (logmon.isLoggable(BasicLevel.DEBUG))
            logmon.log(BasicLevel.DEBUG,
                       getName() + ": " + agent + ".react(" +
                       msg.from + ", " + msg.not + ")");
          try {
            agent.react(msg.from, msg.not);
          } catch (Exception exc) {
            logmon.log(BasicLevel.ERROR,
                       getName() + ": Uncaught exception during react, " +
                       agent + ".react(" + msg.from + ", " + msg.not + ")",
                       exc);
            switch (recoveryPolicy) {
            case RP_EXC_NOT:
            default:
              // In case of unrecoverable error during the reaction we have
              // to rollback.
              abort(exc);
              // then continue.
              continue;
            case RP_EXIT:
              // Stop the AgentServer
              AgentServer.stop(false);
              break main_loop;
            }
          }
        }

        // Commit all changes then continue.
        commit();
      }
    } catch (Throwable exc) {
      // There is an unrecoverable exception during the transaction
      // we must exit from server.
      logmon.log(BasicLevel.FATAL, getName() + ": Fatal error", exc);
      canStop = false;
      // Stop the AgentServer
      AgentServer.stop(false);
    } finally {
      terminate();
      logmon.log(BasicLevel.DEBUG, getName() + " stopped.");
    }
  }

  /**
   * Commit the agent reaction in case of right termination:<ul>
   * <li>suppress the processed notification from message queue,
   * then deletes it ;
   * <li>push all new notifications in qin and qout, and saves them ;
   * <li>saves the agent state ;
   * <li>then commit the transaction to validate all changes.
   * </ul>
   *
   * @throws Exception declared by this method; any failure of the steps
   *         above propagates to the caller.
   */
  void commit() throws Exception {
    AgentServer.getTransaction().begin();
    // Suppress the processed notification from message queue ..
    qin.pop();
    // .. then deletes it ..
    msg.delete();
    // .. and frees it.
    msg.free();
    // Post all notifications temporarily kept in mq in the right consumers,
    // then saves changes.
    dispatch();
    // Saves the agent state then commit the transaction.
    if (agent != null) agent.save();
    AgentServer.getTransaction().commit();
    // The transaction has committed, then validate all messages.
    Channel.validate();
    AgentServer.getTransaction().release();
  }

  /**
   * Abort the agent reaction in case of error during execution. In case
   * of unrecoverable error during the reaction we have to rollback:<ul>
   * <li>reload the previous state of agent ;
   * <li>remove the failed notification ;
   * <li>clean the Channel queue of all pushed notifications ;
   * <li>send an error notification to the sender ;
   * <li>then commit the transaction to validate all changes.
   * </ul>
   *
   * @param exc the exception raised by the failed reaction; it is forwarded
   *        to the sender inside an <code>ExceptionNotification</code>.
   * @throws Exception if the agent can't be reloaded (the reload failure is
   *         attached as the cause), or if any later step fails.
   */
  void abort(Exception exc) throws Exception {
    AgentServer.getTransaction().begin();
    // Reload the state of agent.
    try {
      agent = reload(msg.to);
    } catch (Exception exc2) {
      logmon.log(BasicLevel.ERROR,
                 getName() + ", can't reload Agent" + msg.to, exc2);
      // Keep the reload failure as the cause so it is not lost to callers.
      throw new Exception("Can't reload Agent" + msg.to, exc2);
    }

    // Remove the failed notification ..
    qin.pop();
    // .. then deletes it.
    msg.delete();
    // Clean the Channel queue of all pushed notifications.
    clean();
    // Send an error notification to client agent. This still reads
    // msg.from, msg.to and msg.not, so the message is released only
    // afterwards.
    push(AgentId.localId,
         msg.from,
         new ExceptionNotification(msg.to, msg.not, exc));
    // NOTE(review): free() is not visible from here; it is called after the
    // last read of msg in case it clears or recycles the message - confirm.
    msg.free();
    dispatch();
    AgentServer.getTransaction().commit();
    // The transaction has committed, then validate all messages.
    Channel.validate();
    AgentServer.getTransaction().release();
  }

  /**
   * Returns a string representation of this engine, made of the inherited
   * representation followed by the engine's name, its running state and the
   * agent currently held by the engine.
   *
   * @return A string representation of this engine.
   */
  public String toString() {
    StringBuffer strbuf = new StringBuffer();

    strbuf.append('(').append(super.toString());
    strbuf.append(",name=").append(getName());
    strbuf.append(",running=").append(isRunning());
    strbuf.append(",agent=").append(agent).append(')');

    return strbuf.toString();
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -