⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 simplenetwork.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    /*     *     * @exception IOException unrecoverable exception during transaction.     */    void watchdog(long currentTimeMillis) throws IOException {      if (currentTimeMillis < (last + WDActivationPeriod))        return;      last = currentTimeMillis;      ServerDesc server = null;      for (int i=0; i<sendList.size(); i++) {        Message msg = (Message) sendList.getMessageAt(i);        short msgto = msg.getDest();        if (this.logmon.isLoggable(BasicLevel.DEBUG))          this.logmon.log(BasicLevel.DEBUG,                          this.getName() +                          ", check msg#" + msg.getStamp() +                          " from " + msg.from +                          " to " + msg.to);        if ((msg.not.expiration > 0) &&            (msg.not.expiration < currentTimeMillis)) {          if (logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG,                       getName() + ": removes expired notification " +                       msg.from + ", " + msg.not);          // Remove the message.          AgentServer.getTransaction().begin();          // Deletes the processed notification          sendList.removeMessageAt(i); i--;// AF: A reprendre.//           // send ack in JGroups to delete msg//           if (jgroups != null)//             jgroups.send(new JGroupsAckMsg(msg));          msg.delete();          msg.free();          AgentServer.getTransaction().commit();          AgentServer.getTransaction().release();        }        try {          server = AgentServer.getServerDesc(msgto);        } catch (UnknownServerException exc) {          this.logmon.log(BasicLevel.ERROR,                          this.getName() + ", can't send message: " + msg,                          exc);          // Remove the message, may be we have to post an error          // notification to sender.          AgentServer.getTransaction().begin();          // Deletes the processed notification          sendList.removeMessageAt(i); i--;// AF: A reprendre.//          // send ack in JGroups to delete msg//           if (jgroups != null)//             jgroups.send(new JGroupsAckMsg(msg));          msg.delete();          msg.free();          AgentServer.getTransaction().commit();          AgentServer.getTransaction().release();          continue;        }        if ((server.active) ||            ((server.retry < WDNbRetryLevel1) &&              ((server.last + WDRetryPeriod1) < currentTimeMillis)) ||            ((server.retry < WDNbRetryLevel2) &&             ((server.last + WDRetryPeriod2) < currentTimeMillis)) ||            ((server.last + WDRetryPeriod3) < currentTimeMillis)) {          try {            if (this.logmon.isLoggable(BasicLevel.DEBUG))              this.logmon.log(BasicLevel.DEBUG,                              this.getName() +                              ", send msg#" + msg.getStamp());            server.last = currentTimeMillis;            // Open the connection.            Socket socket = createSocket(server);            // The connection is ok, reset active and retry flags.            server.active = true;            server.retry = 0;            setSocketOption(socket);            send(socket, msg, currentTimeMillis);          } catch (SocketException exc) {            if (this.logmon.isLoggable(BasicLevel.WARN))              this.logmon.log(BasicLevel.WARN,                              this.getName() + ", let msg in watchdog list",                              exc);            server.active = false;            server.last = currentTimeMillis;            server.retry += 1;            //  There is a connection problem, let the message in the            // waiting list.            continue;          } catch (Exception exc) {            this.logmon.log(BasicLevel.ERROR,                            this.getName() + ", error", exc);          }          AgentServer.getTransaction().begin();          //  Deletes the processed notification          sendList.removeMessageAt(i); i--;// AF: A reprendre.//           // send ack in JGroups to delete msg//           if (jgroups != null)//             jgroups.send(new JGroupsAckMsg(msg));          msg.delete();          msg.free();          AgentServer.getTransaction().commit();          AgentServer.getTransaction().release();        }      }    }    public void send(Socket socket,                     Message msg,                     long currentTimeMillis) throws IOException {      int ret;      InputStream is = null;      try {        // Send the message,        if (this.logmon.isLoggable(BasicLevel.DEBUG))          this.logmon.log(BasicLevel.DEBUG,                          this.getName() + ", write message");        nos.writeMessage(socket, msg, currentTimeMillis);        // and wait the acknowledge.        if (this.logmon.isLoggable(BasicLevel.DEBUG))          this.logmon.log(BasicLevel.DEBUG,                          this.getName() + ", wait ack");        is = socket.getInputStream();        if ((ret = is.read()) == -1)          throw new ConnectException("Connection broken");        if (this.logmon.isLoggable(BasicLevel.DEBUG))          this.logmon.log(BasicLevel.DEBUG,                          this.getName() + ", receive ack");      } finally {        try {          socket.getOutputStream().close();        } catch (Exception exc) {}        try {          is.close();        } catch (Exception exc) {}        try {          socket.close();        } catch (Exception exc) {}      }    }  }  final class NetServerIn extends Daemon {    ServerSocket listen = null;    NetServerIn(String name, Logger logmon) throws IOException {      super(name + ".NetServerIn");      listen = createServerSocket();      // Overload logmon definition in Daemon      this.logmon = logmon;      this.setThreadGroup(AgentServer.getThreadGroup());    }    protected void close() {      try {	listen.close();      } catch (Exception exc) {}    }    protected void shutdown() {      close();    }    public void run() {      Socket socket = null;      OutputStream os = null;      ObjectInputStream ois = null;      byte[] iobuf = new byte[29];      try {	while (running) {	  try {	    canStop = true;	    // Get the connection	    try {              if (this.logmon.isLoggable(BasicLevel.DEBUG))                this.logmon.log(BasicLevel.DEBUG,                                this.getName() + ", waiting connection");	      socket = listen.accept();	    } catch (IOException exc) {	      continue;	    }	    canStop = false;	    setSocketOption(socket);            if (this.logmon.isLoggable(BasicLevel.DEBUG))              this.logmon.log(BasicLevel.DEBUG,                              this.getName() + ", connected");	    // Read the message,	    os = socket.getOutputStream();            InputStream is = socket.getInputStream();            Message msg = Message.alloc();            int n = 0;            do {              int count = is.read(iobuf, n, Message.LENGTH +4 - n);              if (count < 0) throw new EOFException();              n += count;            } while (n < (Message.LENGTH +4));            // Reads boot timestamp of source server            int boot = ((iobuf[0] & 0xFF) << 24) +              ((iobuf[1] & 0xFF) << 16) +              ((iobuf[2] & 0xFF) <<  8) +              ((iobuf[3] & 0xFF) <<  0);                        int idx = msg.readFromBuf(iobuf, 4);            // Reads notification attributes            boolean persistent = ((iobuf[idx] & Message.PERSISTENT) == 0)?false:true;            boolean detachable = ((iobuf[idx] & Message.DETACHABLE) == 0)?false:true;            // Reads notification object            ois = new ObjectInputStream(is);            msg.not = (Notification) ois.readObject();            if (msg.not.expiration > 0)              msg.not.expiration += System.currentTimeMillis();            msg.not.persistent = persistent;            msg.not.detachable = detachable;            msg.not.detached = false;            if (this.logmon.isLoggable(BasicLevel.DEBUG))              this.logmon.log(BasicLevel.DEBUG,                              this.getName() + ", msg received");            testBootTS(msg.getSource(), boot);            deliver(msg);            if (this.logmon.isLoggable(BasicLevel.DEBUG))              this.logmon.log(BasicLevel.DEBUG, this.getName() + ", send ack");	    // then send the acknowledge.	    os.write(0);            os.flush();	  } catch (Exception exc) {            this.logmon.log(BasicLevel.ERROR,                             this.getName() + ", closed", exc);	  } finally {	    try {	      os.close();	    } catch (Exception exc) {}	    os = null;	    try {	      ois.close();	    } catch (Exception exc) {}	    ois = null;	    try {	      socket.close();	    } catch (Exception exc) {}	    socket = null;	  }	}      } finally {        finish();      }    }  }  /**   * Class used to send messages through a TCP stream.   */  final class MessageOutputStream extends ByteArrayOutputStream {    private ObjectOutputStream oos = null;    private OutputStream os = null;    MessageOutputStream() throws IOException {      super(256);      oos = new ObjectOutputStream(this);      count = 0;      buf[Message.LENGTH +4] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF);      buf[Message.LENGTH +5] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF);      buf[Message.LENGTH +6] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF);      buf[Message.LENGTH +7] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF);    }    void writeMessage(Socket sock,                      Message msg,                      long currentTimeMillis) throws IOException {      os = sock.getOutputStream();      // Writes boot timestamp of source server      buf[0] = (byte) (getBootTS() >>>  24);      buf[1] = (byte) (getBootTS() >>>  16);      buf[2] = (byte) (getBootTS() >>>  8);      buf[3] = (byte) (getBootTS() >>>  0);      int idx = msg.writeToBuf(buf, 4);      // Writes notification attributes      buf[idx++] = (byte) ((msg.not.persistent?Message.PERSISTENT:0) |                           (msg.not.detachable?Message.DETACHABLE:0));      // Be careful, the stream header is hard-written in buf      count = Message.LENGTH +8;      try {        if (msg.not.expiration > 0)          msg.not.expiration -= currentTimeMillis;        oos.writeObject(msg.not);        oos.reset();        oos.flush();        os.write(buf, 0, count);;        os.flush();      } finally {        if (msg.not.expiration > 0)          msg.not.expiration += currentTimeMillis;        count = 0;      }    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -