📄 poolnetwork.java
字号:
try { sock.close(); } catch (Exception exc2) {} // Reset the local attribute to allow future attempts. this.local = false; nis = null; nos = null; return false; } // Normally, only one thread can reach this code (*1), so we don't have // to synchronized theses statements. First sets sock attribute, then // releases the local lock. // (*1) all local attempts are blocked and the remote side has already // setup the connection (ACK reply). this.sock = sock; this.local = false; return true; } /** * Its method is called by <a href="start(java.net.Socket, * java.io.ObjectInputStream, java.io.ObjectOutputStream">start</a> * in order to reply to a connection request from a remote server. * The corresponding code on remote server is the method * <a href="#localStart()">localStart</a>. * * @param sock the connected socket * @param ois the input stream * @param oos the output stream * * @return true if the connection is established, false otherwise. */ synchronized boolean remoteStart(Socket sock, int boot) { try { if ((this.sock != null) || (this.local && server.sid > AgentServer.getServerId())) // The connection is already established, or a "local" connection // is in progress from this server with a greater priority. // In all cases, stops this "remote" attempt. If the "local" // attempt has a lower priority, it will fail due to a remote // reject. throw new ConnectException("Already connected"); // Accept this connection. if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", send AckStatus"); writeAck(sock.getOutputStream()); AgentServer.getTransaction().begin(); testBootTS(sid, boot); AgentServer.getTransaction().commit(); AgentServer.getTransaction().release(); nis = new MessageInputStream(sock.getInputStream()); nos = new MessageOutputStream(sock.getOutputStream()); // Fixing sock attribute will prevent any future attempt this.sock = sock; return true; } catch (Exception exc) { // May be a a connection is in progress, try it later... if (logmon.isLoggable(BasicLevel.WARN)) logmon.log(BasicLevel.WARN, getName() + ", connection refused", exc); // Close the connection (# NACK). try { sock.getOutputStream().close(); } catch (Exception exc2) {} try { sock.getInputStream().close(); } catch (Exception exc2) {} try { sock.close(); } catch (Exception exc2) {} nis = null; nos = null; } return false; } /** * The session is well initialized, we can start the server thread that * "listen" the connected socket. If the maximum number of connections * is reached, one connection from the pool is closed. */ private void startEnd() { server.active = true; server.retry = 0; synchronized(activeSessions) { if (nbActiveCnx < nbMaxCnx) { // Insert the current session in the active pool. activeSessions[nbActiveCnx++] = this; } else { // Search the last recently used session in the pool. long min = Long.MAX_VALUE; int idx = -1; for (int i=0; i<nbMaxCnx; i++) { if (activeSessions[i].last < min) { idx = i; min = activeSessions[i].last; } } // Kill choosed session and insert new one activeSessions[idx].stop(); activeSessions[idx] = this; } last = current++; } thread = new Thread(this, getName()); thread.setDaemon(false); running = true; canStop = true; thread.start(); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", connection started"); // Try to send all waiting messages. As this.sock is no longer null // so we must do a copy a waiting messages. New messages will be send // directly in send method. // Be careful, in a very limit case a message can be sent 2 times: // added in sendList after sock setting and before array copy, il will // be transmit in send method and below. However there is no problem, // the copy will be discarded on remote node and 2 ack messages will // be received on local node. Object[] waiting = sendList.toArray(); logmon.log(BasicLevel.DEBUG, getName() + ", send " + waiting.length + " waiting messages"); Message msg = null; long currentTimeMillis = System.currentTimeMillis(); for (int i=0; i<waiting.length; i++) { msg = (Message) waiting[i]; if ((msg.not != null) && (msg.not.expiration > 0) && (msg.not.expiration < currentTimeMillis)) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ": removes expired notification " + msg.from + ", " + msg.not); try { doAck(msg.getStamp()); } catch (IOException exc) { logmon.log(BasicLevel.ERROR, getName() + ": cannot removes expired notification " + msg.from + ", " + msg.not, exc); } } else { transmit(msg, currentTimeMillis); } } } /** * */ void stop() { running = false; if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", stopped."); while ((thread != null) && thread.isAlive()) { if (canStop) { if (thread.isAlive()) thread.interrupt(); shutdown(); } try { thread.join(1000L); } catch (InterruptedException exc) { continue; } thread = null; } } public void shutdown() { close(); } synchronized void close() { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", closed."); try { sock.getInputStream().close(); } catch (Exception exc) {} try { sock.getOutputStream().close(); } catch (Exception exc) {} try { sock.close(); } catch (Exception exc) {} sock = null; nis = null; nos = null; } /** * Removes all messages in sendList previous to the ack'ed one. * Be careful, messages in sendList are not always in stamp order. * Its method should not be synchronized, it scans the list from * begin to end, and it removes always the first element. Other * methods using sendList just adds element at the end. */ final private void doAck(int ack) throws IOException { Message msg = null; if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", ack received #" + ack); try { // Suppress the acknowledged notification from waiting list, // and deletes it. msg = sendList.removeMessage(ack); AgentServer.getTransaction().begin(); msg.delete(); msg.free(); AgentServer.getTransaction().commit(); AgentServer.getTransaction().release(); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", remove msg#" + msg.getStamp()); } catch (NoSuchElementException exc) { logmon.log(BasicLevel.WARN, getName() + ", can't ack, unknown msg#" + ack); } } /** * Be careful, its method should not be synchronized (in that case, the * overall synchronization of the connection -method start- can dead-lock). */ final void send(Message msg) { if (logmon.isLoggable(BasicLevel.DEBUG)) { if (msg.not != null) { logmon.log(BasicLevel.DEBUG, getName() + ", send msg#" + msg.getStamp()); } else { logmon.log(BasicLevel.DEBUG, getName() + ", send ack#" + msg.getStamp()); } } long currentTimeMillis = System.currentTimeMillis(); if (msg.not != null) { sendList.addElement(msg); if ((msg.not.expiration > 0) && (msg.not.expiration < currentTimeMillis)) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ": removes expired notification " + msg.from + ", " + msg.not); try { doAck(msg.getStamp()); } catch (IOException exc) { logmon.log(BasicLevel.ERROR, getName() + ": cannot removes expired notification " + msg.from + ", " + msg.not, exc); } return; } } if (sock == null) { // If there is no connection between local and destination server, // try to make one! start(); } else { transmit(msg, currentTimeMillis); } } final private void ack(int stamp) throws Exception { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", set ack msg#" + stamp); Message ack = Message.alloc(AgentId.localId, AgentId.localId(server.sid), null); ack.source = AgentServer.getServerId(); ack.dest = AgentServer.getServerDesc(server.sid).gateway; ack.stamp = stamp; qout.push(ack); } final private synchronized void transmit(Message msg, long currentTimeMillis) { last = current++; try { nos.writeMessage(msg, currentTimeMillis); } catch (IOException exc) { logmon.log(BasicLevel.ERROR, getName() + ", exception in sending message", exc); close(); } catch (NullPointerException exc) { // The stream is closed, exits ! } } public void run() { Message msg; try { while (running) { canStop = true; if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", waiting message"); try { msg = nis.readMessage(); } catch (ClassNotFoundException exc) { // TODO: In order to process it we have to return an error, // but in that case me must identify the bad message... logmon.log(BasicLevel.ERROR, getName() + ", error during waiting message", exc); continue; } catch (InvalidClassException exc) { // TODO: In order to process it we have to return an error, // but in that case me must identify the bad message.. logmon.log(BasicLevel.ERROR, getName() + ", error during waiting message", exc); continue; } catch (StreamCorruptedException exc) { logmon.log(BasicLevel.ERROR, getName() + ", error during waiting message", exc); break; } catch (OptionalDataException exc) { logmon.log(BasicLevel.ERROR, getName() + ", error during waiting message", exc); break; } catch (NullPointerException exc) { // The stream is closed, exits ! break; } canStop = false; if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", receives: " + msg); // Keep message stamp in order to acknowledge it (be careful, // the message get a new stamp to be delivered). int stamp = msg.getStamp(); if (msg.not != null) { deliver(msg); ack(stamp); } else { doAck(stamp); } } } catch (EOFException exc) { if (running) logmon.log(BasicLevel.WARN, this.getName() + ", connection closed", exc);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -