⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nonblockingstreamreader.java

📁 spam source codejasen-0.9jASEN - java Anti Spam ENgine.zip 如标题所示
💻 JAVA
字号:
/*
 * @(#)NonBlockingStreamReader.java	31/10/2004
 *
 * Copyright (c) 2004, 2005  jASEN.org
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   1. Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above copyright
 *      notice, this list of conditions and the following disclaimer in
 *      the documentation and/or other materials provided with the distribution.
 *
 *   3. The names of the authors may not be used to endorse or promote products
 *      derived from this software without specific prior written permission.
 *
 *   4. Any modification or additions to the software must be contributed back
 *      to the project.
 *
 *   5. Any investigation or reverse engineering of source code or binary to
 *      enable emails to bypass the filters, and hence inflict spam and or viruses
 *      onto users who use or do not use jASEN could subject the perpetrator to
 *      criminal and or civil liability.
 *
 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JASEN.ORG,
 * OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
package org.jasen.io;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * <P>
 * 	Implements a threaded input stream reader which can be terminated if it blocks for too long.
 * </P>
 * @author Jason Polites
 */
public class NonBlockingStreamReader
{
    private StreamReaderListener streamReaderListener;
    
    /**
     *
     */
    public NonBlockingStreamReader() {
        super ();
    }
    
    public NonBlockingStreamReader(StreamReaderListener streamReaderListener) {
        super ();
        this.streamReaderListener = streamReaderListener;
    }    

    /**
     * Reads from the specified stream with the give buffer size and writes to the specified output stream.
     * <P>
     * If the reader blocks on a read() call for more than timeout milliseconds, the read process is abnormally aborted and an IOException is thrown
     * </P>
     * @param in The inputstream from which to read
     * @param out The outputstream to which to write
     * @param bufferSize The size of the buffer (in bytes) to use when reading/writing
     * @param timeout The timeout (in milliseconds) to allow the read to block until bytes are available from the stream
     * @throws IOException If an error occurred while reading the stream, or the read timed out
     */
    public void read(InputStream in, OutputStream out, int bufferSize, long timeout, byte[] terminator) throws IOException {

        if(bufferSize > 1 && terminator != null) {
            throw new NonBlockingIOException("Cannot specify line terminator with > 1 length buffer", NonBlockingIOException.INVALID_BUFFER);
        }

        // Create a new thread reader
        StreamReader thread = new StreamReader (in, out, bufferSize, terminator);
        thread.start ();

        long time = 0L;

        while(thread.running) {
            time = System.currentTimeMillis();

            synchronized (thread) {
                try
                {
                    thread.wait(timeout);

                    if((System.currentTimeMillis() - time) >= timeout) {
                        if(thread.isAlive()) {
                            thread.interrupt();
                            thread.running = false;
                            thread = null;
                        }

                        throw new NonBlockingIOException("Timeout waiting for bytes", NonBlockingIOException.TIMEOUT);
                    }
                    else if (thread.error != null) {
                        throw thread.error;
                    }
                }
                catch (InterruptedException ignore) {}
            }
        }
    }

    protected class StreamReader extends Thread
    {

        private InputStream in;
        private OutputStream out;
        private int bufferSize;
        private byte[] terminator;

        IOException error;
        boolean running = true;

        private StreamReader(InputStream in, OutputStream out, int bufferSize, byte[] terminator) {
            this.in = in;
            this.out = out;
            this.bufferSize = bufferSize;
            this.terminator = terminator;
        }

        public void run() {

            try
            {
                running = true;
                byte[] buffer = new byte[bufferSize];

        		int count;
        		int terminatorIndex = 1;
        		boolean terminated = false;
        		boolean started = false;
        		long totalBytes = 0L;

        		while ((count = in.read(buffer, 0, buffer.length)) != -1 && running) {
                    // Notify stopped reading
                    synchronized (this)
                    {
                        notifyAll ();
                    }

                    //System.out.println ("Received " + new String(buffer));

                    // Check the terminator
                    if(terminator != null) {

                        if(!started) {
                            if (buffer[0] != '\r' && buffer[0] != '\n') {
                                started = true;
                            }
                        }

                        if(buffer[0] == terminator[terminatorIndex-1]) {

                            //System.out.println ("Incrementing terminatorIndex (" + terminatorIndex + ")");
                            terminatorIndex++;

                            if(terminatorIndex == terminator.length) {

                               // System.out.println ("Matched terminator.. breaking");
                                terminated = true;
                            }
                        }
                        else if(terminatorIndex == terminator.length) {

                            //System.out.println ("Matched terminator.. breaking");
                            terminated = true;
                        }
                        else {
                            terminatorIndex = 1;
                        }
                    }
                    else
                    {
                        started = true;
                    }

                    if(!terminated && started) {
                        out.write(buffer, 0, count);
                        
                        totalBytes += count;
                        
                        if(streamReaderListener != null) {
                            streamReaderListener.notifyBytesRead(totalBytes);
                        }
                    }
                    else
                    {
                        break;
                    }
        		}

        		out.flush();

            }
            catch (IOException e)
            {
                error = e;
            }
            finally {

                running = false;

                //  Notify exited
                synchronized (this)
                {
                    notifyAll ();
                }
            }
        }
    }


}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -