⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 poolnetwork.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*
 * Copyright (C) 2004 - 2006 ScalAgent Distributed Technologies
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
 * USA.
 *
 * Initial developer(s): ScalAgent Distributed Technologies
 * Contributor(s):
 */
package fr.dyade.aaa.agent;

import java.io.*;
import java.net.*;
import java.util.*;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import fr.dyade.aaa.util.*;

/**
 * <code>PoolNetwork</code> is an implementation of the
 * <code>StreamNetwork</code> class that manages multiple connections.
 */
public class PoolNetwork extends StreamNetwork {
  /** Helper component created by init(), started by start(), stopped by stop(). */
  WakeOnConnection wakeOnConnection = null;

  /**
   * One slot per server handed to init(); the slot matching the local
   * server id is left <code>null</code>.
   */
  NetSession[] sessions = null;

  /** Helper component created by init(), started by start(), stopped by stop(). */
  Dispatcher dispatcher = null;

  /** Helper component created by init(); wakeup() delegates to it. */
  WatchDog watchDog = null;

  /**
   * Size of the <code>activeSessions</code> table. start() reads it from the
   * "&lt;name&gt;.nbMaxCnx" property, then "PoolNetwork.nbMaxCnx", and falls
   * back to 5.
   */
  static int nbMaxCnx;

  // NOTE(review): presumably the number of used slots in activeSessions;
  // its updates are outside the visible part of the file -- confirm.
  int nbActiveCnx = 0;

  /** Table allocated by start() with <code>nbMaxCnx</code> slots. */
  NetSession[] activeSessions;

  // NOTE(review): usage is outside the visible part of the file -- confirm.
  long current = 0L;

  /**
   * Creates a new network component. The component is not usable until
   * init() has been called.
   */
  public PoolNetwork() throws Exception {
  }

  /**
   * Initializes a new network component. This method is used in order to
   * easily create and configure a Network component from a class name.
   * So we can use the <code>Class.newInstance()</code> method to create
   * (without any parameter) the component, then we can initialize it with
   * this method.<br>
   * This method initializes the logical clock for the domain.
*   * @param name	The domain name.   * @param port	The listen port.   * @param servers	The list of servers directly accessible from this   *			network interface.   */  public void init(String name, int port, short[] servers) throws Exception {    super.init(name, port, servers);    // Creates a session for all domain's server.    sessions = new NetSession[servers.length];    for (int i=0; i<sessions.length; i++) {      if (servers[i] != AgentServer.getServerId())        sessions[i] = new NetSession(getName(), servers[i]);    }    wakeOnConnection = new WakeOnConnection(getName(), logmon);    dispatcher = new Dispatcher(getName(), logmon);    watchDog = new WatchDog(getName(), logmon);  }  /**   * Causes this network component to begin execution.   */  public void start() throws Exception {    logmon.log(BasicLevel.DEBUG, getName() + ", starting");    try {      nbMaxCnx = AgentServer.getInteger(getName() + ".nbMaxCnx").intValue();    } catch (Exception exc) {      try {	nbMaxCnx = AgentServer.getInteger("PoolNetwork.nbMaxCnx").intValue();      } catch (Exception exc2) {	nbMaxCnx = 5;      }    }    try {      if (isRunning())	throw new IOException("Consumer already running.");      for (int i=0; i<sessions.length; i++) {        if (sessions[i] != null) sessions[i].init();      }      activeSessions = new NetSession[nbMaxCnx];      wakeOnConnection.start();      dispatcher.start();      watchDog.start();    } catch (Exception exc) {      logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc);      throw exc;    }    logmon.log(BasicLevel.DEBUG, getName() + ", started");  }  /**   * Wakes up the watch-dog thread.   */  public void wakeup() {    if (watchDog != null) watchDog.wakeup();  }  /**   * Forces the network component to stop executing.   
*/
  public void stop() {
    if (wakeOnConnection != null) {
      wakeOnConnection.stop();
    }
    if (dispatcher != null) {
      dispatcher.stop();
    }
    if (watchDog != null) {
      watchDog.stop();
    }
    // The session table is only allocated by init(); guard it like the other
    // components so that stopping a never-initialized network is harmless.
    if (sessions != null) {
      for (int i = 0; i < sessions.length; i++) {
        // May be we can take in account only "active" sessions.
        if (sessions[i] != null) {
          sessions[i].stop();
        }
      }
    }
    logmon.log(BasicLevel.DEBUG, getName() + ", stopped");
  }

  /**
   * Tests if the network component is alive.
   *
   * @return true only if the three helper components (wakeOnConnection,
   *         dispatcher and watchDog) all exist and all report running;
   *         false otherwise.
   */
  public boolean isRunning() {
    return (wakeOnConnection != null) && wakeOnConnection.isRunning()
        && (dispatcher != null) && dispatcher.isRunning()
        && (watchDog != null) && watchDog.isRunning();
  }

  /**
   * Returns the session stored at the slot computed by <code>index(sid)</code>.
   *
   * @param sid  the server id to look up.
   * @return the session in that slot; the slot of the local server holds
   *         <code>null</code> (see init()).
   */
  final NetSession getSession(short sid) {
    // NOTE(review): index() is defined outside the visible part of the file.
    return sessions[index(sid)];
  }

  /**
   * Returns a string representation of this consumer, including the
   * daemon's name and status.
   *
   * @return A string representation of this consumer.
   */
  public String toString() {
    StringBuffer strbuf = new StringBuffer();

    strbuf.append(super.toString()).append("\n\t");
    if (wakeOnConnection != null) {
      strbuf.append(wakeOnConnection.toString()).append("\n\t");
    }
    if (dispatcher != null) {
      strbuf.append(dispatcher.toString()).append("\n\t");
    }
    if (watchDog != null) {
      strbuf.append(watchDog.toString()).append("\n\t");
    }
    // Same guard as in stop(): the table does not exist before init().
    if (sessions != null) {
      for (int i = 0; i < sessions.length; i++) {
        // May be we can take in account only "active" sessions.
        if (sessions[i] != null) {
          strbuf.append(sessions[i].toString()).append("\n\t");
        }
      }
    }

    return strbuf.toString();
  }

  /**
   * Vector of messages that can remove an element by its stamp.
   */
  final class MessageVector extends Vector {
    /**
     * Removes and returns the first {@link Message} whose stamp equals
     * <code>stamp</code>. Elements that are not <code>Message</code>
     * instances (including <code>null</code>) are skipped.
     *
     * @param stamp  the stamp of the message to remove.
     * @return the removed message.
     * @throws NoSuchElementException if no stored message carries this stamp;
     *         the vector is left untouched in that case.
     */
    public synchronized Message removeMessage(int stamp) {
      for (int index = 0; index < elementCount; index++) {
        Object element = elementData[index];
        // instanceof is false for null, so neither a null slot nor a foreign
        // object can raise an exception here.
        if (!(element instanceof Message)) {
          continue;
        }
        Message msg = (Message) element;
        if (msg.getStamp() != stamp) {
          continue;
        }

        // Structural modification: only now is modCount allowed to change,
        // otherwise a failed lookup would invalidate live iterators.
        modCount++;
        int tail = elementCount - index - 1;
        if (tail > 0) {
          System.arraycopy(elementData, index + 1, elementData, index, tail);
        }
        elementCount--;
        elementData[elementCount] = null; /* to let gc do its work */
        return msg;
      }
      throw new NoSuchElementException();
    }
  }

  final class NetSession implements Runnable {
    /** Destination server id */
    private short sid;

    /**
     * Boolean variable used to stop the daemon properly. The daemon tests
     * this variable between each request, and stops if it is false.
     * @see start
     * @see stop
     */
    private volatile boolean running = false;

    /**
     * True if the sessions can be stopped, false otherwise. A session can
     * be stopped if it is waiting.
     */
    private boolean canStop = false;

    /** The thread. */
    private Thread thread = null;

    /** The session's name. */
    private String name = null;

    /**
     *  True if a "local" connection is in progress, a local connection
     * is initiated from this server to the remote one (defined by the
     * {@link #server server} descriptor).
     *  This attribute is used to synchronize local and remote attempts to
     * make connections.
     */
    private boolean local = false;

    /**
     * The description of the remote server handled by this network session.
     */
    private ServerDesc server;

    /** The communication socket.
*/    private Socket sock = null;    MessageInputStream nis = null;    MessageOutputStream nos = null;    /** */    private MessageVector sendList;    private long last = 0L;    NetSession(String name, short sid) {      this.sid = sid;      this.name = name + ".netSession#" + sid;      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, getName() + ", created");            running = false;      canStop = false;      thread = null;      sendList = new MessageVector();    }    void init() throws UnknownServerException {      server = AgentServer.getServerDesc(sid);    }    /**     * Returns this session's name.     *     * @return this session's name.     */    public final String getName() {      return name;    }    void start() {      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, getName() + ", started");      long currentTimeMillis = System.currentTimeMillis();      if (((server.retry < WDNbRetryLevel1) && 	   ((server.last + WDRetryPeriod1) < currentTimeMillis)) ||	  ((server.retry < WDNbRetryLevel2) &&	   ((server.last + WDRetryPeriod2) < currentTimeMillis)) ||	  ((server.last + WDRetryPeriod3) < currentTimeMillis)) {	if (localStart()) {	  startEnd();	} else {	  server.last = currentTimeMillis;	  server.retry += 1;	}      }    }    void start(Socket sock, int boot) {      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, getName() + ", remotely started");      if (remoteStart(sock, boot)) startEnd();    }    /**     *  Its method is called by <a href="#start()">start</a> in order to     * initiate a connection from the local server. The corresponding code     * on remote server is the method <a href="#remoteStart()">remoteStart</a>.     
* Its method creates the socket, initiates the network connection, and     * negociates with remote server.<p><hr>     *  Its method can be overidden in order to change the connection protocol     * (introduces authentification by example, or uses SSL), but must respect     * somes conditions:<ul>     * <li>send a Boot object after the initialization of object streams (it     * is waiting by the wakeOnConnection thread),     * <li>wait for an acknowledge,     * <li>set the sock, ois and oos attributes at the end if the connection     * is correct.     * </ul><p>     *  In order to overide the protocol, we have to implements its method,     * with the remoteStart and the transmit methods.     *     * @return	true if the connection is established, false otherwise.     */    boolean localStart() {      synchronized (this) {	if ((this.sock != null) || this.local) {	  //  The connection is already established, or a "local" connection	  // is in progress (remoteStart method is synchronized).	  //  In all cases refuses the connection request.          if (logmon.isLoggable(BasicLevel.WARN))            logmon.log(BasicLevel.WARN, getName() + ", connection refused");	  return false;	}	// Set the local attribute in order to block all others local attempts.	this.local = true;      }      Socket sock = null;      try {	sock = createSocket(server);	setSocketOption(sock);	writeBoot(sock.getOutputStream());        int boot = readAck(sock.getInputStream());        AgentServer.getTransaction().begin();        testBootTS(sid, boot);        AgentServer.getTransaction().commit();        AgentServer.getTransaction().release();        nis = new MessageInputStream(sock.getInputStream());        nos = new MessageOutputStream(sock.getOutputStream());      } catch (Exception exc) {        if (logmon.isLoggable(BasicLevel.WARN))          logmon.log(BasicLevel.WARN,                     getName() + ", connection refused.", exc);	// TODO: Try it later, may be a a connection is in progress...	
try {	  sock.getOutputStream().close();	} catch (Exception exc2) {}	try {	  sock.getInputStream().close();	} catch (Exception exc2) {}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -