📄 nonblockingstreamreader.java
字号:
/*
* @(#)NonBlockingStreamReader.java 31/10/2004
*
* Copyright (c) 2004, 2005 jASEN.org
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the distribution.
*
* 3. The names of the authors may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* 4. Any modification or additions to the software must be contributed back
* to the project.
*
* 5. Any investigation or reverse engineering of source code or binary to
* enable emails to bypass the filters, and hence inflict spam and or viruses
* onto users who use or do not use jASEN could subject the perpetrator to
* criminal and or civil liability.
*
* THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JASEN.ORG,
* OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
* EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package org.jasen.io;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* <P>
* Implements a threaded input stream reader which can be terminated if it blocks for too long.
* </P>
* @author Jason Polites
*/
public class NonBlockingStreamReader
{
private StreamReaderListener streamReaderListener;
/**
*
*/
public NonBlockingStreamReader() {
super ();
}
public NonBlockingStreamReader(StreamReaderListener streamReaderListener) {
super ();
this.streamReaderListener = streamReaderListener;
}
/**
* Reads from the specified stream with the give buffer size and writes to the specified output stream.
* <P>
* If the reader blocks on a read() call for more than timeout milliseconds, the read process is abnormally aborted and an IOException is thrown
* </P>
* @param in The inputstream from which to read
* @param out The outputstream to which to write
* @param bufferSize The size of the buffer (in bytes) to use when reading/writing
* @param timeout The timeout (in milliseconds) to allow the read to block until bytes are available from the stream
* @throws IOException If an error occurred while reading the stream, or the read timed out
*/
public void read(InputStream in, OutputStream out, int bufferSize, long timeout, byte[] terminator) throws IOException {
if(bufferSize > 1 && terminator != null) {
throw new NonBlockingIOException("Cannot specify line terminator with > 1 length buffer", NonBlockingIOException.INVALID_BUFFER);
}
// Create a new thread reader
StreamReader thread = new StreamReader (in, out, bufferSize, terminator);
thread.start ();
long time = 0L;
while(thread.running) {
time = System.currentTimeMillis();
synchronized (thread) {
try
{
thread.wait(timeout);
if((System.currentTimeMillis() - time) >= timeout) {
if(thread.isAlive()) {
thread.interrupt();
thread.running = false;
thread = null;
}
throw new NonBlockingIOException("Timeout waiting for bytes", NonBlockingIOException.TIMEOUT);
}
else if (thread.error != null) {
throw thread.error;
}
}
catch (InterruptedException ignore) {}
}
}
}
protected class StreamReader extends Thread
{
private InputStream in;
private OutputStream out;
private int bufferSize;
private byte[] terminator;
IOException error;
boolean running = true;
private StreamReader(InputStream in, OutputStream out, int bufferSize, byte[] terminator) {
this.in = in;
this.out = out;
this.bufferSize = bufferSize;
this.terminator = terminator;
}
public void run() {
try
{
running = true;
byte[] buffer = new byte[bufferSize];
int count;
int terminatorIndex = 1;
boolean terminated = false;
boolean started = false;
long totalBytes = 0L;
while ((count = in.read(buffer, 0, buffer.length)) != -1 && running) {
// Notify stopped reading
synchronized (this)
{
notifyAll ();
}
//System.out.println ("Received " + new String(buffer));
// Check the terminator
if(terminator != null) {
if(!started) {
if (buffer[0] != '\r' && buffer[0] != '\n') {
started = true;
}
}
if(buffer[0] == terminator[terminatorIndex-1]) {
//System.out.println ("Incrementing terminatorIndex (" + terminatorIndex + ")");
terminatorIndex++;
if(terminatorIndex == terminator.length) {
// System.out.println ("Matched terminator.. breaking");
terminated = true;
}
}
else if(terminatorIndex == terminator.length) {
//System.out.println ("Matched terminator.. breaking");
terminated = true;
}
else {
terminatorIndex = 1;
}
}
else
{
started = true;
}
if(!terminated && started) {
out.write(buffer, 0, count);
totalBytes += count;
if(streamReaderListener != null) {
streamReaderListener.notifyBytesRead(totalBytes);
}
}
else
{
break;
}
}
out.flush();
}
catch (IOException e)
{
error = e;
}
finally {
running = false;
// Notify exited
synchronized (this)
{
notifyAll ();
}
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -