📄 remoteinputstream.java
字号:
package it.unimi.dsi.mg4j.index.remote;/* * MG4J: Managing Gigabytes for Java * * Copyright (C) 2006-2007 Sebastiano Vigna * * This library is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published by the Free * Software Foundation; either version 2.1 of the License, or (at your option) * any later version. * * This library is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License * for more details. * * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */import it.unimi.dsi.fastutil.bytes.ByteArrays;import it.unimi.dsi.fastutil.io.FastBufferedInputStream;import it.unimi.dsi.fastutil.io.RepositionableStream;import it.unimi.dsi.Util;import java.io.EOFException;import java.io.IOException;import java.io.InputStream;import java.net.Socket;import java.net.SocketAddress;import org.apache.log4j.Logger;/** A client class that connects to an {@link it.unimi.dsi.mg4j.index.remote.IndexServer} * and exposes a remote {@link java.io.InputStream} locally. * * @author Alessandro Arrabito */public class RemoteInputStream extends InputStream implements RepositionableStream { private static final Logger LOGGER = Util.getLogger( RemoteInputStream.class ); private static final byte READ_ARRAY = 0; private static final byte READ_BYTE = 1; private static final byte SET_POSITION = 2; private static final byte READ_POSITION = 3; private static final byte AVAILABLE = 4; private static final byte SKIP = 5; private static final byte CLOSE = 6; /** The connection to the server. */ final private RemoteIndexServerConnection connection; /** Creates a new client input stream using a given socket address. * @param address the address of the index server. */ public RemoteInputStream( final SocketAddress address ) throws IOException { connection = new RemoteIndexServerConnection( address, IndexServer.GET_CLIENT_INPUT_STREAM ); } public int available() throws IOException { connection.outputStream.writeByte( RemoteInputStream.AVAILABLE ); connection.outputStream.flush(); return connection.inputStream.readInt(); } public void close() throws IOException { connection.outputStream.writeByte( RemoteInputStream.CLOSE ); connection.outputStream.flush(); try { connection.close(); } catch( IOException dontCare ) { // Whatever may happen, we're so outta here... } } public int read( final byte[] array, final int offset, final int length ) throws IOException { if ( length == 0 ) return 0; ByteArrays.ensureOffsetLength( array, offset, length ); connection.outputStream.writeByte( RemoteInputStream.READ_ARRAY ); connection.outputStream.writeInt( length ); connection.outputStream.flush(); final int result = connection.inputStream.readInt(); if ( result <= 0 ) return result; connection.inputStream.read( array, offset, result ); return result; } public long skip( final long toSkip ) throws IOException { if ( toSkip < 0 ) throw new IOException( "Negative skip: " + toSkip ); connection.outputStream.writeByte( RemoteInputStream.SKIP ); connection.outputStream.writeLong( toSkip ); connection.outputStream.flush(); return connection.inputStream.readLong(); } public int read() throws IOException { connection.outputStream.writeByte( RemoteInputStream.READ_BYTE ); connection.outputStream.flush(); return connection.inputStream.readInt(); } public void position( final long newPosition ) throws IOException { connection.outputStream.writeByte( RemoteInputStream.SET_POSITION ); connection.outputStream.writeLong( newPosition ); connection.outputStream.flush(); } public long position() throws IOException { connection.outputStream.writeByte( RemoteInputStream.SET_POSITION ); connection.outputStream.flush(); return connection.inputStream.readLong(); } public static class ServerThread extends it.unimi.dsi.mg4j.index.remote.ServerThread { private static final boolean DEBUG = false; /** The remoted input stream. */ private final FastBufferedInputStream remotedInputStream; //private final FileInputStream remotedInputStream; public ServerThread( final Socket socket, final InputStream stream ) throws IOException { super( socket ); //this.remotedInputStream = (FileInputStream)stream; // TODO: which buffer size? this.remotedInputStream = new FastBufferedInputStream( stream ); } public void run() { try { int command; byte[] readBuf = ByteArrays.EMPTY_ARRAY; for ( ;; ) { command = inputStream.readByte(); if ( DEBUG ) LOGGER.debug( "Received remote command: " + command ); switch ( command ) { case RemoteInputStream.READ_ARRAY: // TODO: avoid reallocating the buffer int len = inputStream.readInt(); if ( readBuf.length < len ) readBuf = new byte[ len ]; int result = remotedInputStream.read( readBuf, 0, len ); outputStream.writeInt( result ); if ( result > 0 ) outputStream.write( readBuf, 0, result ); outputStream.flush(); break; case RemoteInputStream.READ_BYTE: outputStream.writeInt( remotedInputStream.read() ); outputStream.flush(); break; case RemoteInputStream.SET_POSITION: //remotedInputStream.getChannel().position( inputStream.readLong() ); remotedInputStream.position( inputStream.readLong() ); break; case RemoteInputStream.READ_POSITION: //outputStream.writeLong( remotedInputStream.getChannel().position() ); outputStream.writeLong( remotedInputStream.position() ); outputStream.flush(); break; case RemoteInputStream.AVAILABLE: outputStream.writeLong( remotedInputStream.available() ); outputStream.flush(); break; case RemoteInputStream.SKIP: outputStream.writeLong( remotedInputStream.skip( inputStream.readLong() ) ); outputStream.flush(); break; case RemoteInputStream.CLOSE: return; default: LOGGER.error( "Unknown remote command: " + command ); } } } catch ( EOFException e ) { LOGGER.warn( "The socket has been closed" ); } catch ( Exception e ) { LOGGER.fatal( e, e ); } finally { try { remotedInputStream.close(); // We don't close the socket--the caller should } catch ( IOException e ) {} } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -