📄 simplenetwork.java
字号:
/* * * @exception IOException unrecoverable exception during transaction. */ void watchdog(long currentTimeMillis) throws IOException { if (currentTimeMillis < (last + WDActivationPeriod)) return; last = currentTimeMillis; ServerDesc server = null; for (int i=0; i<sendList.size(); i++) { Message msg = (Message) sendList.getMessageAt(i); short msgto = msg.getDest(); if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", check msg#" + msg.getStamp() + " from " + msg.from + " to " + msg.to); if ((msg.not.expiration > 0) && (msg.not.expiration < currentTimeMillis)) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ": removes expired notification " + msg.from + ", " + msg.not); // Remove the message. AgentServer.getTransaction().begin(); // Deletes the processed notification sendList.removeMessageAt(i); i--;// AF: A reprendre.// // send ack in JGroups to delete msg// if (jgroups != null)// jgroups.send(new JGroupsAckMsg(msg)); msg.delete(); msg.free(); AgentServer.getTransaction().commit(); AgentServer.getTransaction().release(); } try { server = AgentServer.getServerDesc(msgto); } catch (UnknownServerException exc) { this.logmon.log(BasicLevel.ERROR, this.getName() + ", can't send message: " + msg, exc); // Remove the message, may be we have to post an error // notification to sender. AgentServer.getTransaction().begin(); // Deletes the processed notification sendList.removeMessageAt(i); i--;// AF: A reprendre.// // send ack in JGroups to delete msg// if (jgroups != null)// jgroups.send(new JGroupsAckMsg(msg)); msg.delete(); msg.free(); AgentServer.getTransaction().commit(); AgentServer.getTransaction().release(); continue; } if ((server.active) || ((server.retry < WDNbRetryLevel1) && ((server.last + WDRetryPeriod1) < currentTimeMillis)) || ((server.retry < WDNbRetryLevel2) && ((server.last + WDRetryPeriod2) < currentTimeMillis)) || ((server.last + WDRetryPeriod3) < currentTimeMillis)) { try { if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", send msg#" + msg.getStamp()); server.last = currentTimeMillis; // Open the connection. Socket socket = createSocket(server); // The connection is ok, reset active and retry flags. server.active = true; server.retry = 0; setSocketOption(socket); send(socket, msg, currentTimeMillis); } catch (SocketException exc) { if (this.logmon.isLoggable(BasicLevel.WARN)) this.logmon.log(BasicLevel.WARN, this.getName() + ", let msg in watchdog list", exc); server.active = false; server.last = currentTimeMillis; server.retry += 1; // There is a connection problem, let the message in the // waiting list. continue; } catch (Exception exc) { this.logmon.log(BasicLevel.ERROR, this.getName() + ", error", exc); } AgentServer.getTransaction().begin(); // Deletes the processed notification sendList.removeMessageAt(i); i--;// AF: A reprendre.// // send ack in JGroups to delete msg// if (jgroups != null)// jgroups.send(new JGroupsAckMsg(msg)); msg.delete(); msg.free(); AgentServer.getTransaction().commit(); AgentServer.getTransaction().release(); } } } public void send(Socket socket, Message msg, long currentTimeMillis) throws IOException { int ret; InputStream is = null; try { // Send the message, if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", write message"); nos.writeMessage(socket, msg, currentTimeMillis); // and wait the acknowledge. if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", wait ack"); is = socket.getInputStream(); if ((ret = is.read()) == -1) throw new ConnectException("Connection broken"); if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", receive ack"); } finally { try { socket.getOutputStream().close(); } catch (Exception exc) {} try { is.close(); } catch (Exception exc) {} try { socket.close(); } catch (Exception exc) {} } } } final class NetServerIn extends Daemon { ServerSocket listen = null; NetServerIn(String name, Logger logmon) throws IOException { super(name + ".NetServerIn"); listen = createServerSocket(); // Overload logmon definition in Daemon this.logmon = logmon; this.setThreadGroup(AgentServer.getThreadGroup()); } protected void close() { try { listen.close(); } catch (Exception exc) {} } protected void shutdown() { close(); } public void run() { Socket socket = null; OutputStream os = null; ObjectInputStream ois = null; byte[] iobuf = new byte[29]; try { while (running) { try { canStop = true; // Get the connection try { if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", waiting connection"); socket = listen.accept(); } catch (IOException exc) { continue; } canStop = false; setSocketOption(socket); if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", connected"); // Read the message, os = socket.getOutputStream(); InputStream is = socket.getInputStream(); Message msg = Message.alloc(); int n = 0; do { int count = is.read(iobuf, n, Message.LENGTH +4 - n); if (count < 0) throw new EOFException(); n += count; } while (n < (Message.LENGTH +4)); // Reads boot timestamp of source server int boot = ((iobuf[0] & 0xFF) << 24) + ((iobuf[1] & 0xFF) << 16) + ((iobuf[2] & 0xFF) << 8) + ((iobuf[3] & 0xFF) << 0); int idx = msg.readFromBuf(iobuf, 4); // Reads notification attributes boolean persistent = ((iobuf[idx] & Message.PERSISTENT) == 0)?false:true; boolean detachable = ((iobuf[idx] & Message.DETACHABLE) == 0)?false:true; // Reads notification object ois = new ObjectInputStream(is); msg.not = (Notification) ois.readObject(); if (msg.not.expiration > 0) msg.not.expiration += System.currentTimeMillis(); msg.not.persistent = persistent; msg.not.detachable = detachable; msg.not.detached = false; if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", msg received"); testBootTS(msg.getSource(), boot); deliver(msg); if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", send ack"); // then send the acknowledge. os.write(0); os.flush(); } catch (Exception exc) { this.logmon.log(BasicLevel.ERROR, this.getName() + ", closed", exc); } finally { try { os.close(); } catch (Exception exc) {} os = null; try { ois.close(); } catch (Exception exc) {} ois = null; try { socket.close(); } catch (Exception exc) {} socket = null; } } } finally { finish(); } } } /** * Class used to send messages through a TCP stream. */ final class MessageOutputStream extends ByteArrayOutputStream { private ObjectOutputStream oos = null; private OutputStream os = null; MessageOutputStream() throws IOException { super(256); oos = new ObjectOutputStream(this); count = 0; buf[Message.LENGTH +4] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF); buf[Message.LENGTH +5] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF); buf[Message.LENGTH +6] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF); buf[Message.LENGTH +7] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF); } void writeMessage(Socket sock, Message msg, long currentTimeMillis) throws IOException { os = sock.getOutputStream(); // Writes boot timestamp of source server buf[0] = (byte) (getBootTS() >>> 24); buf[1] = (byte) (getBootTS() >>> 16); buf[2] = (byte) (getBootTS() >>> 8); buf[3] = (byte) (getBootTS() >>> 0); int idx = msg.writeToBuf(buf, 4); // Writes notification attributes buf[idx++] = (byte) ((msg.not.persistent?Message.PERSISTENT:0) | (msg.not.detachable?Message.DETACHABLE:0)); // Be careful, the stream header is hard-written in buf count = Message.LENGTH +8; try { if (msg.not.expiration > 0) msg.not.expiration -= currentTimeMillis; oos.writeObject(msg.not); oos.reset(); oos.flush(); os.write(buf, 0, count);; os.flush(); } finally { if (msg.not.expiration > 0) msg.not.expiration += currentTimeMillis; count = 0; } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -