⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 poolnetwork.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
	try {	  sock.close();	} catch (Exception exc2) {}	// Reset the local attribute to allow future attempts.        this.local = false;        nis = null;        nos = null;	return false;      }      // Normally, only one thread can reach this code (*1), so we don't have      // to synchronized theses statements. First sets sock attribute, then      // releases the local lock.      // (*1) all local attempts are blocked and the remote side has already      // setup the connection (ACK reply).      this.sock = sock;      this.local = false;      return true;    }    /**     *  Its method is called by <a href="start(java.net.Socket,     * java.io.ObjectInputStream, java.io.ObjectOutputStream">start</a>     * in order to reply to a connection request from a remote server.     * The corresponding code on remote server is the method     * <a href="#localStart()">localStart</a>.     *     * @param sock	the connected socket     * @param ois	the input stream     * @param oos	the output stream     *     * @return	true if the connection is established, false otherwise.     */    synchronized boolean remoteStart(Socket sock, int boot) {      try {	if ((this.sock != null) ||	    (this.local && server.sid > AgentServer.getServerId()))	  //  The connection is already established, or a "local" connection	  // is in progress from this server with a greater priority.	  //  In all cases, stops this "remote" attempt. If the "local"	  // attempt has a lower priority, it will fail due to a remote	  // reject.	  throw new ConnectException("Already connected");	// Accept this connection.        if (logmon.isLoggable(BasicLevel.DEBUG))          logmon.log(BasicLevel.DEBUG, getName() + ", send AckStatus");        writeAck(sock.getOutputStream());        AgentServer.getTransaction().begin();        testBootTS(sid, boot);        AgentServer.getTransaction().commit();        AgentServer.getTransaction().release();        nis = new MessageInputStream(sock.getInputStream());        nos = new MessageOutputStream(sock.getOutputStream());	// Fixing sock attribute will prevent any future attempt 	this.sock = sock;	return true;      } catch (Exception exc) {	// May be a a connection is in progress, try it later...        if (logmon.isLoggable(BasicLevel.WARN))          logmon.log(BasicLevel.WARN, getName() + ", connection refused", exc);	// Close the connection (# NACK).	try {	  sock.getOutputStream().close();	} catch (Exception exc2) {}	try {	  sock.getInputStream().close();	} catch (Exception exc2) {}	try {	  sock.close();	} catch (Exception exc2) {}        nis = null;        nos = null;      }      return false;    }    /**     *  The session is well initialized, we can start the server thread that     * "listen" the connected socket. If the maximum number of connections     * is reached, one connection from the pool is closed.     */    private void startEnd() {      server.active = true;      server.retry = 0;      synchronized(activeSessions) {	if (nbActiveCnx < nbMaxCnx) {	  // Insert the current session in the active pool.	  activeSessions[nbActiveCnx++] = this;	} else {	  // Search the last recently used session in the pool.	  long min = Long.MAX_VALUE;	  int idx = -1;	  for (int i=0; i<nbMaxCnx; i++) {	    if (activeSessions[i].last < min) {	      idx = i;	      min = activeSessions[i].last;	    }	  }	  // Kill choosed session and insert new one	  activeSessions[idx].stop();	  activeSessions[idx] = this;	}	last = current++;      }      thread = new Thread(this, getName());      thread.setDaemon(false);      running = true;      canStop = true;      thread.start();      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, getName() + ", connection started");      //  Try to send all waiting messages. As this.sock is no longer null      // so we must do a copy a waiting messages. New messages will be send      // directly in send method.      //  Be careful, in a very limit case a message can be sent 2 times:      // added in sendList after sock setting and before array copy, il will      // be transmit in send method and below. However there is no problem,      // the copy will be discarded on remote node and 2 ack messages will      // be received on local node.      Object[] waiting = sendList.toArray();      logmon.log(BasicLevel.DEBUG,		 getName() + ", send " + waiting.length + " waiting messages");      Message msg = null;      long currentTimeMillis = System.currentTimeMillis();      for (int i=0; i<waiting.length; i++) {        msg = (Message) waiting[i];        if ((msg.not != null) &&            (msg.not.expiration > 0) &&            (msg.not.expiration < currentTimeMillis)) {          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG,                       getName() + ": removes expired notification " +                       msg.from + ", " + msg.not);          try {            doAck(msg.getStamp());          } catch (IOException exc) {            logmon.log(BasicLevel.ERROR,                       getName() + ": cannot removes expired notification " +                       msg.from + ", " + msg.not, exc);          }        } else {          transmit(msg, currentTimeMillis);        }      }    }    /**     *     */    void stop() {      running = false;            if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, getName() + ", stopped.");      while ((thread != null) && thread.isAlive()) {        if (canStop) {          if (thread.isAlive()) thread.interrupt();          shutdown();        }        try {          thread.join(1000L);        } catch (InterruptedException exc) {          continue;        }        thread = null;      }    }    public void shutdown() {      close();    }    synchronized void close() {      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, getName() + ", closed.");      try {	sock.getInputStream().close();      } catch (Exception exc) {}      try {	sock.getOutputStream().close();      } catch (Exception exc) {}      try {	sock.close();      } catch (Exception exc) {}      sock = null;      nis = null;      nos = null;    }    /**     * Removes all messages in sendList previous to the ack'ed one.     * Be careful, messages in sendList are not always in stamp order.     * Its method should not be synchronized, it scans the list from     * begin to end, and it removes always the first element. Other     * methods using sendList just adds element at the end.     */    final private void doAck(int ack) throws IOException {      Message msg = null;      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, getName() + ", ack received #" + ack);      try {        //  Suppress the acknowledged notification from waiting list,        // and deletes it.        msg = sendList.removeMessage(ack);        AgentServer.getTransaction().begin();        msg.delete();        msg.free();        AgentServer.getTransaction().commit();        AgentServer.getTransaction().release();        if (logmon.isLoggable(BasicLevel.DEBUG))          logmon.log(BasicLevel.DEBUG,                     getName() + ", remove msg#" + msg.getStamp());      } catch (NoSuchElementException exc) {        logmon.log(BasicLevel.WARN,                   getName() + ", can't ack, unknown msg#" + ack);      }    }    /**     * Be careful, its method should not be synchronized (in that case, the     * overall synchronization of the connection -method start- can dead-lock).     */    final void send(Message msg) {      if (logmon.isLoggable(BasicLevel.DEBUG)) {        if (msg.not != null) {          logmon.log(BasicLevel.DEBUG,                     getName() + ", send msg#" + msg.getStamp());        } else {          logmon.log(BasicLevel.DEBUG,                     getName() + ", send ack#" + msg.getStamp());        }      }      long currentTimeMillis = System.currentTimeMillis();      if (msg.not != null) {        sendList.addElement(msg);        if ((msg.not.expiration > 0) &&            (msg.not.expiration < currentTimeMillis)) {          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG,                       getName() + ": removes expired notification " +                       msg.from + ", " + msg.not);          try {            doAck(msg.getStamp());          } catch (IOException exc) {            logmon.log(BasicLevel.ERROR,                       getName() + ": cannot removes expired notification " +                       msg.from + ", " + msg.not, exc);          }          return;        }      }      if (sock == null) {	// If there is no connection between local and destination server,	// try to make one!	start();      } else {	transmit(msg, currentTimeMillis);      }    }        final private void ack(int stamp) throws Exception {      if (logmon.isLoggable(BasicLevel.DEBUG))          logmon.log(BasicLevel.DEBUG,                     getName() + ", set ack msg#" + stamp);      Message ack = Message.alloc(AgentId.localId,                                  AgentId.localId(server.sid),                                  null);      ack.source = AgentServer.getServerId();      ack.dest = AgentServer.getServerDesc(server.sid).gateway;      ack.stamp = stamp;      qout.push(ack);    }    final private synchronized void transmit(Message msg,                                             long currentTimeMillis) {      last = current++;      try {        nos.writeMessage(msg, currentTimeMillis);      } catch (IOException exc) {        logmon.log(BasicLevel.ERROR,                   getName() + ", exception in sending message", exc);	close();      } catch (NullPointerException exc) {        // The stream is closed, exits !      }    }    public void run() {      Message msg;      try {	while (running) {	  canStop = true;          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG, getName() + ", waiting message");	  try {	    msg = nis.readMessage();	  } catch (ClassNotFoundException exc) {	    // TODO: In order to process it we have to return an error,	    // but in that case me must identify the bad message...	    logmon.log(BasicLevel.ERROR,                       getName() + ", error during waiting message", exc);	    continue;	  } catch (InvalidClassException exc) {	    // TODO: In order to process it we have to return an error,	    // but in that case me must identify the bad message..	    logmon.log(BasicLevel.ERROR,                       getName() + ", error during waiting message", exc);	    continue;	  } catch (StreamCorruptedException exc) {	    logmon.log(BasicLevel.ERROR,                       getName() + ", error during waiting message", exc);	    break;	  } catch (OptionalDataException exc) {	    logmon.log(BasicLevel.ERROR,                       getName() + ", error during waiting message", exc);	    break;	  } catch (NullPointerException exc) {            // The stream is closed, exits !            break;          }	  canStop = false;          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG, getName() + ", receives: " + msg);          //  Keep message stamp in order to acknowledge it (be careful,          // the message get a new stamp to be delivered).          int stamp = msg.getStamp();          if (msg.not != null) {            deliver(msg);            ack(stamp);          } else {            doAck(stamp);          }	}      } catch (EOFException exc) {        if (running)          logmon.log(BasicLevel.WARN,                     this.getName() + ", connection closed", exc);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -