⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nonblockingreadingmode.java

📁 基于Jabber协议的即时消息服务器
💻 JAVA
字号:
/** * $RCSfile$ * $Revision: $ * $Date: $ * * Copyright (C) 2006 Jive Software. All rights reserved. * * This software is published under the terms of the GNU Public License (GPL), * a copy of which is included in this distribution. */package org.jivesoftware.wildfire.net;import com.jcraft.jzlib.JZlib;import com.jcraft.jzlib.ZInputStream;import org.dom4j.Element;import org.jivesoftware.util.LocaleUtils;import org.jivesoftware.util.Log;import org.jivesoftware.wildfire.SessionManager;import org.xmlpull.v1.XmlPullParser;import org.xmlpull.v1.XmlPullParserException;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.StringReader;import java.net.Socket;import java.net.SocketException;import java.nio.channels.SocketChannel;/** * Process incoming packets using a non-blocking model. * * @author Daniele Piras */class NonBlockingReadingMode extends SocketReadingMode {    // DANIELE: Socket read timeout in milliseconds    private static int READ_TIMEOUT = 0;    private static String STREAM_START = "<stream:stream";    // DANIELE: Semaphore to avoid concurrent reading operation from different thread    private boolean isReading;    // DANIELE: lightweight xml parser.    private XMLLightweightParser xmlLightWeightParser;    // DANIELE: Channel for socket connection    private SocketChannel socketChannel;    // DANIELE: Indicate if the reading operations has been scheduled into the executor.    // this is very important because if all reading thread are busy is used to avoid    // to reinsert into the queue the reading operation.    private boolean isScheduled = false;    // DANIELE: Indicate if a session is already created    private boolean sessionCreated = false;    // DANIELE: Indicate if a stream:stream is arrived to complete a sals authentication    private boolean awaytingSasl = false;    // DANIELE: Indicate if a stream:stream is arrived to complete compression    private boolean awaitingForCompleteCompression = false;    private StreamReader streamReader;    public NonBlockingReadingMode(Socket socket, SocketReader socketReader) {        super(socket, socketReader);        // DANIELE: Initialization        // Setting timeout for reading operations.        try {            socket.setSoTimeout(READ_TIMEOUT);        }        catch (SocketException e) {            // There is an exception...            Log.warn(e);        }        socketChannel = socket.getChannel();        // Initialize XML light weight parser        xmlLightWeightParser = new XMLLightweightParser(socketChannel, CHARSET);        isReading = false;        socketReader.open = true;        streamReader = new StreamReader();    }    /* DANIELE:     * Method that verify if the client has data in the channel and in this case     * call an executor to perform reading operations.     */    void run() {        try {            // Check if the socket is open            if (socketReader.open) {                // Verify semaphore and if there are data into the socket.                if (!isReading && !isScheduled) {                    try {                        // Semaphore to avoid concurrent schedule of the same read operation.                        isScheduled = true;                        // Schedule execution with executor                        IOExecutor.execute(streamReader);                    }                    catch (Exception e) {                        if (socketReader.session != null) {                            Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") +                                    ". Session: " +                                    socketReader.session, e);                        }                    }                }            }        }        catch (Exception e) {            socketReader.shutdown();            // There is an exception...            Log.error(e);        }        if (!socketReader.open) {            socketReader.shutdown();        }    }    protected void tlsNegotiated() throws XmlPullParserException, IOException {        XmlPullParser xpp = socketReader.reader.getXPPParser();        InputStream is = socketReader.connection.getTLSStreamHandler().getInputStream();        xpp.setInput(new InputStreamReader(is, CHARSET));        xmlLightWeightParser.setInput( is, CHARSET );        super.tlsNegotiated();    }    protected boolean compressClient(Element doc) throws IOException, XmlPullParserException {        boolean answer = super.compressClient(doc);        if (answer) {            XmlPullParser xpp = socketReader.reader.getXPPParser();            // Reset the parser since a new stream header has been sent from the client            if (socketReader.connection.getTLSStreamHandler() == null) {                InputStream is;                if (socketChannel != null) {                    // DANIELE: Create an inputstream using the utility class ChannelInputStream.                    is = new ChannelInputStream(socketChannel);                }                else {                    is = socket.getInputStream();                }                is = ServerTrafficCounter.wrapInputStream(is);                ZInputStream in = new ZInputStream(is);                in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);                xpp.setInput(new InputStreamReader(in, CHARSET));            }            else {                ZInputStream in = new ZInputStream(                        socketReader.connection.getTLSStreamHandler().getInputStream());                in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);                xpp.setInput(new InputStreamReader(in, CHARSET));                xmlLightWeightParser.setInput( in, CHARSET );            }        }        return answer;    }    class StreamReader implements Runnable {        /*         * This method is invoked when client send data to the channel.         *         */        public void run() {            try {                // If no other reading operations are perform                if (!isReading) {                    // Change the semaphore status                    isReading = true;                    // Call the XML light-wieght parser to read data...                    xmlLightWeightParser.read();                    // Check if the parser has found a complete message...                    if (xmlLightWeightParser.areThereMsgs()) {                        // Process every message found                        String[] msgs = xmlLightWeightParser.getMsgs();                        for (int i = 0; i < msgs.length; i++) {                            //System.out.println( "Processing " + msgs[ i ] );                            readStream(msgs[i]);                        }                    }                }            }            catch (IOException e) {                if (socketReader.session != null) {                    // DANIELE: Remove session from SessionManager. I don't know if                    // this is the easy way.                    // TODO Review this. Closing the connection should be used???                    SessionManager.getInstance().removeSession(                            SessionManager.getInstance().getSession(                                    socketReader.session.getAddress()));                }                try {                    xmlLightWeightParser.getChannel().close();                }                catch (IOException e1) {                }                // System.out.println( "Client disconnecting" );            }            catch (Exception e) {                if (socketReader.session != null) {                    Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") + ". Session: " +                            socketReader.session, e);                }                e.printStackTrace();            }            finally {                isReading = false;                isScheduled = false;            }        }        /**         * Process a single message         */        private void readStream(String msg) throws Exception {            if (msg.trim().startsWith(STREAM_START)) {                // Found an stream:stream tag...                if (!sessionCreated) {                    sessionCreated = true;                    socketReader.reader.getXPPParser().setInput(new StringReader(                            msg + ((msg.indexOf("</stream:stream") == -1) ? "</stream:stream>" :                                    "")));                    socketReader.createSession();                }                else if (awaytingSasl) {                    awaytingSasl = false;                    saslSuccessful();                }                else if (awaitingForCompleteCompression) {                    awaitingForCompleteCompression = false;                    compressionSuccessful();                }                return;            }            // Create dom in base on the string.            Element doc = socketReader.reader.parseDocument(msg).getRootElement();            if (doc == null) {                // No document found.                return;            }            String tag = doc.getName();            if ("starttls".equals(tag)) {                // Negotiate TLS                if (negotiateTLS()) {                    tlsNegotiated();                }                else {                    socketReader.open = false;                    socketReader.session = null;                }            }            else if ("auth".equals(tag)) {                // User is trying to authenticate using SASL                if (authenticateClient(doc)) {                    // SASL authentication was successful so open a new stream and offer                    // resource binding and session establishment (to client sessions only)                    awaytingSasl = true;                }                else if (socketReader.connection.isClosed()) {                    socketReader.open = false;                    socketReader.session = null;                }            }            else if ("compress".equals(tag))            {                // Client is trying to initiate compression                if (compressClient(doc)) {                    // Compression was successful so open a new stream and offer                    // resource binding and session establishment (to client sessions only)                    awaitingForCompleteCompression = true;                }            }            else {                socketReader.process(doc);            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -