⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tlsmanager.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        } while (true);    }        /**     * Handle an incoming message from the endpoint. This method demultiplexes     * incoming messages to the connection objects by their source address.     *     * <p/>Several types of messages may be received for a connection:     *     * <ul>     * <li>TLS Elements</li>     *  <li>Element Acknowledgements</li>     * </ul>     *     * @param msg is the incoming message     * @param srcAddr is the address of the source of the message     * @param dstAddr is the address of the destination of the message     **/    public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Starts for " + msg);        }                if (null == transport.credential) {            // ignore ALL messages until we are authenticated.            if (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), lastNonAuthenticatedWarning) > TimeUtils.AMINUTE) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.warning("NOT AUTHENTICATED--Discarding all incoming messages");                }                                lastNonAuthenticatedWarning = TimeUtils.timeNow();            }                        return;        }                // determine if its a retry.        
MessageElement retryElement = msg.getMessageElement(JTlsDefs.TLSNameSpace, JTlsDefs.RETR);        boolean retrans = (null != retryElement);                if (retrans) {            msg.removeMessageElement(retryElement);            retryElement = null;        }                int seqN = getMsgSequenceNumber(msg);                // Extract unique part of source address        String paddr = srcAddr.getProtocolAddress();                TlsConn conn = null;                boolean serverStart = false;                synchronized (connections) {            // Will be in our hash table unless this is for a first time            // incoming connection request            conn = (TlsConn) connections.get(paddr);                        if (null != conn) {                // check if the connection has idled out and remote is asking for a restart.                if (TlsTransport.ACT_AS_SERVER && (1 == seqN)) {                    synchronized (conn) {                        long idle = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), conn.lastAccessed);                                                if (idle > transport.MIN_IDLE_RECONNECT) {                            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                                LOG.warning("Restarting : " + conn + " which has been idle for " + idle + " millis");                            }                            try {                                conn.close(HandshakeState.CONNECTIONDEAD);                            } catch (IOException ignored) {                                ;                            }                        }                    }                }                                // remove it if it is dead                if ((HandshakeState.CONNECTIONDEAD == conn.getHandshakeState())                        || (HandshakeState.HANDSHAKEFAILED == conn.getHandshakeState())) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        
LOG.fine("Removing connection for: " + paddr);                    }                    connections.remove(paddr);                    conn = null;                }            }                        // we don't have a connection to this destination, make a new connection if seqn#1            if (null == conn) {                if (TlsTransport.ACT_AS_SERVER && (1 == seqN)) {                    try {                        conn = new TlsConn(transport, srcAddr, false); // false means Server                    } catch (Exception failed) {                        if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                            LOG.log(Level.WARNING, "Failed making connection for" + paddr, failed);                        }                        return;                    }                                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Adding connection for: " + paddr);                    }                    connections.put(paddr, conn);                    serverStart = true;                } else {                    // Garbage from an old connection. discard it                                        if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                        LOG.warning(msg + " is not start of handshake (seqn#" + seqN + ") for " + paddr);                    }                                        msg.clear();                    return;                }            }        }                // if this is a new connection, get it started.        
if (serverStart) {            try {                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                    LOG.info("Start of SERVER handshake for " + paddr);                }                                // Queue message up for TlsInputStream on that connection                conn.tlsSocket.input.queueIncomingMessage(msg);                                // Start the TLS Server and complete the handshake                conn.finishHandshake(); // open the TLS connection                                conn.lastAccessed = TimeUtils.timeNow();                                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                    LOG.info("Handshake complete for SERVER TLS for: " + paddr);                }                                return;            } catch (Throwable e) {                // Handshake failure or IOException                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.log(Level.WARNING, "TLS Handshake failure for connection: " + paddr, e);                }                                synchronized (connections) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Removing connection for: " + paddr);                    }                    connections.remove(paddr);                }                try {                    conn.close(HandshakeState.HANDSHAKEFAILED);                } catch (IOException ignored) {                    ;                }                                return;            }        }                // handle an ongoing connection.        
do {            HandshakeState currentState;                        synchronized (conn) {                if (retrans) {                    conn.retrans++;                                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("retrans received, " + conn.retrans + " total.");                    }                    retrans = false;                }                                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Process incoming message for " + conn);                }                                currentState = conn.getHandshakeState();                                if ((HandshakeState.HANDSHAKESTARTED == currentState) || (HandshakeState.HANDSHAKEFINISHED == currentState)                        || (HandshakeState.CONNECTIONCLOSING == currentState)) {// we will process the message once we get out of sync.                } else if (HandshakeState.CONNECTIONDEAD == currentState) {                    // wait for the handshake to get going on another thread.                    if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                        LOG.info("Connection failed, discarding msg with seqn#" + seqN + " for " + paddr);                    }                                        return;                } else if ((HandshakeState.SERVERSTART == currentState) || (HandshakeState.CLIENTSTART == currentState)) {                    // wait for the handshake to get going on another thread.                    
if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Sleeping msg with seqn#" + seqN + " until handshake starts for " + paddr);                    }                                        try {                        conn.wait(TimeUtils.AMINUTE);                    } catch (InterruptedException woken) {                        Thread.interrupted();                    }                    continue;                } else if (HandshakeState.HANDSHAKEFAILED == currentState) {                    // wait for the handshake to get going on another thread.                    if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                        LOG.info("Handshake failed, discarding msg with seqn#" + seqN + " for " + paddr);                    }                                        return;                } else {                    if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                        LOG.warning("Unexpected state : " + currentState);                    }                }            }                        // Process any message outside of the sync on the connection.            if ((HandshakeState.HANDSHAKESTARTED == currentState) || (HandshakeState.HANDSHAKEFINISHED == currentState)                    || (HandshakeState.CONNECTIONCLOSING == currentState)) {                // process any ACK messages.                
Iterator eachACK = msg.getMessageElements(JTlsDefs.TLSNameSpace, JTlsDefs.ACKS);                                while (eachACK.hasNext()) {                    MessageElement elt = (MessageElement) eachACK.next();                                        eachACK.remove();                                        int sackCount = ((int) elt.getByteLength() / 4) - 1;                                        try {                        DataInputStream dis = new DataInputStream(elt.getStream());                                                int seqack = dis.readInt();                                                int[] sacs = new int[sackCount];                                                for (int eachSac = 0; eachSac < sackCount; eachSac++) {                            sacs[eachSac] = dis.readInt();                        }                                                Arrays.sort(sacs);                                                // take care of the ACK here;                        conn.tlsSocket.output.ackReceived(seqack, sacs);                    } catch (IOException failed) {                        if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                            LOG.log(Level.WARNING, "Failure processing ACK", failed);                        }                    }                }                                if (0 == seqN) {                    return;                }                                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Queue " + msg + " seqn#" + seqN + " for " + conn);                }                                // Queue message up for TlsInputStream on that connection                TlsSocket bound = conn.tlsSocket;                                if (null != bound) {                    bound.input.queueIncomingMessage(msg);                }                                return;            }        } while (true);    }        /**     * getMsgSequenceNumber     *     * 
Returns the sequence number given by the first TLS record element whose
     * name parses as an integer. Elements whose names do not parse are
     * logged and removed through the iterator before the scan continues.
     *
     * @param msg  Input message
     * @return int sequence number or 0 (zero) if no tls records in message.
     **/
    private static int getMsgSequenceNumber(Message msg) {

        Iterator tlsRecords = msg.getMessageElements(JTlsDefs.TLSNameSpace, JTlsDefs.BLOCKS);

        while (tlsRecords.hasNext()) {
            MessageElement record = (MessageElement) tlsRecords.next();

            try {
                // The first element with a numeric name decides the result.
                return Integer.parseInt(record.getElementName());
            } catch (NumberFormatException notNumeric) {
                // This element was not a TLS element. Report it, drop it and
                // move on to the next one.
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.warning("Bad tls record name=" + record.getElementName());
                }

                tlsRecords.remove();
            }
        }

        // No element with a numeric name was found.
        return 0;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -