⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ngnetwork.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            logmon.log(BasicLevel.ERROR, getName(), exc);            // Handle error with channel and unregister            try {              cnx.close();            } catch (IOException exc2) {              logmon.log(BasicLevel.ERROR, getName(), exc2);            }          }        }      }      } catch (Throwable exc) {        logmon.log(BasicLevel.FATAL, getName(), exc);      }    }  }  class CnxHandler {    /** Destination server id */    private short sid;    /** The handler's name. */    private String name = null;    /**     *  True if a "local" connection is in progress, a local connection     * is initiated from this server to the remote one (defined by the     * {@link #server server} descriptor.     *  This attribute is used to synchronize local and remote attempts to     * make connections.     */    private boolean local = false;    /** The description of the remote server handled by this network session */    private ServerDesc server;    /** The communication socket channel */    SocketChannel channel = null;    /** Date of last connection attempt */    long lasttry = 0L;    /** Informations for output */    int nbwrite = 0;    MessageOutputStream mos = null;    ByteBuffer bufout = null;    /** FIFO list of all messages to be sent */    MessageVector sendlist = null;    /** Informations for input */    ByteBuffer bufin = null;    MessageInputStream mis = null;    CnxHandler(String name, short sid) throws IOException {      this.sid = sid;      this.name = name + ".cnxHandler#" + sid;            if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, getName() + ", created");      mos = new MessageOutputStream();      bufin = ByteBuffer.allocateDirect(SO_BUFSIZE);      mis = new MessageInputStream();      sendlist = new MessageVector();    }    void init() throws IOException, UnknownServerException {      server = AgentServer.getServerDesc(sid);      if (sendlist.size() > 0) start();    }    /**     * Returns this session's name.     *     * @return this session's name.     */    public final String getName() {      return name;    }    void start() throws IOException {      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, getName() + ", try to start");      long currentTimeMillis = System.currentTimeMillis();      if (server == null)        // probalby the Consumer is initialized but not always started        return;      if (((server.retry < WDNbRetryLevel1) && 	   ((server.last + WDRetryPeriod1) < currentTimeMillis)) ||	  ((server.retry < WDNbRetryLevel2) &&	   ((server.last + WDRetryPeriod2) < currentTimeMillis)) ||	  ((server.last + WDRetryPeriod3) < currentTimeMillis)) {	if (localStart()) {	  startEnd();	} else {	  server.last = currentTimeMillis;	  server.retry += 1;	}      }    }    /**     *  Its method is called by <a href="#start()">start</a> in order to     * initiate a connection from the local server. The corresponding code     * on remote server is the method <a href="#remoteStart()">remoteStart</a>.     * Its method creates the socket, initiates the network connection, and     * negociates with remote server.<p><hr>     *  Its method can be overidden in order to change the connection protocol     * (introduces authentification by example, or uses SSL), but must respect     * somes conditions:<ul>     * <li>send a Boot object after the initialization of object streams (it     * is waiting by the wakeOnConnection thread),     * <li>wait for an acknowledge,     * <li>set the sock, ois and oos attributes at the end if the connection     * is correct.     * </ul><p>     *  In order to overide the protocol, we have to implements its method,     * with the remoteStart and the transmit methods.     *     * @return	true if the connection is established, false otherwise.     */    boolean localStart() {      synchronized (this) {	if ((this.channel != null) || this.local) {	  //  The connection is already established, or a "local" connection	  // is in progress (remoteStart method is synchronized).	  //  In all cases refuses the connection request.          if (logmon.isLoggable(BasicLevel.WARN))            logmon.log(BasicLevel.WARN, getName() + ", connection refused");	  return false;	}	// Set the local attribute in order to block all others local attempts.	this.local = true;      }      SocketChannel channel = null;      try {        SocketAddress addr = new InetSocketAddress(server.getAddr(),                                                   server.getPort());        channel = SocketChannel.open(addr);        channel.socket().setSendBufferSize(SO_BUFSIZE);        channel.socket().setReceiveBufferSize(SO_BUFSIZE);        if (logmon.isLoggable(BasicLevel.DEBUG))          logmon.log(BasicLevel.DEBUG, getName() + " bufsize: " +                      channel.socket().getReceiveBufferSize() + ", " +                     channel.socket().getSendBufferSize());        if (logmon.isLoggable(BasicLevel.DEBUG))          logmon.log(BasicLevel.DEBUG,                     getName() + ", writeBoot: " + getBootTS());        ByteBuffer buf = ByteBuffer.allocate(6);        buf.putShort(AgentServer.getServerId());        buf.putInt(getBootTS());        buf.flip();        channel.write(buf);//         ByteBuffer buf = ByteBuffer.allocate(6);        buf.flip();        if (channel.read(buf) > 0) {          // Reads the message length          buf.flip();          int boot = buf.getInt();          AgentServer.getTransaction().begin();          testBootTS(sid, boot);          AgentServer.getTransaction().commit();          AgentServer.getTransaction().release();        } else {          throw new ConnectException("Can't get status");        }      } catch (Exception exc) {        if (logmon.isLoggable(BasicLevel.WARN))          logmon.log(BasicLevel.WARN,                     getName() + ", connection refused.", exc);	// TODO: Try it later, may be a a connection is in progress...	try {	  channel.close();	} catch (Exception exc2) {}	// Reset the local attribute to allow future attempts.        this.local = false;	return false;      }      // Normally, only one thread can reach this code (*1), so we don't have      // to synchronized theses statements. First sets sock attribute, then      // releases the local lock.      // (*1) all local attempts are blocked and the remote side has already      // setup the connection (ACK reply).      this.channel = channel;      this.local = false;      return true;    }    /**     *  Its method is called by <a href="start(java.net.Socket,     * java.io.ObjectInputStream, java.io.ObjectOutputStream">start</a>     * in order to reply to a connection request from a remote server.     * The corresponding code on remote server is the method     * <a href="#localStart()">localStart</a>.     *     * @param sock	the connected socket     * @param ois	the input stream     * @param oos	the output stream     *     * @return	true if the connection is established, false otherwise.     */    synchronized boolean remoteStart(SocketChannel channel, int boot) {      try {	if ((this.channel != null) ||	    (this.local && server.sid > AgentServer.getServerId())) {	  //  The connection is already established, or	  // a "local" connection is in progress from this server with a	  // greater priority.	  //  In all cases, stops this "remote" attempt.	  //  If the "local" attempt has a lower priority, it will fail	  // due to a remote reject.	  throw new ConnectException("Already connected");        }	// Accept this connection.        if (logmon.isLoggable(BasicLevel.DEBUG))          logmon.log(BasicLevel.DEBUG,                     getName() + ", writeBoot: " + getBootTS());        ByteBuffer buf = ByteBuffer.allocate(4);        buf.putInt(getBootTS());        buf.flip();        channel.write(buf);        AgentServer.getTransaction().begin();        testBootTS(sid, boot);        AgentServer.getTransaction().commit();        AgentServer.getTransaction().release();	// Fixing sock attribute will prevent any future attempt 	this.channel = channel;	return true;      } catch (Exception exc) {	// May be a a connection is in progress, try it later...        if (logmon.isLoggable(BasicLevel.WARN))          logmon.log(BasicLevel.WARN,                         getName() + ", connection refused", exc);	// Close the connection (# NACK).	try {	  channel.close();	} catch (Exception exc2) {}      }      return false;    }    /**     *  The session is well initialized, we can start the server thread that     * "listen" the connected socket. If the maximum number of connections     * is reached, one connection from the pool is closed.     */    private void startEnd() throws IOException {      server.active = true;      server.retry = 0;//       mos = new MessageOutputStream();      nbwrite = 0;//       bufin = ByteBuffer.allocateDirect(SO_BUFSIZE);      bufin.clear();//       mis = new MessageInputStream();            // The returned channel is in blocking mode.      channel.configureBlocking(false);      // Register channels with selector      channel.register(selector, channel.validOps(), this);      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG,                         getName() + ", connection started");      sendlist.reset();    }    synchronized void send(Message msg) throws IOException {      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG,                         getName() + ", send message: " + msg);      // Adds it to the list of message to be sent, it will be removed      // after its ack receipt.      sendlist.addMessage(msg);      if ((channel != null) && (bufout == null)) {        // As no message are actually sending the channel is only subscribe        // for reading, subscribe this channel for write operation will permit        // to transmit the new added message.	// Be careful, as this change is only take in account for the next	// select operation, we have to use wakeup on selector        SelectionKey key = channel.keyFor(selector);        if (logmon.isLoggable(BasicLevel.DEBUG))          logmon.log(BasicLevel.DEBUG,                         getName() + ", send message, key=" + key);        if (key != null)          key.interestOps(channel.validOps());      }      // In all case a selector.wakeup() will solve the problem !!      if (selector == null) {        logmon.log(BasicLevel.WARN,                   getName() + ", network not started.");      } else {        selector.wakeup();      }    }    /**     * Class used to send messages through a stream.     */    final class MessageOutputStream extends ByteArrayOutputStream {      private ObjectOutputStream oos = null;      MessageOutputStream() throws IOException {        super(256);        oos = new ObjectOutputStream(this);        count = 0;        buf[29] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF);        buf[30] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF);        buf[31] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF);        buf[32] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF);      }      void writeMessage(Message msg) throws IOException {        logmon.log(BasicLevel.DEBUG, getName() + ", writes " + msg);        // Writes sender's AgentId        buf[4] = (byte) (msg.from.from >>>  8);        buf[5] = (byte) (msg.from.from >>>  0);        buf[6] = (byte) (msg.from.to >>>  8);        buf[7] = (byte) (msg.from.to >>>  0);        buf[8] = (byte) (msg.from.stamp >>>  24);        buf[9] = (byte) (msg.from.stamp >>>  16);        buf[10] = (byte) (msg.from.stamp >>>  8);        buf[11] = (byte) (msg.from.stamp >>>  0);        // Writes adressee's AgentId        buf[12]  = (byte) (msg.to.from >>>  8);        buf[13]  = (byte) (msg.to.from >>>  0);        buf[14] = (byte) (msg.to.to >>>  8);        buf[15] = (byte) (msg.to.to >>>  0);        buf[16] = (byte) (msg.to.stamp >>>  24);        buf[17] = (byte) (msg.to.stamp >>>  16);        buf[18] = (byte) (msg.to.stamp >>>  8);        buf[19] = (byte) (msg.to.stamp >>>  0);        // Writes source server id of message        buf[20]  = (byte) (msg.source >>>  8);        buf[21]  = (byte) (msg.source >>>  0);        // Writes destination server id of message        buf[22] = (byte) (msg.dest >>>  8);        buf[23] = (byte) (msg.dest >>>  0);        // Writes stamp of message        buf[24] = (byte) (msg.stamp >>>  24);        buf[25] = (byte) (msg.stamp >>>  16);        buf[26] = (byte) (msg.stamp >>>  8);        buf[27] = (byte) (msg.stamp >>>  0);        count = 28;        if (msg.not != null) {          // Writes notification attributes          buf[28] = (byte) ((msg.not.persistent?0x01:0) |                            (msg.not.detachable?0x10:0));          // Be careful, the stream header is hard-written in buf[29..32]          count = 33;          oos.writeObject(msg.not);          oos.reset();          oos.flush();        }        // Writes length at beginning        buf[0] = (byte) (count >>>  24);        buf[1] = (byte) (count >>>  16);        buf[2] = (byte) (count >>>  8);        buf[3] = (byte) (count >>>  0);                logmon.log(BasicLevel.DEBUG, getName() + ", writes " + count);        nbwrite = count;        bufout = ByteBuffer.wrap(buf, 0, count);        nbwrite -= channel.write(bufout);      }    }//     void write(Message msg) throws IOException {//       mos.writeMessage(msg);//     }    /**     * Method called each time the channel is Writable     */    private synchronized void write() throws IOException {      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, getName() + " write-1");      // test if there is still bytes to write      if ((bufout != null) && (nbwrite > 0)) {        if (logmon.isLoggable(BasicLevel.DEBUG))          logmon.log(BasicLevel.DEBUG, getName() + " write-2");        nbwrite -= channel.write(bufout);      } else {        if (nbwrite == 0) {          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG, getName() + " write-3");//           if (bufout != null) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -