📄 poolnetwork.java
字号:
} catch (SocketException exc) { if (running) logmon.log(BasicLevel.WARN, this.getName() + ", connection closed", exc); } catch (Exception exc) { logmon.log(BasicLevel.ERROR, getName() + ", exited", exc); } finally { logmon.log(BasicLevel.DEBUG, getName() + ", ends"); running = false; close(); } } /** * Class used to read messages through a stream. */ final class MessageInputStream extends ByteArrayInputStream { private InputStream is = null; MessageInputStream(InputStream is) { super(new byte[256]); this.is = is; } private void readFully(int length) throws IOException { count = 0; if (length > buf.length) buf = new byte[length]; int nb = -1; do { nb = is.read(buf, count, length-count); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", reads:" + nb); if (nb < 0) throw new EOFException(); count += nb; } while (count != length); pos = 0; } Message readMessage() throws Exception { count = 0; readFully(Message.LENGTH +4 -1); // Reads boot timestamp of source server int length = ((buf[0] & 0xFF) << 24) + ((buf[1] & 0xFF) << 16) + ((buf[2] & 0xFF) << 8) + ((buf[3] & 0xFF) << 0); Message msg = Message.alloc(); int idx = msg.readFromBuf(buf, 4); if (length > idx) { // Be careful, the buffer is resetted readFully(length - idx); // Reads notification attributes boolean persistent = ((buf[0] & Message.PERSISTENT) == 0)?false:true; boolean detachable = ((buf[0] & Message.DETACHABLE) == 0)?false:true; pos = 1; // Reads notification object ObjectInputStream ois = new ObjectInputStream(this); msg.not = (Notification) ois.readObject(); if (msg.not.expiration > 0) msg.not.expiration += System.currentTimeMillis(); msg.not.persistent = persistent; msg.not.detachable = detachable; msg.not.detached = false; } else { msg.not = null; } return msg; } } /** * Class used to send messages through a stream. */ final class MessageOutputStream extends ByteArrayOutputStream { private OutputStream os = null; private ObjectOutputStream oos = null; MessageOutputStream(OutputStream os) throws IOException { super(256); this.os = os; oos = new ObjectOutputStream(this); count = 0; buf[Message.LENGTH +4] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF); buf[Message.LENGTH +5] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF); buf[Message.LENGTH +6] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF); buf[Message.LENGTH +7] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF); } void writeMessage(Message msg, long currentTimeMillis) throws IOException { logmon.log(BasicLevel.DEBUG, getName() + ", sends " + msg); int idx = msg.writeToBuf(buf, 4); // Be careful, notification attribute are not written if there // is no notification. count = Message.LENGTH +4 -1; try { if (msg.not != null) { // Writes notification attributes buf[idx++] = (byte) ((msg.not.persistent?Message.PERSISTENT:0) | (msg.not.detachable?Message.DETACHABLE:0)); // Be careful, the stream header is hard-written in buf count = Message.LENGTH +8; if (msg.not.expiration > 0) msg.not.expiration -= currentTimeMillis; oos.writeObject(msg.not); oos.reset(); oos.flush(); } // Writes length at beginning buf[0] = (byte) (count >>> 24); buf[1] = (byte) (count >>> 16); buf[2] = (byte) (count >>> 8); buf[3] = (byte) (count >>> 0); os.write(buf, 0, count);; os.flush(); } finally { if ((msg.not != null) && (msg.not.expiration > 0)) msg.not.expiration += currentTimeMillis; count = 0; } } } } final class WakeOnConnection extends Daemon { ServerSocket listen = null; WakeOnConnection(String name, Logger logmon) throws IOException { super(name + ".wakeOnConnection"); // creates a server socket listening on configured port listen = createServerSocket(); // Overload logmon definition in Daemon this.logmon = logmon; } protected void close() { try { listen.close(); } catch (Exception exc) {} listen = null; } protected void shutdown() { close(); } /** * */ public void run() { /** Connected socket. */ Socket sock = null; try { while (running) { try { canStop = true; // Get the connection try { if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", waiting connection"); sock = listen.accept(); } catch (IOException exc) { if (running) this.logmon.log(BasicLevel.ERROR, this.getName() + ", error during waiting connection", exc); continue; } canStop = false; setSocketOption(sock); Boot boot = readBoot(sock.getInputStream()); if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", connection setup from #" + boot.sid); getSession(boot.sid).start(sock, boot.boot); } catch (Exception exc) { this.logmon.log(BasicLevel.ERROR, this.getName() + ", bad connection setup", exc); } } } finally { finish(); } } } final class Dispatcher extends Daemon { Dispatcher(String name, Logger logmon) { super(name + ".dispatcher"); // Overload logmon definition in Daemon this.logmon = logmon; } protected void close() {} protected void shutdown() {} public void run() { Message msg = null; try { while (running) { canStop = true; if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", waiting message"); try { msg = qout.get(); } catch (InterruptedException exc) { continue; } canStop = false; if (! running) break; // Send the message getSession(msg.getDest()).send(msg); qout.pop(); } } finally { finish(); } } } final class WatchDog extends Daemon { /** Use to synchronize thread */ private Object lock; WatchDog(String name, Logger logmon) { super(name + ".watchdog"); lock = new Object(); // Overload logmon definition in Daemon this.logmon = logmon; } protected void close() {} protected void shutdown() { wakeup(); } void wakeup() { synchronized (lock) { lock.notify(); } } public void run() { try { synchronized (lock) { while (running) { try { lock.wait(WDActivationPeriod); if (this.logmon.isLoggable(BasicLevel.DEBUG)) this.logmon.log(BasicLevel.DEBUG, this.getName() + ", activated"); } catch (InterruptedException exc) { continue; } if (! running) break; for (int sid=0; sid<sessions.length; sid++) { if ((sessions[sid] != null) && (sessions[sid].sendList.size() > 0) && (! sessions[sid].running)) { sessions[sid].start(); } } } } } finally { finish(); } } } final void writeBoot(OutputStream out) throws IOException { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", writeBoot: " + getBootTS()); byte[] iobuf = new byte[6]; iobuf[0] = (byte) (AgentServer.getServerId() >>> 8); iobuf[1] = (byte) (AgentServer.getServerId() >>> 0); iobuf[2] = (byte) (getBootTS() >>> 24); iobuf[3] = (byte) (getBootTS() >>> 16); iobuf[4] = (byte) (getBootTS() >>> 8); iobuf[5] = (byte) (getBootTS() >>> 0); out.write(iobuf); out.flush(); } final class Boot { transient short sid; transient int boot; } final void readFully(InputStream is, byte[] iobuf) throws IOException { int n = 0; do { int count = is.read(iobuf, n, iobuf.length - n); if (count < 0) throw new EOFException(); n += count; } while (n < iobuf.length); } final Boot readBoot(InputStream in) throws IOException { Boot boot = new Boot(); byte[] iobuf = new byte[6]; readFully(in, iobuf); boot.sid = (short) (((iobuf[0] & 0xFF) << 8) + (iobuf[1] & 0xFF)); boot.boot = ((iobuf[2] & 0xFF) << 24) + ((iobuf[3] & 0xFF) << 16) + ((iobuf[4] & 0xFF) << 8) + ((iobuf[5] & 0xFF) << 0); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", readBoot from #" + boot.sid + " -> " + boot.boot); return boot; } final void writeAck(OutputStream out) throws IOException { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", writeAck: " + getBootTS()); byte[] iobuf = new byte[4]; iobuf[0] = (byte) (getBootTS() >>> 24); iobuf[1] = (byte) (getBootTS() >>> 16); iobuf[2] = (byte) (getBootTS() >>> 8); iobuf[3] = (byte) (getBootTS() >>> 0); out.write(iobuf); out.flush(); } final int readAck(InputStream in)throws IOException { byte[] iobuf = new byte[4]; readFully(in, iobuf); int boot = ((iobuf[0] & 0xFF) << 24) + ((iobuf[1] & 0xFF) << 16) + ((iobuf[2] & 0xFF) << 8) + ((iobuf[3] & 0xFF) << 0); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", readAck:" + boot); return boot; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -