⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 simplenetwork.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * Copyright (C) 2003 - 2006 ScalAgent Distributed Technologies * Copyright (C) 2004 - France Telecom R&D * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. * * Initial developer(s): ScalAgent Distributed Technologies * Contributor(s):  */package fr.dyade.aaa.agent;import java.io.*;import java.net.*;import java.util.Vector;import java.util.Enumeration;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;import fr.dyade.aaa.util.*;/** *  <code>SimpleNetwork</code> is a simple implementation of * <code>StreamNetwork</code> class with a single connection at * a time. */public class SimpleNetwork extends StreamNetwork {  /** FIFO list of all messages to be sent by the watch-dog thead. */  MessageVector sendList;  private JGroups jgroups = null;  public void setJGroups(JGroups jgroups) {    this.jgroups = jgroups;  }    void ackMsg(JGroupsAckMsg ack) {    try {      AgentServer.getTransaction().begin();      //  Deletes the processed notification      qout.remove(ack.getStamp());      ack.delete();      AgentServer.getTransaction().commit();      AgentServer.getTransaction().release();      if (this.logmon.isLoggable(BasicLevel.DEBUG))        this.logmon.log(BasicLevel.DEBUG,                        this.getName() + ", ackMsg(...) done.");    } catch (Exception exc) {      this.logmon.log(BasicLevel.FATAL,                      this.getName() + ", ackMsg unrecoverable exception",                      exc);    }  }  /**   * Creates a new network component.   */  public SimpleNetwork() {    super();  }  /** Input component */  NetServerIn netServerIn = null;  /** Output component */  NetServerOut netServerOut = null;  /**   * Causes this network component to begin execution.   */  public void start() throws IOException {    logmon.log(BasicLevel.DEBUG, getName() + ", starting");    try {      if (sendList == null)        sendList = new MessageVector(getName(),                                     AgentServer.getTransaction().isPersistent());          if (netServerIn == null)        netServerIn = new NetServerIn(getName(), logmon);      if (netServerOut == null)        netServerOut = new NetServerOut(getName(), logmon);      if (! netServerIn.isRunning()) netServerIn.start();      if (! netServerOut.isRunning()) netServerOut.start();    } catch (IOException exc) {      logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc);      throw exc;    }    logmon.log(BasicLevel.DEBUG, getName() + ", started");  }  /**   * Forces the network component to stop executing.   */  public void stop() {    if (netServerIn != null) netServerIn.stop();    if (netServerOut != null) netServerOut.stop();    logmon.log(BasicLevel.DEBUG, getName() + ", stopped");  }  /**   * Tests if the network component is alive.   *   * @return	true if this <code>MessageConsumer</code> is alive; false   * 		otherwise.   */  public boolean isRunning() {    if ((netServerIn != null) && netServerIn.isRunning() &&	(netServerOut != null) && netServerOut.isRunning())      return true;    else      return false;  }  /**   * Returns a string representation of this consumer, including the   * daemon's name and status.   *   * @return	A string representation of this consumer.    */  public String toString() {    StringBuffer strbuf = new StringBuffer();    strbuf.append(super.toString()).append("\n\t");    if (netServerIn != null)      strbuf.append(netServerIn.toString()).append("\n\t");    if (netServerOut != null)      strbuf.append(netServerOut.toString()).append("\n\t");    return strbuf.toString();  }//   /**//    * Use to clean the qout of all messages to the dead node.//    *//    * @param	dead - the unique id. of dead server.//    *///   void clean(short dead) {//     Message msg = null;//     // TODO: Be careful, to the route algorithm!//     synchronized (lock) {//       for (int i=0; i<qout.size(); i++) {//         msg = (Message) qout.getMessageAt(i);//         if (msg.to.to == dead) {//           qout.removeMessageAt(i);//         }//       }//     }//   }  final class NetServerOut extends Daemon {    MessageOutputStream nos = null;    NetServerOut(String name, Logger logmon) {      super(name + ".NetServerOut");      // Overload logmon definition in Daemon      this.logmon = logmon;      this.setThreadGroup(AgentServer.getThreadGroup());    }    protected void close() {}    protected void shutdown() {}    public void run() {      int ret;      Message msg = null;      short msgto;      ServerDesc server = null;      InputStream is = null;      try {        try {          nos = new MessageOutputStream();        } catch (IOException exc) {          logmon.log(BasicLevel.FATAL,                     getName() + ", cannot start.");          return;        }        loop:	while (running) {          canStop = true;          try {            if (this.logmon.isLoggable(BasicLevel.DEBUG))              this.logmon.log(BasicLevel.DEBUG,                              this.getName() + ", waiting message");            msg = qout.get(WDActivationPeriod);          } catch (InterruptedException exc) {            if (this.logmon.isLoggable(BasicLevel.DEBUG))              this.logmon.log(BasicLevel.DEBUG,                              this.getName() + ", interrupted");            continue;          }          canStop = false;          if (! running) break;          long currentTimeMillis = System.currentTimeMillis();          if (msg != null) {            msgto = msg.getDest();                        Socket socket = null;            try {              if (this.logmon.isLoggable(BasicLevel.DEBUG))                this.logmon.log(BasicLevel.DEBUG,                                this.getName() + ", try to send message -> " +                                msg + "/" + msgto);              if ((msg.not.expiration > 0) &&                  (msg.not.expiration < currentTimeMillis)) {                throw new ExpirationExceededException();              }                            // Can throw an UnknownServerException...              server = AgentServer.getServerDesc(msgto);                            try {                if (! server.active) {                  if (this.logmon.isLoggable(BasicLevel.DEBUG))                    this.logmon.log(BasicLevel.DEBUG,                                    this.getName() + ", AgentServer#" + msgto + " is down");                  throw new ConnectException("AgentServer#" + msgto + " is down");                }                                // Open the connection.                try {                  if (this.logmon.isLoggable(BasicLevel.DEBUG))                    this.logmon.log(BasicLevel.DEBUG, this.getName() + ", try to connect");                  for (Enumeration e = server.getSockAddrs(); e.hasMoreElements();) {                    fr.dyade.aaa.util.SocketAddress sa =                       (fr.dyade.aaa.util.SocketAddress) e.nextElement();                    try {                      server.moveToFirst(sa);                      socket = createSocket(server);                    } catch (IOException ioexc) {                      this.logmon.log(BasicLevel.DEBUG,                                      this.getName() + ", connection refused with addr=" + server.getAddr()+                                      " port=" +  server.getPort() +", try next element");                      continue;                    }                    if (this.logmon.isLoggable(BasicLevel.DEBUG))                      this.logmon.log(BasicLevel.DEBUG, this.getName() + ", connected");                    break;                  }                                    if (socket == null)                    socket = createSocket(server);                } catch (IOException exc) {                  this.logmon.log(BasicLevel.WARN,                                  this.getName() + ", connection refused", exc);                  server.active = false;                  server.last = System.currentTimeMillis();                  server.retry += 1;                  throw exc;                }                setSocketOption(socket);              } catch (IOException exc) {                this.logmon.log(BasicLevel.WARN,                                this.getName() + ", move msg in watchdog list", exc);                //  There is a connection problem, put the message in a                // waiting list.                sendList.addMessage(msg);                qout.pop();                continue;              }                            try {                send(socket, msg, currentTimeMillis);              } catch (IOException exc) {                this.logmon.log(BasicLevel.WARN,                                this.getName() + ", move msg in watchdog list", exc);                //  There is a problem during network transaction, put the                // message in waiting list in order to retry later.                sendList.addMessage(msg);                qout.pop();                continue;              }            } catch (UnknownServerException exc) {              this.logmon.log(BasicLevel.ERROR,                              this.getName() + ", can't send message: " + msg,                              exc);              // Remove the message (see below), may be we have to post an              // error notification to sender.            } catch (ExpirationExceededException exc) {              if (logmon.isLoggable(BasicLevel.DEBUG))                logmon.log(BasicLevel.DEBUG,                           getName() + ": removes expired notification " +                           msg.from + ", " + msg.not);            }            AgentServer.getTransaction().begin();            //  Suppress the processed notification from message queue,            // and deletes it.            qout.pop();            // send ack in JGroups to delete msg            if (jgroups != null)              jgroups.send(new JGroupsAckMsg(msg));            msg.delete();            msg.free();            AgentServer.getTransaction().commit();            AgentServer.getTransaction().release();          }          watchdog(currentTimeMillis);        }      } catch (Exception exc) {        this.logmon.log(BasicLevel.FATAL,                        this.getName() + ", unrecoverable exception", exc);        //  There is an unrecoverable exception during the transaction        // we must exit from server.        AgentServer.stop(false);      } finally {        finish();      }    }    /** The date of the last watchdog execution. */    private long last = 0L;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -