📄 tlsmanager.java
字号:
} while (true); } /** * Handle an incoming message from the endpoint. This method demultiplexes * incoming messages to the connection objects by their source address. * * <p/>Several types of messages may be received for a connection: * * <ul> * <li>TLS Elements</li> * <li>Element Acknowledgements</li> * </ul> * * @param msg is the incoming message * @param srcAddr is the address of the source of the message * @param dstAddr is the address of the destination of the message **/ public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Starts for " + msg); } if (null == transport.credential) { // ignore ALL messages until we are authenticated. if (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), lastNonAuthenticatedWarning) > TimeUtils.AMINUTE) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("NOT AUTHENTICATED--Discarding all incoming messages"); } lastNonAuthenticatedWarning = TimeUtils.timeNow(); } return; } // determine if its a retry. MessageElement retryElement = msg.getMessageElement(JTlsDefs.TLSNameSpace, JTlsDefs.RETR); boolean retrans = (null != retryElement); if (retrans) { msg.removeMessageElement(retryElement); retryElement = null; } int seqN = getMsgSequenceNumber(msg); // Extract unique part of source address String paddr = srcAddr.getProtocolAddress(); TlsConn conn = null; boolean serverStart = false; synchronized (connections) { // Will be in our hash table unless this is for a first time // incoming connection request conn = (TlsConn) connections.get(paddr); if (null != conn) { // check if the connection has idled out and remote is asking for a restart. if (TlsTransport.ACT_AS_SERVER && (1 == seqN)) { synchronized (conn) { long idle = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), conn.lastAccessed); if (idle > transport.MIN_IDLE_RECONNECT) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Restarting : " + conn + " which has been idle for " + idle + " millis"); } try { conn.close(HandshakeState.CONNECTIONDEAD); } catch (IOException ignored) { ; } } } } // remove it if it is dead if ((HandshakeState.CONNECTIONDEAD == conn.getHandshakeState()) || (HandshakeState.HANDSHAKEFAILED == conn.getHandshakeState())) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Removing connection for: " + paddr); } connections.remove(paddr); conn = null; } } // we don't have a connection to this destination, make a new connection if seqn#1 if (null == conn) { if (TlsTransport.ACT_AS_SERVER && (1 == seqN)) { try { conn = new TlsConn(transport, srcAddr, false); // false means Server } catch (Exception failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed making connection for" + paddr, failed); } return; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Adding connection for: " + paddr); } connections.put(paddr, conn); serverStart = true; } else { // Garbage from an old connection. discard it if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning(msg + " is not start of handshake (seqn#" + seqN + ") for " + paddr); } msg.clear(); return; } } } // if this is a new connection, get it started. if (serverStart) { try { if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Start of SERVER handshake for " + paddr); } // Queue message up for TlsInputStream on that connection conn.tlsSocket.input.queueIncomingMessage(msg); // Start the TLS Server and complete the handshake conn.finishHandshake(); // open the TLS connection conn.lastAccessed = TimeUtils.timeNow(); if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Handshake complete for SERVER TLS for: " + paddr); } return; } catch (Throwable e) { // Handshake failure or IOException if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "TLS Handshake failure for connection: " + paddr, e); } synchronized (connections) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Removing connection for: " + paddr); } connections.remove(paddr); } try { conn.close(HandshakeState.HANDSHAKEFAILED); } catch (IOException ignored) { ; } return; } } // handle an ongoing connection. do { HandshakeState currentState; synchronized (conn) { if (retrans) { conn.retrans++; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("retrans received, " + conn.retrans + " total."); } retrans = false; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Process incoming message for " + conn); } currentState = conn.getHandshakeState(); if ((HandshakeState.HANDSHAKESTARTED == currentState) || (HandshakeState.HANDSHAKEFINISHED == currentState) || (HandshakeState.CONNECTIONCLOSING == currentState)) {// we will process the message once we get out of sync. } else if (HandshakeState.CONNECTIONDEAD == currentState) { // wait for the handshake to get going on another thread. if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Connection failed, discarding msg with seqn#" + seqN + " for " + paddr); } return; } else if ((HandshakeState.SERVERSTART == currentState) || (HandshakeState.CLIENTSTART == currentState)) { // wait for the handshake to get going on another thread. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sleeping msg with seqn#" + seqN + " until handshake starts for " + paddr); } try { conn.wait(TimeUtils.AMINUTE); } catch (InterruptedException woken) { Thread.interrupted(); } continue; } else if (HandshakeState.HANDSHAKEFAILED == currentState) { // wait for the handshake to get going on another thread. if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Handshake failed, discarding msg with seqn#" + seqN + " for " + paddr); } return; } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Unexpected state : " + currentState); } } } // Process any message outside of the sync on the connection. if ((HandshakeState.HANDSHAKESTARTED == currentState) || (HandshakeState.HANDSHAKEFINISHED == currentState) || (HandshakeState.CONNECTIONCLOSING == currentState)) { // process any ACK messages. Iterator eachACK = msg.getMessageElements(JTlsDefs.TLSNameSpace, JTlsDefs.ACKS); while (eachACK.hasNext()) { MessageElement elt = (MessageElement) eachACK.next(); eachACK.remove(); int sackCount = ((int) elt.getByteLength() / 4) - 1; try { DataInputStream dis = new DataInputStream(elt.getStream()); int seqack = dis.readInt(); int[] sacs = new int[sackCount]; for (int eachSac = 0; eachSac < sackCount; eachSac++) { sacs[eachSac] = dis.readInt(); } Arrays.sort(sacs); // take care of the ACK here; conn.tlsSocket.output.ackReceived(seqack, sacs); } catch (IOException failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failure processing ACK", failed); } } } if (0 == seqN) { return; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Queue " + msg + " seqn#" + seqN + " for " + conn); } // Queue message up for TlsInputStream on that connection TlsSocket bound = conn.tlsSocket; if (null != bound) { bound.input.queueIncomingMessage(msg); } return; } } while (true); } /** * getMsgSequenceNumber * * @param msg Input message * @return int sequence number or 0 (zero) if no tls records in message. **/ private static int getMsgSequenceNumber(Message msg) { int seqN = 0; Iterator eachElement = msg.getMessageElements(JTlsDefs.TLSNameSpace, JTlsDefs.BLOCKS); while (eachElement.hasNext()) { MessageElement elt = (MessageElement) eachElement.next(); try { seqN = Integer.parseInt(elt.getElementName()); } catch (NumberFormatException e) { // This element was not a TLS element. Get the next one if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Bad tls record name=" + elt.getElementName()); } eachElement.remove(); continue; } break; } return seqN; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -