⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 poolnetwork.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
      } catch (SocketException exc) {        if (running)          logmon.log(BasicLevel.WARN,                     this.getName() + ", connection closed", exc);      } catch (Exception exc) {	logmon.log(BasicLevel.ERROR, getName() + ", exited", exc);      } finally {	logmon.log(BasicLevel.DEBUG, getName() + ", ends");	running = false;	close();      }    }    /**     * Class used to read messages through a stream.     */    final class MessageInputStream extends ByteArrayInputStream {      private InputStream is = null;      MessageInputStream(InputStream is) {        super(new byte[256]);        this.is = is;      }      private void readFully(int length) throws IOException {        count = 0;        if (length > buf.length) buf = new byte[length];        int nb = -1;        do {          nb = is.read(buf, count, length-count);          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG, getName() + ", reads:" + nb);          if (nb < 0) throw new EOFException();          count += nb;        } while (count != length);        pos  = 0;      }      Message readMessage() throws Exception {        count = 0;        readFully(Message.LENGTH +4 -1);        // Reads boot timestamp of source server        int length = ((buf[0] & 0xFF) << 24) + ((buf[1] & 0xFF) << 16) +          ((buf[2] & 0xFF) <<  8) + ((buf[3] & 0xFF) <<  0);        Message msg = Message.alloc();        int idx = msg.readFromBuf(buf, 4);        if (length > idx) {          // Be careful, the buffer is resetted          readFully(length - idx);          // Reads notification attributes          boolean persistent = ((buf[0] & Message.PERSISTENT) == 0)?false:true;          boolean detachable = ((buf[0] & Message.DETACHABLE) == 0)?false:true;          pos = 1;          // Reads notification object          ObjectInputStream ois = new ObjectInputStream(this);          msg.not = (Notification) ois.readObject();          if (msg.not.expiration > 0)            msg.not.expiration += System.currentTimeMillis();          msg.not.persistent = persistent;          msg.not.detachable = detachable;          msg.not.detached = false;        } else {          msg.not = null;        }        return msg;      }    }    /**     * Class used to send messages through a stream.     */    final class MessageOutputStream extends ByteArrayOutputStream {      private OutputStream os = null;      private ObjectOutputStream oos = null;      MessageOutputStream(OutputStream os) throws IOException {        super(256);        this.os = os;        oos = new ObjectOutputStream(this);        count = 0;        buf[Message.LENGTH +4] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF);        buf[Message.LENGTH +5] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF);        buf[Message.LENGTH +6] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF);        buf[Message.LENGTH +7] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF);      }      void writeMessage(Message msg,                        long currentTimeMillis) throws IOException {        logmon.log(BasicLevel.DEBUG, getName() + ", sends " + msg);        int idx = msg.writeToBuf(buf, 4);        // Be careful, notification attribute are not written if there        // is no notification.        count = Message.LENGTH +4 -1;                try {          if (msg.not != null) {            // Writes notification attributes            buf[idx++] = (byte) ((msg.not.persistent?Message.PERSISTENT:0) |                                 (msg.not.detachable?Message.DETACHABLE:0));            // Be careful, the stream header is hard-written in buf            count = Message.LENGTH +8;            if (msg.not.expiration > 0)              msg.not.expiration -= currentTimeMillis;            oos.writeObject(msg.not);                        oos.reset();            oos.flush();          }          // Writes length at beginning          buf[0] = (byte) (count >>>  24);          buf[1] = (byte) (count >>>  16);          buf[2] = (byte) (count >>>  8);          buf[3] = (byte) (count >>>  0);          os.write(buf, 0, count);;          os.flush();        } finally {          if ((msg.not != null) && (msg.not.expiration > 0))            msg.not.expiration += currentTimeMillis;          count = 0;        }      }    }  }  final class WakeOnConnection extends Daemon {    ServerSocket listen = null;    WakeOnConnection(String name, Logger logmon) throws IOException {      super(name + ".wakeOnConnection");      // creates a server socket listening on configured port      listen = createServerSocket();      // Overload logmon definition in Daemon      this.logmon = logmon;    }    protected void close() {      try {	listen.close();      } catch (Exception exc) {}      listen = null;    }    protected void shutdown() {      close();    }    /**     *      */    public void run() {      /** Connected socket. */      Socket sock = null;      try {	while (running) {	  try {	    canStop = true;	    // Get the connection	    try {              if (this.logmon.isLoggable(BasicLevel.DEBUG))                this.logmon.log(BasicLevel.DEBUG,                           this.getName() + ", waiting connection");	      sock = listen.accept();	    } catch (IOException exc) {              if (running)                this.logmon.log(BasicLevel.ERROR,                                this.getName() +                                ", error during waiting connection", exc);	      continue;	    }	    canStop = false;	    setSocketOption(sock);            Boot boot = readBoot(sock.getInputStream());            if (this.logmon.isLoggable(BasicLevel.DEBUG))              this.logmon.log(BasicLevel.DEBUG,                              this.getName() + ", connection setup from #" +                              boot.sid);            getSession(boot.sid).start(sock, boot.boot);	  } catch (Exception exc) {	    this.logmon.log(BasicLevel.ERROR,                            this.getName() + ", bad connection setup", exc);	  }	}      } finally {        finish();      }    }  }  final class Dispatcher extends Daemon {    Dispatcher(String name, Logger logmon) {      super(name + ".dispatcher");      // Overload logmon definition in Daemon      this.logmon = logmon;    }    protected void close() {}    protected void shutdown() {}    public void run() {      Message msg = null;            try {        while (running) {          canStop = true;          if (this.logmon.isLoggable(BasicLevel.DEBUG))            this.logmon.log(BasicLevel.DEBUG,                            this.getName() + ", waiting message");          try {            msg = qout.get();          } catch (InterruptedException exc) {            continue;          }          canStop = false;          if (! running) break;          // Send the message          getSession(msg.getDest()).send(msg);          qout.pop();        }      } finally {        finish();      }    }  }  final class WatchDog extends Daemon {   /** Use to synchronize thread */    private Object lock;    WatchDog(String name, Logger logmon) {      super(name + ".watchdog");      lock = new Object();      // Overload logmon definition in Daemon      this.logmon = logmon;    }    protected void close() {}    protected void shutdown() {      wakeup();    }    void wakeup() {      synchronized (lock) {	lock.notify();      }    }    public void run() {      try {        synchronized (lock) {          while (running) {            try {              lock.wait(WDActivationPeriod);              if (this.logmon.isLoggable(BasicLevel.DEBUG))                this.logmon.log(BasicLevel.DEBUG,                                this.getName() + ", activated");            } catch (InterruptedException exc) {              continue;            }            if (! running) break;            for (int sid=0; sid<sessions.length; sid++) {              if ((sessions[sid] != null) &&                  (sessions[sid].sendList.size() > 0) &&                  (! sessions[sid].running)) {                sessions[sid].start();              }            }          }        }      } finally {        finish();      }    }  }  final void writeBoot(OutputStream out) throws IOException {    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, getName() + ", writeBoot: " + getBootTS());    byte[] iobuf = new byte[6];    iobuf[0] = (byte) (AgentServer.getServerId() >>>  8);    iobuf[1] = (byte) (AgentServer.getServerId() >>>  0);    iobuf[2] = (byte) (getBootTS() >>>  24);    iobuf[3] = (byte) (getBootTS() >>>  16);    iobuf[4] = (byte) (getBootTS() >>>  8);    iobuf[5] = (byte) (getBootTS() >>>  0);    out.write(iobuf);    out.flush();  }    final class Boot {    transient short sid;    transient int boot;  }  final void readFully(InputStream is, byte[] iobuf) throws IOException {    int n = 0;    do {      int count = is.read(iobuf, n, iobuf.length - n);      if (count < 0) throw new EOFException();      n += count;    } while (n < iobuf.length);  }  final Boot readBoot(InputStream in) throws IOException {    Boot boot = new Boot();    byte[] iobuf = new byte[6];    readFully(in, iobuf);    boot.sid = (short) (((iobuf[0] & 0xFF) <<  8) + (iobuf[1] & 0xFF));    boot.boot = ((iobuf[2] & 0xFF) << 24) + ((iobuf[3] & 0xFF) << 16) +      ((iobuf[4] & 0xFF) <<  8) + ((iobuf[5] & 0xFF) <<  0);    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, getName() + ", readBoot from #" + boot.sid +                 " -> " + boot.boot);    return boot;  }    final void writeAck(OutputStream out) throws IOException {    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, getName() + ", writeAck: " + getBootTS());    byte[] iobuf = new byte[4];    iobuf[0] = (byte) (getBootTS() >>>  24);    iobuf[1] = (byte) (getBootTS() >>>  16);    iobuf[2] = (byte) (getBootTS() >>>  8);    iobuf[3] = (byte) (getBootTS() >>>  0);    out.write(iobuf);    out.flush();  }    final int readAck(InputStream in)throws IOException {    byte[] iobuf = new byte[4];    readFully(in, iobuf);    int boot = ((iobuf[0] & 0xFF) << 24) + ((iobuf[1] & 0xFF) << 16) +      ((iobuf[2] & 0xFF) <<  8) + ((iobuf[3] & 0xFF) <<  0);    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, getName() + ", readAck:" + boot);    return boot;  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -