📄 ngnetwork.java
字号:
logmon.log(BasicLevel.ERROR, getName(), exc); // Handle error with channel and unregister try { cnx.close(); } catch (IOException exc2) { logmon.log(BasicLevel.ERROR, getName(), exc2); } } } } } catch (Throwable exc) { logmon.log(BasicLevel.FATAL, getName(), exc); } } } class CnxHandler { /** Destination server id */ private short sid; /** The handler's name. */ private String name = null; /** * True if a "local" connection is in progress, a local connection * is initiated from this server to the remote one (defined by the * {@link #server server} descriptor. * This attribute is used to synchronize local and remote attempts to * make connections. */ private boolean local = false; /** The description of the remote server handled by this network session */ private ServerDesc server; /** The communication socket channel */ SocketChannel channel = null; /** Date of last connection attempt */ long lasttry = 0L; /** Informations for output */ int nbwrite = 0; MessageOutputStream mos = null; ByteBuffer bufout = null; /** FIFO list of all messages to be sent */ MessageVector sendlist = null; /** Informations for input */ ByteBuffer bufin = null; MessageInputStream mis = null; CnxHandler(String name, short sid) throws IOException { this.sid = sid; this.name = name + ".cnxHandler#" + sid; if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", created"); mos = new MessageOutputStream(); bufin = ByteBuffer.allocateDirect(SO_BUFSIZE); mis = new MessageInputStream(); sendlist = new MessageVector(); } void init() throws IOException, UnknownServerException { server = AgentServer.getServerDesc(sid); if (sendlist.size() > 0) start(); } /** * Returns this session's name. * * @return this session's name. */ public final String getName() { return name; } void start() throws IOException { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", try to start"); long currentTimeMillis = System.currentTimeMillis(); if (server == null) // probalby the Consumer is initialized but not always started return; if (((server.retry < WDNbRetryLevel1) && ((server.last + WDRetryPeriod1) < currentTimeMillis)) || ((server.retry < WDNbRetryLevel2) && ((server.last + WDRetryPeriod2) < currentTimeMillis)) || ((server.last + WDRetryPeriod3) < currentTimeMillis)) { if (localStart()) { startEnd(); } else { server.last = currentTimeMillis; server.retry += 1; } } } /** * Its method is called by <a href="#start()">start</a> in order to * initiate a connection from the local server. The corresponding code * on remote server is the method <a href="#remoteStart()">remoteStart</a>. * Its method creates the socket, initiates the network connection, and * negociates with remote server.<p><hr> * Its method can be overidden in order to change the connection protocol * (introduces authentification by example, or uses SSL), but must respect * somes conditions:<ul> * <li>send a Boot object after the initialization of object streams (it * is waiting by the wakeOnConnection thread), * <li>wait for an acknowledge, * <li>set the sock, ois and oos attributes at the end if the connection * is correct. * </ul><p> * In order to overide the protocol, we have to implements its method, * with the remoteStart and the transmit methods. * * @return true if the connection is established, false otherwise. */ boolean localStart() { synchronized (this) { if ((this.channel != null) || this.local) { // The connection is already established, or a "local" connection // is in progress (remoteStart method is synchronized). // In all cases refuses the connection request. if (logmon.isLoggable(BasicLevel.WARN)) logmon.log(BasicLevel.WARN, getName() + ", connection refused"); return false; } // Set the local attribute in order to block all others local attempts. this.local = true; } SocketChannel channel = null; try { SocketAddress addr = new InetSocketAddress(server.getAddr(), server.getPort()); channel = SocketChannel.open(addr); channel.socket().setSendBufferSize(SO_BUFSIZE); channel.socket().setReceiveBufferSize(SO_BUFSIZE); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + " bufsize: " + channel.socket().getReceiveBufferSize() + ", " + channel.socket().getSendBufferSize()); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", writeBoot: " + getBootTS()); ByteBuffer buf = ByteBuffer.allocate(6); buf.putShort(AgentServer.getServerId()); buf.putInt(getBootTS()); buf.flip(); channel.write(buf);// ByteBuffer buf = ByteBuffer.allocate(6); buf.flip(); if (channel.read(buf) > 0) { // Reads the message length buf.flip(); int boot = buf.getInt(); AgentServer.getTransaction().begin(); testBootTS(sid, boot); AgentServer.getTransaction().commit(); AgentServer.getTransaction().release(); } else { throw new ConnectException("Can't get status"); } } catch (Exception exc) { if (logmon.isLoggable(BasicLevel.WARN)) logmon.log(BasicLevel.WARN, getName() + ", connection refused.", exc); // TODO: Try it later, may be a a connection is in progress... try { channel.close(); } catch (Exception exc2) {} // Reset the local attribute to allow future attempts. this.local = false; return false; } // Normally, only one thread can reach this code (*1), so we don't have // to synchronized theses statements. First sets sock attribute, then // releases the local lock. // (*1) all local attempts are blocked and the remote side has already // setup the connection (ACK reply). this.channel = channel; this.local = false; return true; } /** * Its method is called by <a href="start(java.net.Socket, * java.io.ObjectInputStream, java.io.ObjectOutputStream">start</a> * in order to reply to a connection request from a remote server. * The corresponding code on remote server is the method * <a href="#localStart()">localStart</a>. * * @param sock the connected socket * @param ois the input stream * @param oos the output stream * * @return true if the connection is established, false otherwise. */ synchronized boolean remoteStart(SocketChannel channel, int boot) { try { if ((this.channel != null) || (this.local && server.sid > AgentServer.getServerId())) { // The connection is already established, or // a "local" connection is in progress from this server with a // greater priority. // In all cases, stops this "remote" attempt. // If the "local" attempt has a lower priority, it will fail // due to a remote reject. throw new ConnectException("Already connected"); } // Accept this connection. if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", writeBoot: " + getBootTS()); ByteBuffer buf = ByteBuffer.allocate(4); buf.putInt(getBootTS()); buf.flip(); channel.write(buf); AgentServer.getTransaction().begin(); testBootTS(sid, boot); AgentServer.getTransaction().commit(); AgentServer.getTransaction().release(); // Fixing sock attribute will prevent any future attempt this.channel = channel; return true; } catch (Exception exc) { // May be a a connection is in progress, try it later... if (logmon.isLoggable(BasicLevel.WARN)) logmon.log(BasicLevel.WARN, getName() + ", connection refused", exc); // Close the connection (# NACK). try { channel.close(); } catch (Exception exc2) {} } return false; } /** * The session is well initialized, we can start the server thread that * "listen" the connected socket. If the maximum number of connections * is reached, one connection from the pool is closed. */ private void startEnd() throws IOException { server.active = true; server.retry = 0;// mos = new MessageOutputStream(); nbwrite = 0;// bufin = ByteBuffer.allocateDirect(SO_BUFSIZE); bufin.clear();// mis = new MessageInputStream(); // The returned channel is in blocking mode. channel.configureBlocking(false); // Register channels with selector channel.register(selector, channel.validOps(), this); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", connection started"); sendlist.reset(); } synchronized void send(Message msg) throws IOException { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", send message: " + msg); // Adds it to the list of message to be sent, it will be removed // after its ack receipt. sendlist.addMessage(msg); if ((channel != null) && (bufout == null)) { // As no message are actually sending the channel is only subscribe // for reading, subscribe this channel for write operation will permit // to transmit the new added message. // Be careful, as this change is only take in account for the next // select operation, we have to use wakeup on selector SelectionKey key = channel.keyFor(selector); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", send message, key=" + key); if (key != null) key.interestOps(channel.validOps()); } // In all case a selector.wakeup() will solve the problem !! if (selector == null) { logmon.log(BasicLevel.WARN, getName() + ", network not started."); } else { selector.wakeup(); } } /** * Class used to send messages through a stream. */ final class MessageOutputStream extends ByteArrayOutputStream { private ObjectOutputStream oos = null; MessageOutputStream() throws IOException { super(256); oos = new ObjectOutputStream(this); count = 0; buf[29] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF); buf[30] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF); buf[31] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF); buf[32] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF); } void writeMessage(Message msg) throws IOException { logmon.log(BasicLevel.DEBUG, getName() + ", writes " + msg); // Writes sender's AgentId buf[4] = (byte) (msg.from.from >>> 8); buf[5] = (byte) (msg.from.from >>> 0); buf[6] = (byte) (msg.from.to >>> 8); buf[7] = (byte) (msg.from.to >>> 0); buf[8] = (byte) (msg.from.stamp >>> 24); buf[9] = (byte) (msg.from.stamp >>> 16); buf[10] = (byte) (msg.from.stamp >>> 8); buf[11] = (byte) (msg.from.stamp >>> 0); // Writes adressee's AgentId buf[12] = (byte) (msg.to.from >>> 8); buf[13] = (byte) (msg.to.from >>> 0); buf[14] = (byte) (msg.to.to >>> 8); buf[15] = (byte) (msg.to.to >>> 0); buf[16] = (byte) (msg.to.stamp >>> 24); buf[17] = (byte) (msg.to.stamp >>> 16); buf[18] = (byte) (msg.to.stamp >>> 8); buf[19] = (byte) (msg.to.stamp >>> 0); // Writes source server id of message buf[20] = (byte) (msg.source >>> 8); buf[21] = (byte) (msg.source >>> 0); // Writes destination server id of message buf[22] = (byte) (msg.dest >>> 8); buf[23] = (byte) (msg.dest >>> 0); // Writes stamp of message buf[24] = (byte) (msg.stamp >>> 24); buf[25] = (byte) (msg.stamp >>> 16); buf[26] = (byte) (msg.stamp >>> 8); buf[27] = (byte) (msg.stamp >>> 0); count = 28; if (msg.not != null) { // Writes notification attributes buf[28] = (byte) ((msg.not.persistent?0x01:0) | (msg.not.detachable?0x10:0)); // Be careful, the stream header is hard-written in buf[29..32] count = 33; oos.writeObject(msg.not); oos.reset(); oos.flush(); } // Writes length at beginning buf[0] = (byte) (count >>> 24); buf[1] = (byte) (count >>> 16); buf[2] = (byte) (count >>> 8); buf[3] = (byte) (count >>> 0); logmon.log(BasicLevel.DEBUG, getName() + ", writes " + count); nbwrite = count; bufout = ByteBuffer.wrap(buf, 0, count); nbwrite -= channel.write(bufout); } }// void write(Message msg) throws IOException {// mos.writeMessage(msg);// } /** * Method called each time the channel is Writable */ private synchronized void write() throws IOException { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + " write-1"); // test if there is still bytes to write if ((bufout != null) && (nbwrite > 0)) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + " write-2"); nbwrite -= channel.write(bufout); } else { if (nbwrite == 0) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + " write-3");// if (bufout != null) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -