📄 remoteindexreader.java
字号:
package it.unimi.dsi.mg4j.index.remote;/* * MG4J: Managing Gigabytes for Java * * Copyright (C) 2006-2007 Sebastiano Vigna * * This library is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published by the Free * Software Foundation; either version 2.1 of the License, or (at your option) * any later version. * * This library is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License * for more details. * * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */import it.unimi.dsi.fastutil.ints.IntIterator;import it.unimi.dsi.fastutil.ints.IntIterators;import it.unimi.dsi.fastutil.ints.IntSet;import it.unimi.dsi.fastutil.objects.AbstractObjectIterator;import it.unimi.dsi.fastutil.objects.Reference2ReferenceMap;import it.unimi.dsi.fastutil.objects.Reference2ReferenceMaps;import it.unimi.dsi.fastutil.objects.ReferenceSet;import it.unimi.dsi.mg4j.index.AbstractIndexIterator;import it.unimi.dsi.mg4j.index.AbstractIndexReader;import it.unimi.dsi.mg4j.index.Index;import it.unimi.dsi.mg4j.index.IndexIterator;import it.unimi.dsi.mg4j.index.IndexReader;import it.unimi.dsi.mg4j.index.payload.Payload;import it.unimi.dsi.mg4j.search.IntervalIterator;import it.unimi.dsi.mg4j.search.IntervalIterators;import it.unimi.dsi.io.InputBitStream;import it.unimi.dsi.io.OutputBitStream;import it.unimi.dsi.util.Interval;import it.unimi.dsi.Util;import it.unimi.dsi.lang.MutableString;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.EOFException;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;import java.net.Socket;import java.util.NoSuchElementException;import org.apache.log4j.Logger;/** An index reader for {@link it.unimi.dsi.mg4j.index.remote.RemoteIndex}. * * @author Sebastiano Vigna * @author Alessandro Arrabito */public class RemoteIndexReader extends AbstractIndexReader { final private static Logger LOGGER = Util.getLogger( RemoteIndexReader.class ); private static final boolean ASSERTS = false; private static final byte DOCUMENTS_BY_NAME = 0; private static final byte DOCUMENTS_BY_INDEX = 1; private static final byte PREFETCH = 2; private static final byte CLOSE = 3; private static final byte DISPOSE = 4; private static final byte SKIP_TO = 5; private static final byte SKIP = 6; /** The index we refer to. */ protected final Index index; /** The remote server connection used to call the index server.*/ protected final RemoteIndexServerConnection connection; /** The index iterator associated to this reader.*/ protected final RemoteIndexReaderIndexIterator remoteIndexIterator; /** The input stream in {@link #connection}, cached. */ protected final DataInputStream inputStream; /** The output stream in {@link #connection}, cached. */ protected final DataOutputStream outputStream; public RemoteIndexReader( final RemoteIndex index, final int bufferSize ) throws IOException { this.index = index; connection = new RemoteIndexServerConnection( index.socketAddress, IndexServer.GET_INDEX_READER ); inputStream = connection.inputStream; outputStream = connection.outputStream; remoteIndexIterator = new RemoteIndexReaderIndexIterator( bufferSize ); } public void close() throws IOException, IllegalStateException { outputStream.writeByte( RemoteIndexReader.CLOSE ); outputStream.flush(); try { connection.close(); } catch( IOException dontCare ) { // Whatever may happen, we're so outta here... } } protected void finalize() throws Throwable { try { if ( ! connection.socket.isClosed() ) { LOGGER.warn( "This " + this.getClass().getName() + " [" + toString() + "] should have been closed." ); close(); } } finally { super.finalize(); } } public IndexIterator documents( final int termNumber ) throws IOException { remoteIndexIterator.flush(); outputStream.writeByte( RemoteIndexReader.DOCUMENTS_BY_INDEX ); outputStream.writeInt( termNumber ); outputStream.flush(); remoteIndexIterator.term( null ); // Read frequency remoteIndexIterator.reset( inputStream.readInt() ); remoteIndexIterator.prefetchDocs( false ); return remoteIndexIterator; } public IndexIterator documents( final CharSequence term ) throws IOException { remoteIndexIterator.flush(); outputStream.writeByte( RemoteIndexReader.DOCUMENTS_BY_NAME ); new MutableString( term ).writeSelfDelimUTF8( (OutputStream)outputStream ); outputStream.flush(); remoteIndexIterator.term( term ); // Read frequency remoteIndexIterator.reset( inputStream.readInt() ); remoteIndexIterator.prefetchDocs( false ); return remoteIndexIterator; } /** An index iterator based on a remote index reader. * * <p>Each remote index reader creates exactly one instance of this class. The instance * is reused upon calls to {@link IndexReader#documents(int)}. * * <p>The internal state is unfortunately quite complicated by the necessity of grabbing data * from the socket as lazily as possible. * * <p>Basically, an instance of this class can be in one of three states: * <ul> * <li>If {@link #exhausted} is true, then there are no more items to be returned * <em>and</em> there is no more data coming from the socket. If you need to force * this state, you can call {@link #flush()}, which discards the remaining data * coming from the socket and set {@link #exhausted} (this is necessary, for instance, * each time you reuse the iterator). * <li>Otherwise, if {@link #last} is -1 the iterator is brand new, {@link #next} is -1, * too, and the socket input stream has been filled but never read. * <li>Otherwise, if {@link #next} is greater than or equal to zero, then {@link #next} * is the next document pointer to be returned, and it has been just read from the * socket input stream (for instance, if there are counts the socket input stream is * positioned just before the count). * <li>Finally, if {@link #next} is -1 then {@link #last} is the last document pointer * returned, and the socket input stream is positioned exactly before the next document * pointer to be returned, or over the end-of-block marker. * </ul> */ private class RemoteIndexReaderIndexIterator extends AbstractIndexIterator implements IndexIterator { /** The number of byte requested with a single request to an index server. */ private final int bufferSize; /** The next document pointer to be returned, or -1 if the iterator has to be advanced. */ private int next; /** The last document pointer returned. */ private int last; /** The frequency of the current term. */ private int frequency; /** Whether this iterator has been exhausted. */ private boolean exhausted; /** The current payload, or <code>null</code>. */ protected Payload payload; /** The current count. */ protected int count; /** The current positions. */ protected final int[] position; public RemoteIndexReaderIndexIterator ( final int bufferSize ) { this.bufferSize = bufferSize; this.position = new int[ index.maxCount ]; this.exhausted = true; // To avoid flushing the first time } public Index index() { return keyIndex; } public void flush() throws IOException { if ( ! exhausted ) { while( inputStream.readInt() >= 0 ); inputStream.readBoolean(); if ( ASSERTS ) assert inputStream.available() == 0; exhausted = true; } } public void reset( final int frequency ) { this.frequency = frequency; exhausted = false; next = last = -1; } /** Prefetches a batch of document data from the server. * @param alreadyOnFirst will be passed as an argument to the remote call. */ public void prefetchDocs( final boolean alreadyOnFirst ) throws IOException { outputStream.writeByte( RemoteIndexReader.PREFETCH ); outputStream.writeBoolean( alreadyOnFirst ); outputStream.writeInt( bufferSize ); outputStream.flush(); } /** Tries to advance the remote iterator. * * <p>After a call to this method returning -1, the prefetched data is exhausted * and {@link #exhausted} is true. Otherwise, the input stream of the connection is * positioned just before counts and position of the returned document pointer. * * @return -1 if there are no more elements, the next pointer otherwise. The value returned * in stored in {@link #next}. */ private int advance() { if ( next >= 0 ) return next; try { next = inputStream.readInt(); if ( next < 0 ) { if ( inputStream.readBoolean() ) prefetchDocs( false ); else { exhausted = true; return -1; } next = inputStream.readInt(); if ( ASSERTS ) assert next >= 0; } return next; } catch ( Exception e ) { throw new RuntimeException( e ); } } public int document() { if ( last < 0 ) throw new IllegalStateException(); return last; } public int skipTo( final int p ) { try { if ( p <= last ) return last; if ( exhausted ) return Integer.MAX_VALUE; // First we check whether we can skip inside the local buffer. if ( next < 0 ) next = inputStream.readInt(); while( next >= 0 && next < p ) { if ( index.hasCounts ) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -