⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ngnetwork.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* * Copyright (C) 2003 - 2005 ScalAgent Distributed Technologies * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. */package fr.dyade.aaa.agent;import java.io.*;import java.net.*;import java.util.*;import java.nio.*;import java.nio.channels.*;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;import fr.dyade.aaa.util.Daemon;/** * <code>NGNetwork</code> is a new implementation of <code>Network</code> * class using nio package. */public class NGNetwork extends StreamNetwork {  final static int Kb = 1024;  final static int Mb = 1024 * Kb;  final static int SO_BUFSIZE = 64 * Kb;  Selector selector = null;  Dispatcher dispatcher = null;  NetServer dmon[] = null;  // Be careful, NbNetServer>1 cause malfunctioning (slowness, etc.)  final static int NbNetServer = 1;  CnxHandler[] handlers = null;  /**   * Creates a new network component.   */  public NGNetwork() {    super();  }  /**   * Initializes a new network component. This method is used in order to   * easily creates and configure a Network component from a class name.   * So we can use the <code>Class.newInstance()</code> method for create   * (whitout any parameter) the component, then we can initialize it with   * this method.<br>   * This method initializes the logical clock for the domain.   *   * @param name	The domain name.   * @param port	The listen port.   * @param servers	The list of servers directly accessible from this   *			network interface.   */  public void init(String name, int port, short[] servers) throws Exception {    super.init(name, port, servers);    // Creates a connection handler for each domain's server.    handlers = new CnxHandler[servers.length];    for (int i=0; i<servers.length; i++) {      if (servers[i] != AgentServer.getServerId())        handlers[i] = new CnxHandler(getName(), servers[i]);    }  }  ServerSocketChannel listen = null;  void open() throws IOException {    // Create a blocking server socket channel on port    listen = ServerSocketChannel.open();    listen.configureBlocking(false);    listen.socket().bind(new InetSocketAddress(port));    // Register channels with selector    listen.register(selector, SelectionKey.OP_ACCEPT);  }  void close() {    try {      listen.close();    } catch (Exception exc) {}    listen = null;  }  /**   * Causes this network component to begin execution.   */  public void start() throws Exception {    try {      logmon.log(BasicLevel.DEBUG, getName() + ", starting");      // Create a selector      selector = Selector.open();      // Creates a connection handler for each domain's server.      for (int i=0; i<handlers.length; i++) {        if (handlers[i] != null) handlers[i].init();      }      open();      if (dispatcher == null)        dispatcher = new Dispatcher(getName(), logmon);      if (dmon == null) {        dmon = new NetServer[NbNetServer];        for (int i=0; i<NbNetServer; i++) {          dmon[i] = new NetServer(getName(), logmon);        }      }      if (! dispatcher.isRunning()) dispatcher.start();      for (int i=0; i<NbNetServer; i++) {        if (! dmon[i].isRunning()) dmon[i].start();      }    } catch (IOException exc) {      logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc);      throw exc;    }    logmon.log(BasicLevel.DEBUG, getName() + ", started");  }  final CnxHandler getHandler(short sid) {    return handlers[index(sid)];  }//   /**//    *  Adds a message in "ready to deliver" list. This method allocates a//    * new time stamp to the message ; be Careful, changing the stamp imply//    * the filename change too.//    *///   public void post(Message msg) throws Exception {//     short to = AgentServer.getServerDesc(msg.to.to).gateway;//     // Allocates a new timestamp. Be careful, if the message needs to be//     // routed we have to use the next destination in timestamp generation.//     msg.source = AgentServer.getServerId();//     msg.dest = to;//     msg.stamp = getSendUpdate(to);//     // Saves the message.//     msg.save();//     // Push it in "ready to deliver" queue.//     getHandler(to).send(msg);//   }  /**   * Wakes up the watch-dog thread.   */  public void wakeup() {    if (selector != null) selector.wakeup();    logmon.log(BasicLevel.DEBUG, getName() + ", wakeup");  }  /**   * Forces the network component to stop executing.   */  public void stop() {    if (dispatcher != null) dispatcher.stop();    if (dmon != null) {      for (int i=0; i<NbNetServer; i++) {        if (dmon[i] != null) dmon[i].stop();      }    }    close();    logmon.log(BasicLevel.DEBUG, getName() + ", stopped");  }  /**   * Tests if the network component is alive.   *   * @return	true if this <code>MessageConsumer</code> is alive; false   * 		otherwise.   */  public boolean isRunning() {    if ((dispatcher == null) || ! dispatcher.isRunning())      return false;    if (dmon == null)      return false;    for (int i=0; i<NbNetServer; i++) {      if ((dmon[i] == null) || ! dmon[i].isRunning())        return false;    }    return true;  }  /**   * Returns a string representation of this consumer, including the   * daemon's name and status.   *   * @return	A string representation of this consumer.    */  public String toString() {    StringBuffer strbuf = new StringBuffer();    strbuf.append(super.toString()).append("\n\t");    if (dispatcher != null)       strbuf.append(dispatcher.toString()).append("\n");    for (int i=0; i<NbNetServer; i++) {      if ((dmon != null) && (dmon[i] != null))        strbuf.append(dmon[i].toString()).append("\n");    }    return strbuf.toString();  }  void cnxStart(SocketChannel channel) throws IOException {    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, getName() + ", remotely started");    channel.socket().setSendBufferSize(SO_BUFSIZE);    channel.socket().setReceiveBufferSize(SO_BUFSIZE);    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, getName() + " bufsize: " +                  channel.socket().getReceiveBufferSize() + ", " +                 channel.socket().getSendBufferSize());        ByteBuffer buf = ByteBuffer.allocate(6);    channel.read(buf);    buf.flip();    short sid = buf.getShort();    int boot = buf.getInt();          CnxHandler cnx = getHandler(sid);    if (cnx.remoteStart(channel, boot)) cnx.startEnd();  }  final class Dispatcher extends Daemon {    Dispatcher(String name, Logger logmon) {      super(name + ".dispatcher");      // Overload logmon definition in Daemon      this.logmon = logmon;    }    protected void close() {}    protected void shutdown() {}    public void run() {      Message msg = null;            try {        while (running) {          canStop = true;          if (this.logmon.isLoggable(BasicLevel.DEBUG))            this.logmon.log(BasicLevel.DEBUG,                            this.getName() + ", waiting message");          try {            msg = qout.get();          } catch (InterruptedException exc) {            continue;          }          canStop = false;          if (! running) break;          try {            // Send the message            getHandler(msg.getDest()).send(msg);          } catch (IOException exc) {            if (this.logmon.isLoggable(BasicLevel.ERROR))              this.logmon.log(BasicLevel.ERROR, this.getName(), exc);          }          qout.pop();        }      } finally {        finish();      }    }  }  final class NetServer extends Daemon {    NetServer(String name, Logger logmon) throws IOException {      super(name + ".NetServer");      // Overload logmon definition in Daemon      this.logmon = logmon;//       // Create a blocking server socket channel on port//       listen = ServerSocketChannel.open();//       listen.configureBlocking(false);//       listen.socket().bind(new InetSocketAddress(port));//       // Register channels with selector//       listen.register(selector, SelectionKey.OP_ACCEPT);    }    protected void close() {//       try {// 	listen.close();//       } catch (Exception exc) {}//       listen = null;    }    protected void shutdown() {      close();    }    public void run() {      int nbop = 0;      CnxHandler cnx = null;      try {      while (running) {        // This call blocks until there is activity on one of the          // registered channels. This is the key method in non-blocking I/O.        canStop = true;        try {          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG, getName() + ", on select");          nbop = selector.select(WDActivationPeriod);          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG, getName() + ", on select:" + nbop);        } catch (IOException exc) {        }        for (int idx=0; idx<handlers.length; idx++) {          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG, getName() + ", " + handlers[idx]);          if ((handlers[idx] != null) &&              (handlers[idx].sendlist.size() > 0) &&              (handlers[idx].channel == null)) {            try {              handlers[idx].start();            } catch (IOException exc) {              this.logmon.log(BasicLevel.WARN,                              this.getName() + ", can't start cnx#" + idx,                              exc);            }          }        }        if (nbop == 0) continue;        canStop = false;        // Get list of selection keys with pending events, then process        // each key        Set keys = selector.selectedKeys();        for(Iterator it = keys.iterator(); it.hasNext(); ) {          if (! running) break;          // Get the selection key          SelectionKey key = (SelectionKey) it.next();          // Remove it from the list to indicate that it is being processed          it.remove();          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG,                       getName() + "(1): " + key + " -> " + key.interestOps());          logmon.log(BasicLevel.DEBUG,                     getName() + ":" +                     key.isValid() +                     key.isAcceptable() +                     key.isReadable() +                     key.isWritable());          try {            // Check if it's a connection request            if (key.isAcceptable()) {              if (logmon.isLoggable(BasicLevel.DEBUG))                logmon.log(BasicLevel.DEBUG, getName() + " acceptable");              // Get channel with connection request (useless ?)              ServerSocketChannel server = (ServerSocketChannel) key.channel();                            // Accept the connection request.              SocketChannel channel = server.accept();              // Register the channel with selector, listening for all events              cnxStart(channel);            } else {              cnx = (CnxHandler) key.attachment();                            if (logmon.isLoggable(BasicLevel.DEBUG))                logmon.log(BasicLevel.DEBUG,                           getName() + ": " + key + " -> " + cnx);              // Since the ready operations are cumulative,              // need to check readiness for each operation              if (key.isValid() && key.isReadable()) {                if (logmon.isLoggable(BasicLevel.DEBUG))                  logmon.log(BasicLevel.DEBUG, getName() + " readable");                cnx.read();              }              if (key.isValid() && key.isWritable()) {                if (logmon.isLoggable(BasicLevel.DEBUG))                  logmon.log(BasicLevel.DEBUG, getName() + " writable");                cnx.write();              } else  if (cnx.sendlist.size() > 0) {                logmon.log(BasicLevel.FATAL, getName() + " force");                key.interestOps(key.channel().validOps());              }            }            if (logmon.isLoggable(BasicLevel.DEBUG))              logmon.log(BasicLevel.DEBUG, getName() + "(2): " +                         key + " -> " + key.interestOps());          } catch (Exception exc) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -