📄 simplenetwork.java
字号:
/* * Copyright (C) 2003 - 2006 ScalAgent Distributed Technologies * Copyright (C) 2004 - France Telecom R&D * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): ScalAgent Distributed Technologies * Contributor(s): */package fr.dyade.aaa.agent;import java.io.*;import java.net.*;import java.util.Vector;import java.util.Enumeration;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;import fr.dyade.aaa.util.*;/** * <code>SimpleNetwork</code> is a simple implementation of * <code>StreamNetwork</code> class with a single connection at * a time. */public class SimpleNetwork extends StreamNetwork { /** FIFO list of all messages to be sent by the watch-dog thead. */ MessageVector sendList; private JGroups jgroups = null; public void setJGroups(JGroups jgroups) { this.jgroups = jgroups; } void ackMsg(JGroupsAckMsg ack) { try { AgentServer.getTransaction().begin(); // Deletes the processed notification qout.remove(ack.getStamp()); ack.delete(); AgentServer.getTransaction().commit(); AgentServer.getTransaction().release(); if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", ackMsg(...) done."); } catch (Exception exc) { this.logmon.log(BasicLevel.FATAL, this.getName() + ", ackMsg unrecoverable exception", exc); } } /** * Creates a new network component. */ public SimpleNetwork() { super(); } /** Input component */ NetServerIn netServerIn = null; /** Output component */ NetServerOut netServerOut = null; /** * Causes this network component to begin execution. */ public void start() throws IOException { logmon.log(BasicLevel.DEBUG, getName() + ", starting"); try { if (sendList == null) sendList = new MessageVector(getName(), AgentServer.getTransaction().isPersistent()); if (netServerIn == null) netServerIn = new NetServerIn(getName(), logmon); if (netServerOut == null) netServerOut = new NetServerOut(getName(), logmon); if (! netServerIn.isRunning()) netServerIn.start(); if (! netServerOut.isRunning()) netServerOut.start(); } catch (IOException exc) { logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc); throw exc; } logmon.log(BasicLevel.DEBUG, getName() + ", started"); } /** * Forces the network component to stop executing. */ public void stop() { if (netServerIn != null) netServerIn.stop(); if (netServerOut != null) netServerOut.stop(); logmon.log(BasicLevel.DEBUG, getName() + ", stopped"); } /** * Tests if the network component is alive. * * @return true if this <code>MessageConsumer</code> is alive; false * otherwise. */ public boolean isRunning() { if ((netServerIn != null) && netServerIn.isRunning() && (netServerOut != null) && netServerOut.isRunning()) return true; else return false; } /** * Returns a string representation of this consumer, including the * daemon's name and status. * * @return A string representation of this consumer. */ public String toString() { StringBuffer strbuf = new StringBuffer(); strbuf.append(super.toString()).append("\n\t"); if (netServerIn != null) strbuf.append(netServerIn.toString()).append("\n\t"); if (netServerOut != null) strbuf.append(netServerOut.toString()).append("\n\t"); return strbuf.toString(); }// /**// * Use to clean the qout of all messages to the dead node.// *// * @param dead - the unique id. of dead server.// */// void clean(short dead) {// Message msg = null;// // TODO: Be careful, to the route algorithm!// synchronized (lock) {// for (int i=0; i<qout.size(); i++) {// msg = (Message) qout.getMessageAt(i);// if (msg.to.to == dead) {// qout.removeMessageAt(i);// }// }// }// } final class NetServerOut extends Daemon { MessageOutputStream nos = null; NetServerOut(String name, Logger logmon) { super(name + ".NetServerOut"); // Overload logmon definition in Daemon this.logmon = logmon; this.setThreadGroup(AgentServer.getThreadGroup()); } protected void close() {} protected void shutdown() {} public void run() { int ret; Message msg = null; short msgto; ServerDesc server = null; InputStream is = null; try { try { nos = new MessageOutputStream(); } catch (IOException exc) { logmon.log(BasicLevel.FATAL, getName() + ", cannot start."); return; } loop: while (running) { canStop = true; try { if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", waiting message"); msg = qout.get(WDActivationPeriod); } catch (InterruptedException exc) { if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", interrupted"); continue; } canStop = false; if (! running) break; long currentTimeMillis = System.currentTimeMillis(); if (msg != null) { msgto = msg.getDest(); Socket socket = null; try { if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", try to send message -> " + msg + "/" + msgto); if ((msg.not.expiration > 0) && (msg.not.expiration < currentTimeMillis)) { throw new ExpirationExceededException(); } // Can throw an UnknownServerException... server = AgentServer.getServerDesc(msgto); try { if (! server.active) { if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", AgentServer#" + msgto + " is down"); throw new ConnectException("AgentServer#" + msgto + " is down"); } // Open the connection. try { if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", try to connect"); for (Enumeration e = server.getSockAddrs(); e.hasMoreElements();) { fr.dyade.aaa.util.SocketAddress sa = (fr.dyade.aaa.util.SocketAddress) e.nextElement(); try { server.moveToFirst(sa); socket = createSocket(server); } catch (IOException ioexc) { this.logmon.log(BasicLevel.DEBUG, this.getName() + ", connection refused with addr=" + server.getAddr()+ " port=" + server.getPort() +", try next element"); continue; } if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", connected"); break; } if (socket == null) socket = createSocket(server); } catch (IOException exc) { this.logmon.log(BasicLevel.WARN, this.getName() + ", connection refused", exc); server.active = false; server.last = System.currentTimeMillis(); server.retry += 1; throw exc; } setSocketOption(socket); } catch (IOException exc) { this.logmon.log(BasicLevel.WARN, this.getName() + ", move msg in watchdog list", exc); // There is a connection problem, put the message in a // waiting list. sendList.addMessage(msg); qout.pop(); continue; } try { send(socket, msg, currentTimeMillis); } catch (IOException exc) { this.logmon.log(BasicLevel.WARN, this.getName() + ", move msg in watchdog list", exc); // There is a problem during network transaction, put the // message in waiting list in order to retry later. sendList.addMessage(msg); qout.pop(); continue; } } catch (UnknownServerException exc) { this.logmon.log(BasicLevel.ERROR, this.getName() + ", can't send message: " + msg, exc); // Remove the message (see below), may be we have to post an // error notification to sender. } catch (ExpirationExceededException exc) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ": removes expired notification " + msg.from + ", " + msg.not); } AgentServer.getTransaction().begin(); // Suppress the processed notification from message queue, // and deletes it. qout.pop(); // send ack in JGroups to delete msg if (jgroups != null) jgroups.send(new JGroupsAckMsg(msg)); msg.delete(); msg.free(); AgentServer.getTransaction().commit(); AgentServer.getTransaction().release(); } watchdog(currentTimeMillis); } } catch (Exception exc) { this.logmon.log(BasicLevel.FATAL, this.getName() + ", unrecoverable exception", exc); // There is an unrecoverable exception during the transaction // we must exit from server. AgentServer.stop(false); } finally { finish(); } } /** The date of the last watchdog execution. */ private long last = 0L;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -