📄 remoteindexreader.java
字号:
count = inputStream.readInt(); if ( index.hasPositions ) for ( int i = 0; i < count; i++ ) inputStream.readInt(); } next = inputStream.readInt(); } //System.err.println( "Out of loop: " + next ); if ( next >= 0 ) return nextInt(); if ( exhausted = ! inputStream.readBoolean() ) return Integer.MAX_VALUE; if ( ASSERTS ) assert inputStream.available() == 0; int result; outputStream.writeByte( RemoteIndexReader.SKIP_TO ); outputStream.writeInt( p ); outputStream.flush(); result = inputStream.readInt(); //System.err.println( "Skip to " + p + " result: " + result ); if ( result == Integer.MAX_VALUE ) { exhausted = true; return Integer.MAX_VALUE; } prefetchDocs( true ); //System.err.println( "Prefetch completed" ); next = result; return nextInt(); } catch ( Exception e ) { throw new RuntimeException( e ); } } public boolean hasNext() { if ( exhausted ) return false; if ( next >= 0 ) return true; next = advance(); return ! exhausted; } public int nextInt() { if ( ! hasNext() ) throw new NoSuchElementException(); last = next; next = -1; try { // TODO: this is *very roughf* and preliminary if ( index.hasPayloads ) payload.read( new InputBitStream( inputStream, 0 ) ); if ( index.hasCounts ) { count = inputStream.readInt(); if ( index.hasPositions ) { for ( int i = 0; i < count; i++ ) position[ i ] = inputStream.readInt(); intervalIterator.reset(); } } } catch ( IOException e ) { throw new RuntimeException( e ); } return last; } void closeConnectionQuietly() { } // TODO: implement skip efficiently (also in bit stream readers) public void dispose() throws IOException { close(); } // TODO: implement efficiently public int nextDocument() { return hasNext() ? nextInt() : -1; } private final RemoteIndexIntervalIterator intervalIterator = index.hasPositions? new RemoteIndexIntervalIterator() : null; private final Index keyIndex = RemoteIndexReader.this.index.keyIndex; private final Reference2ReferenceMap<Index,IntervalIterator> singletonIntervalIterator = Reference2ReferenceMaps.singleton( keyIndex, (IntervalIterator)intervalIterator ); private class RemoteIndexIntervalIterator extends AbstractObjectIterator<Interval> implements IntervalIterator { private int pos; public Interval next() { if ( ! hasNext() ) throw new NoSuchElementException(); return Interval.valueOf( position[ pos++ ] ); } public void intervalTerms( final IntSet terms ) { throw new UnsupportedOperationException(); } public Interval nextInterval() { return pos < count ? Interval.valueOf( position[ pos++ ] ) : null; } public void reset() { pos = 0; } public int extent() { return 1; } public boolean hasNext() { return pos < count; } } public int frequency() { return frequency; } public Payload payload() { return payload; } public int count() { return count; } public IntIterator positions() { return IntIterators.wrap( position ); } public int positions( final int[] p ) { System.arraycopy( position, 0, p, 0, Math.min( count, p.length ) ); return p.length < position.length ? -count : count; } public int[] positionArray() { return position; } public ReferenceSet<Index> indices() { return index.singletonSet; } public IntervalIterator intervalIterator( final Index index ) { if ( index != keyIndex || ! index.hasPositions ) return IntervalIterators.TRUE; if ( ASSERTS ) assert intervalIterator != null; if ( ASSERTS ) assert count > 0; return count > 0 ? intervalIterator : IntervalIterators.FALSE; } public IntervalIterator intervalIterator() { return intervalIterator( keyIndex ); } public Reference2ReferenceMap<Index,IntervalIterator> intervalIterators() { return singletonIntervalIterator; } public int termNumber() { throw new UnsupportedOperationException(); } } public static class ServerThread extends it.unimi.dsi.mg4j.index.remote.ServerThread { @SuppressWarnings("hiding") private final static Logger LOGGER = Util.getLogger( ServerThread.class ); private final static boolean DEBUG = false; /** The index we refer to. */ private final Index index; /** The remoted index reader. */ private final IndexReader indexReader; /** The current index iterator . */ private IndexIterator indexIterator; public ServerThread( final Socket socket, final Index index ) throws IOException { super( socket ); this.index = index; this.indexReader = index.getReader(); } public void run() { try { int command; for(;;) { try { command = inputStream.readByte(); } catch ( IOException e ) { LOGGER.warn( "Socket has been probably closed", e ); return; } if ( DEBUG ) LOGGER.debug( "Received remote command: " + command ); switch ( command ) { case RemoteIndexReader.CLOSE: indexReader.close(); // We don't close the socket--the caller should return; case RemoteIndexReader.DOCUMENTS_BY_INDEX: indexIterator = indexReader.documents( inputStream.readInt() ); outputStream.writeInt( indexIterator.frequency() ); outputStream.flush(); break; case RemoteIndexReader.DOCUMENTS_BY_NAME: indexIterator = indexReader.documents( new MutableString().readSelfDelimUTF8( (InputStream)inputStream ) ); outputStream.writeInt( indexIterator.frequency() ); outputStream.flush(); break; case RemoteIndexReader.SKIP_TO: outputStream.writeInt( indexIterator.skipTo( inputStream.readInt() ) ); outputStream.flush(); break; case RemoteIndexReader.SKIP: outputStream.writeInt( indexIterator.skip( inputStream.readInt() ) ); outputStream.flush(); break; case RemoteIndexReader.PREFETCH: /* When alreadyOfFirst is true, the caller does not really want * to get the first document pointer, as it has it already * got somehow (e.g., by skipping). */ boolean alreadyOnFirst = inputStream.readBoolean(); int count, bufSize = inputStream.readInt(); int[] position; for ( int i = 0; ( indexIterator.hasNext() || alreadyOnFirst && i == 0 ) && bufSize > 0; i++ ) { if ( i > 0 || ! alreadyOnFirst ) { outputStream.writeInt( indexIterator.nextDocument() ); bufSize--; } if ( index.hasPayloads ) { // TODO: this is *very rough* & preliminary OutputBitStream obs = new OutputBitStream( outputStream ); index.payload.write( obs ); obs.flush(); } if ( index.hasCounts ) { outputStream.writeInt( count = indexIterator.count() ); bufSize--; if ( index.hasPositions ) { position = indexIterator.positionArray(); for( int p = 0; p < count; p++ ) outputStream.writeInt( position[ p ] ); bufSize -= count; } } } outputStream.writeInt( -1 ); // End marker outputStream.writeBoolean( indexIterator.hasNext() ); // A peek farther. outputStream.flush(); //System.err.println( "Prefetch completed" ); break; case RemoteIndexReader.DISPOSE: indexIterator.dispose(); // We don't close the socket--the caller should return; default: LOGGER.error( "Unknown remote command: " + command ); } } } catch ( EOFException e ) { LOGGER.warn( "The socket has been closed" ); } catch ( Exception e ) { LOGGER.fatal( e, e ); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -