📄 ngnetwork.java
字号:
/* * Copyright (C) 2003 - 2005 ScalAgent Distributed Technologies * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. */package fr.dyade.aaa.agent;import java.io.*;import java.net.*;import java.util.*;import java.nio.*;import java.nio.channels.*;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;import fr.dyade.aaa.util.Daemon;/** * <code>NGNetwork</code> is a new implementation of <code>Network</code> * class using nio package. */public class NGNetwork extends StreamNetwork { final static int Kb = 1024; final static int Mb = 1024 * Kb; final static int SO_BUFSIZE = 64 * Kb; Selector selector = null; Dispatcher dispatcher = null; NetServer dmon[] = null; // Be careful, NbNetServer>1 cause malfunctioning (slowness, etc.) final static int NbNetServer = 1; CnxHandler[] handlers = null; /** * Creates a new network component. */ public NGNetwork() { super(); } /** * Initializes a new network component. This method is used in order to * easily creates and configure a Network component from a class name. * So we can use the <code>Class.newInstance()</code> method for create * (whitout any parameter) the component, then we can initialize it with * this method.<br> * This method initializes the logical clock for the domain. * * @param name The domain name. * @param port The listen port. * @param servers The list of servers directly accessible from this * network interface. */ public void init(String name, int port, short[] servers) throws Exception { super.init(name, port, servers); // Creates a connection handler for each domain's server. handlers = new CnxHandler[servers.length]; for (int i=0; i<servers.length; i++) { if (servers[i] != AgentServer.getServerId()) handlers[i] = new CnxHandler(getName(), servers[i]); } } ServerSocketChannel listen = null; void open() throws IOException { // Create a blocking server socket channel on port listen = ServerSocketChannel.open(); listen.configureBlocking(false); listen.socket().bind(new InetSocketAddress(port)); // Register channels with selector listen.register(selector, SelectionKey.OP_ACCEPT); } void close() { try { listen.close(); } catch (Exception exc) {} listen = null; } /** * Causes this network component to begin execution. */ public void start() throws Exception { try { logmon.log(BasicLevel.DEBUG, getName() + ", starting"); // Create a selector selector = Selector.open(); // Creates a connection handler for each domain's server. for (int i=0; i<handlers.length; i++) { if (handlers[i] != null) handlers[i].init(); } open(); if (dispatcher == null) dispatcher = new Dispatcher(getName(), logmon); if (dmon == null) { dmon = new NetServer[NbNetServer]; for (int i=0; i<NbNetServer; i++) { dmon[i] = new NetServer(getName(), logmon); } } if (! dispatcher.isRunning()) dispatcher.start(); for (int i=0; i<NbNetServer; i++) { if (! dmon[i].isRunning()) dmon[i].start(); } } catch (IOException exc) { logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc); throw exc; } logmon.log(BasicLevel.DEBUG, getName() + ", started"); } final CnxHandler getHandler(short sid) { return handlers[index(sid)]; }// /**// * Adds a message in "ready to deliver" list. This method allocates a// * new time stamp to the message ; be Careful, changing the stamp imply// * the filename change too.// */// public void post(Message msg) throws Exception {// short to = AgentServer.getServerDesc(msg.to.to).gateway;// // Allocates a new timestamp. Be careful, if the message needs to be// // routed we have to use the next destination in timestamp generation.// msg.source = AgentServer.getServerId();// msg.dest = to;// msg.stamp = getSendUpdate(to);// // Saves the message.// msg.save();// // Push it in "ready to deliver" queue.// getHandler(to).send(msg);// } /** * Wakes up the watch-dog thread. */ public void wakeup() { if (selector != null) selector.wakeup(); logmon.log(BasicLevel.DEBUG, getName() + ", wakeup"); } /** * Forces the network component to stop executing. */ public void stop() { if (dispatcher != null) dispatcher.stop(); if (dmon != null) { for (int i=0; i<NbNetServer; i++) { if (dmon[i] != null) dmon[i].stop(); } } close(); logmon.log(BasicLevel.DEBUG, getName() + ", stopped"); } /** * Tests if the network component is alive. * * @return true if this <code>MessageConsumer</code> is alive; false * otherwise. */ public boolean isRunning() { if ((dispatcher == null) || ! dispatcher.isRunning()) return false; if (dmon == null) return false; for (int i=0; i<NbNetServer; i++) { if ((dmon[i] == null) || ! dmon[i].isRunning()) return false; } return true; } /** * Returns a string representation of this consumer, including the * daemon's name and status. * * @return A string representation of this consumer. */ public String toString() { StringBuffer strbuf = new StringBuffer(); strbuf.append(super.toString()).append("\n\t"); if (dispatcher != null) strbuf.append(dispatcher.toString()).append("\n"); for (int i=0; i<NbNetServer; i++) { if ((dmon != null) && (dmon[i] != null)) strbuf.append(dmon[i].toString()).append("\n"); } return strbuf.toString(); } void cnxStart(SocketChannel channel) throws IOException { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", remotely started"); channel.socket().setSendBufferSize(SO_BUFSIZE); channel.socket().setReceiveBufferSize(SO_BUFSIZE); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + " bufsize: " + channel.socket().getReceiveBufferSize() + ", " + channel.socket().getSendBufferSize()); ByteBuffer buf = ByteBuffer.allocate(6); channel.read(buf); buf.flip(); short sid = buf.getShort(); int boot = buf.getInt(); CnxHandler cnx = getHandler(sid); if (cnx.remoteStart(channel, boot)) cnx.startEnd(); } final class Dispatcher extends Daemon { Dispatcher(String name, Logger logmon) { super(name + ".dispatcher"); // Overload logmon definition in Daemon this.logmon = logmon; } protected void close() {} protected void shutdown() {} public void run() { Message msg = null; try { while (running) { canStop = true; if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", waiting message"); try { msg = qout.get(); } catch (InterruptedException exc) { continue; } canStop = false; if (! running) break; try { // Send the message getHandler(msg.getDest()).send(msg); } catch (IOException exc) { if (this.logmon.isLoggable(BasicLevel.ERROR)) this.logmon.log(BasicLevel.ERROR, this.getName(), exc); } qout.pop(); } } finally { finish(); } } } final class NetServer extends Daemon { NetServer(String name, Logger logmon) throws IOException { super(name + ".NetServer"); // Overload logmon definition in Daemon this.logmon = logmon;// // Create a blocking server socket channel on port// listen = ServerSocketChannel.open();// listen.configureBlocking(false);// listen.socket().bind(new InetSocketAddress(port));// // Register channels with selector// listen.register(selector, SelectionKey.OP_ACCEPT); } protected void close() {// try {// listen.close();// } catch (Exception exc) {}// listen = null; } protected void shutdown() { close(); } public void run() { int nbop = 0; CnxHandler cnx = null; try { while (running) { // This call blocks until there is activity on one of the // registered channels. This is the key method in non-blocking I/O. canStop = true; try { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", on select"); nbop = selector.select(WDActivationPeriod); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", on select:" + nbop); } catch (IOException exc) { } for (int idx=0; idx<handlers.length; idx++) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", " + handlers[idx]); if ((handlers[idx] != null) && (handlers[idx].sendlist.size() > 0) && (handlers[idx].channel == null)) { try { handlers[idx].start(); } catch (IOException exc) { this.logmon.log(BasicLevel.WARN, this.getName() + ", can't start cnx#" + idx, exc); } } } if (nbop == 0) continue; canStop = false; // Get list of selection keys with pending events, then process // each key Set keys = selector.selectedKeys(); for(Iterator it = keys.iterator(); it.hasNext(); ) { if (! running) break; // Get the selection key SelectionKey key = (SelectionKey) it.next(); // Remove it from the list to indicate that it is being processed it.remove(); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + "(1): " + key + " -> " + key.interestOps()); logmon.log(BasicLevel.DEBUG, getName() + ":" + key.isValid() + key.isAcceptable() + key.isReadable() + key.isWritable()); try { // Check if it's a connection request if (key.isAcceptable()) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + " acceptable"); // Get channel with connection request (useless ?) ServerSocketChannel server = (ServerSocketChannel) key.channel(); // Accept the connection request. SocketChannel channel = server.accept(); // Register the channel with selector, listening for all events cnxStart(channel); } else { cnx = (CnxHandler) key.attachment(); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ": " + key + " -> " + cnx); // Since the ready operations are cumulative, // need to check readiness for each operation if (key.isValid() && key.isReadable()) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + " readable"); cnx.read(); } if (key.isValid() && key.isWritable()) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + " writable"); cnx.write(); } else if (cnx.sendlist.size() > 0) { logmon.log(BasicLevel.FATAL, getName() + " force"); key.interestOps(key.channel().validOps()); } } if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + "(2): " + key + " -> " + key.interestOps()); } catch (Exception exc) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -