📄 poolnetwork.java
字号:
/* * Copyright (C) 2004 - 2006 ScalAgent Distributed Technologies * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): ScalAgent Distributed Technologies * Contributor(s): */package fr.dyade.aaa.agent;import java.io.*;import java.net.*;import java.util.*;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;import fr.dyade.aaa.util.*;/** * <code>PoolNetwork</code> is an implementation of <code>StreamNetwork</code> * class that manages multiple connection. */public class PoolNetwork extends StreamNetwork { /** */ WakeOnConnection wakeOnConnection = null; /** */ NetSession sessions[] = null; /** */ Dispatcher dispatcher = null; /** */ WatchDog watchDog = null; static int nbMaxCnx; int nbActiveCnx = 0; NetSession activeSessions[]; long current = 0L; /** * Creates a new network component. */ public PoolNetwork() throws Exception { super(); } /** * Initializes a new network component. This method is used in order to * easily creates and configure a Network component from a class name. * So we can use the <code>Class.newInstance()</code> method for create * (whitout any parameter) the component, then we can initialize it with * this method.<br> * This method initializes the logical clock for the domain. * * @param name The domain name. * @param port The listen port. * @param servers The list of servers directly accessible from this * network interface. */ public void init(String name, int port, short[] servers) throws Exception { super.init(name, port, servers); // Creates a session for all domain's server. sessions = new NetSession[servers.length]; for (int i=0; i<sessions.length; i++) { if (servers[i] != AgentServer.getServerId()) sessions[i] = new NetSession(getName(), servers[i]); } wakeOnConnection = new WakeOnConnection(getName(), logmon); dispatcher = new Dispatcher(getName(), logmon); watchDog = new WatchDog(getName(), logmon); } /** * Causes this network component to begin execution. */ public void start() throws Exception { logmon.log(BasicLevel.DEBUG, getName() + ", starting"); try { nbMaxCnx = AgentServer.getInteger(getName() + ".nbMaxCnx").intValue(); } catch (Exception exc) { try { nbMaxCnx = AgentServer.getInteger("PoolNetwork.nbMaxCnx").intValue(); } catch (Exception exc2) { nbMaxCnx = 5; } } try { if (isRunning()) throw new IOException("Consumer already running."); for (int i=0; i<sessions.length; i++) { if (sessions[i] != null) sessions[i].init(); } activeSessions = new NetSession[nbMaxCnx]; wakeOnConnection.start(); dispatcher.start(); watchDog.start(); } catch (Exception exc) { logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc); throw exc; } logmon.log(BasicLevel.DEBUG, getName() + ", started"); } /** * Wakes up the watch-dog thread. */ public void wakeup() { if (watchDog != null) watchDog.wakeup(); } /** * Forces the network component to stop executing. */ public void stop() { if (wakeOnConnection != null) wakeOnConnection.stop(); if (dispatcher != null) dispatcher.stop(); if (watchDog != null) watchDog.stop(); for (int i=0; i<sessions.length; i++) { // May be we can take in account only "active" sessions. if (sessions[i]!= null) sessions[i].stop(); } logmon.log(BasicLevel.DEBUG, getName() + ", stopped"); } /** * Tests if the network component is alive. * * @return true if this <code>MessageConsumer</code> is alive; false * otherwise. */ public boolean isRunning() { if ((wakeOnConnection != null) && wakeOnConnection.isRunning() && (dispatcher != null) && dispatcher.isRunning() && (watchDog != null) && watchDog.isRunning()) return true; return false; } final NetSession getSession(short sid) { return sessions[index(sid)]; } /** * Returns a string representation of this consumer, including the * daemon's name and status. * * @return A string representation of this consumer. */ public String toString() { StringBuffer strbuf = new StringBuffer(); strbuf.append(super.toString()).append("\n\t"); if (wakeOnConnection != null) strbuf.append(wakeOnConnection.toString()).append("\n\t"); if (dispatcher != null) strbuf.append(dispatcher.toString()).append("\n\t"); if (watchDog != null) strbuf.append(watchDog.toString()).append("\n\t"); for (int i=0; i<sessions.length; i++) { // May be we can take in account only "active" sessions. if (sessions[i]!= null) strbuf.append(sessions[i].toString()).append("\n\t"); } return strbuf.toString(); } final class MessageVector extends Vector { public synchronized Message removeMessage(int stamp) { Message msg = null; modCount++; for (int index=0 ; index<elementCount ; index++) { try { msg = (Message) elementData[index]; } catch (ClassCastException exc) { continue; } if (msg.getStamp() == stamp) { int j = elementCount - index - 1; if (j > 0) { System.arraycopy(elementData, index + 1, elementData, index, j); } elementCount--; elementData[elementCount] = null; /* to let gc do its work */ return msg; } } throw new NoSuchElementException(); } } final class NetSession implements Runnable { /** Destination server id */ private short sid; /** * Boolean variable used to stop the daemon properly. The dameon tests * this variable between each request, and stops if it is false. * @see start * @see stop */ private volatile boolean running = false; /** * True if the sessions can be stopped, false otherwise. A session can * be stopped if it is waiting. */ private boolean canStop = false; /** The thread. */ private Thread thread = null; /** The session's name. */ private String name = null; /** * True if a "local" connection is in progress, a local connection * is initiated from this server to the remote one (defined by the * {@link #server server} descriptor. * This attribute is used to synchronize local and remote attempts to * make connections. */ private boolean local = false; /** * The description of the remote server handled by this network session. */ private ServerDesc server; /** The communication socket. */ private Socket sock = null; MessageInputStream nis = null; MessageOutputStream nos = null; /** */ private MessageVector sendList; private long last = 0L; NetSession(String name, short sid) { this.sid = sid; this.name = name + ".netSession#" + sid; if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", created"); running = false; canStop = false; thread = null; sendList = new MessageVector(); } void init() throws UnknownServerException { server = AgentServer.getServerDesc(sid); } /** * Returns this session's name. * * @return this session's name. */ public final String getName() { return name; } void start() { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", started"); long currentTimeMillis = System.currentTimeMillis(); if (((server.retry < WDNbRetryLevel1) && ((server.last + WDRetryPeriod1) < currentTimeMillis)) || ((server.retry < WDNbRetryLevel2) && ((server.last + WDRetryPeriod2) < currentTimeMillis)) || ((server.last + WDRetryPeriod3) < currentTimeMillis)) { if (localStart()) { startEnd(); } else { server.last = currentTimeMillis; server.retry += 1; } } } void start(Socket sock, int boot) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", remotely started"); if (remoteStart(sock, boot)) startEnd(); } /** * Its method is called by <a href="#start()">start</a> in order to * initiate a connection from the local server. The corresponding code * on remote server is the method <a href="#remoteStart()">remoteStart</a>. * Its method creates the socket, initiates the network connection, and * negociates with remote server.<p><hr> * Its method can be overidden in order to change the connection protocol * (introduces authentification by example, or uses SSL), but must respect * somes conditions:<ul> * <li>send a Boot object after the initialization of object streams (it * is waiting by the wakeOnConnection thread), * <li>wait for an acknowledge, * <li>set the sock, ois and oos attributes at the end if the connection * is correct. * </ul><p> * In order to overide the protocol, we have to implements its method, * with the remoteStart and the transmit methods. * * @return true if the connection is established, false otherwise. */ boolean localStart() { synchronized (this) { if ((this.sock != null) || this.local) { // The connection is already established, or a "local" connection // is in progress (remoteStart method is synchronized). // In all cases refuses the connection request. if (logmon.isLoggable(BasicLevel.WARN)) logmon.log(BasicLevel.WARN, getName() + ", connection refused"); return false; } // Set the local attribute in order to block all others local attempts. this.local = true; } Socket sock = null; try { sock = createSocket(server); setSocketOption(sock); writeBoot(sock.getOutputStream()); int boot = readAck(sock.getInputStream()); AgentServer.getTransaction().begin(); testBootTS(sid, boot); AgentServer.getTransaction().commit(); AgentServer.getTransaction().release(); nis = new MessageInputStream(sock.getInputStream()); nos = new MessageOutputStream(sock.getOutputStream()); } catch (Exception exc) { if (logmon.isLoggable(BasicLevel.WARN)) logmon.log(BasicLevel.WARN, getName() + ", connection refused.", exc); // TODO: Try it later, may be a a connection is in progress... try { sock.getOutputStream().close(); } catch (Exception exc2) {} try { sock.getInputStream().close(); } catch (Exception exc2) {}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -