📄 Scan.java
final PayloadAccumulator[] accumulator = new PayloadAccumulator[ numberOfIndexedFields ]; // To accumulate document data
final ProgressLogger pl = new ProgressLogger( LOGGER, logInterval, "documents" );
if ( documentSequence instanceof DocumentCollection ) pl.expectedUpdates = ( (DocumentCollection)documentSequence ).size();
final boolean zipping = zipCollectionBasename != null;
final ZipDocumentCollectionBuilder builder = zipping ? new ZipDocumentCollectionBuilder( zipCollectionBasename + ".zip", documentSequence.factory(), true, pl ) : null;

// Set up, for each indexed field, either a Scan (TEXT and VIRTUAL fields) or a PayloadAccumulator (DATE and INT payload fields).
for ( int i = 0; i < numberOfIndexedFields; i++ ) {
	switch ( factory.fieldType( indexedField[ i ] ) ) {
	case TEXT:
		scan[ i ] = new Scan( basename + '-' + factory.fieldName( indexedField[ i ] ), factory.fieldName( indexedField[ i ] ), termProcessor,
				map != null ? IndexingType.REMAPPED : IndexingType.STANDARD, 0, 0, bufferSize, builder, tempDir );
		break;
	case VIRTUAL:
		scan[ i ] = new Scan( basename + '-' + factory.fieldName( indexedField[ i ] ), factory.fieldName( indexedField[ i ] ), termProcessor,
				IndexingType.VIRTUAL, virtualDocumentResolver[ i ].numberOfDocuments(), virtualGap[ i ], bufferSize, builder, tempDir );
		break;
	case DATE:
		accumulator[ i ] = new PayloadAccumulator( basename + '-' + factory.fieldName( indexedField[ i ] ), new DatePayload(), factory.fieldName( indexedField[ i ] ),
				map != null ? IndexingType.REMAPPED : IndexingType.STANDARD, documentsPerBatch, tempDir );
		break;
	case INT:
		accumulator[ i ] = new PayloadAccumulator( basename + '-' + factory.fieldName( indexedField[ i ] ), new IntegerPayload(), factory.fieldName( indexedField[ i ] ),
				map != null ? IndexingType.REMAPPED : IndexingType.STANDARD, documentsPerBatch, tempDir );
		break;
	default:
	}
}

pl.displayFreeMemory = true;
pl.start( "Indexing documents..." );

DocumentIterator iterator = documentSequence.iterator();
Reader reader;
WordReader wordReader;
ObjectList<VirtualDocumentFragment> fragments;
Document document;

int documentPointer = 0, documentsInBatch = 0;
long batchStartTime = System.currentTimeMillis();
boolean outOfMemoryError = false, stopCompaction = false;

// Main loop: feed each indexed field of every document to the corresponding scanner or accumulator.
while ( ( document = iterator.nextDocument() ) != null ) {
	if ( zipping ) builder.startDocument( document.title(), document.uri() );

	for ( int i = 0; i < numberOfIndexedFields; i++ ) {
		switch ( factory.fieldType( indexedField[ i ] ) ) {
		case TEXT:
			reader = (Reader)document.content( indexedField[ i ] );
			wordReader = document.wordReader( indexedField[ i ] );
			wordReader.setReader( reader );
			if ( zipping ) builder.startTextField();
			scan[ i ].processDocument( map != null ? map[ documentPointer ] : documentPointer, wordReader );
			if ( zipping ) builder.endTextField();
			break;
		case VIRTUAL:
			fragments = (ObjectList<VirtualDocumentFragment>)document.content( indexedField[ i ] );
			wordReader = document.wordReader( indexedField[ i ] );
			virtualDocumentResolver[ i ].context( document );
			for ( VirtualDocumentFragment fragment : fragments ) {
				int virtualDocumentPointer = virtualDocumentResolver[ i ].resolve( fragment.documentSpecifier() );
				if ( virtualDocumentPointer < 0 ) continue;
				if ( map != null ) virtualDocumentPointer = map[ virtualDocumentPointer ];
				wordReader.setReader( new FastBufferedReader( fragment.text() ) );
				scan[ i ].processDocument( virtualDocumentPointer, wordReader );
			}
			if ( zipping ) builder.virtualField( fragments );
			break;
		default:
			Object o = document.content( indexedField[ i ] );
			accumulator[ i ].processData( map != null ? map[ documentPointer ] : documentPointer, o );
			if ( zipping ) builder.nonTextField( o );
			break;
		}

		if ( scan[ i ] != null && scan[ i ].outOfMemoryError ) stopCompaction = outOfMemoryError = true;
	}

	if ( zipping ) builder.endDocument();
	documentPointer++;
	documentsInBatch++;
	document.close();
	pl.update();

	// We try compaction until (after compaction) we have less than 20% memory available.
	long percAvailableMemory = Util.percAvailableMemory();
	if ( percAvailableMemory < 10 && !stopCompaction ) {
		LOGGER.info( "Trying compaction... (" + percAvailableMemory + "% available)" );
		Util.compactMemory();
		percAvailableMemory = Util.percAvailableMemory();
		if ( percAvailableMemory < 20 ) stopCompaction = true;
		LOGGER.info( "Compaction completed (" + percAvailableMemory + "% available" + ( stopCompaction ? ")" : ", will try again)" ) );
	}

	// Dump a batch when a buffer reallocation ran out of memory, the batch is full, or free memory is below 10%.
	if ( outOfMemoryError || documentsInBatch == documentsPerBatch || percAvailableMemory < 10 ) {
		if ( outOfMemoryError ) LOGGER.warn( "OutOfMemoryError during buffer reallocation: writing a batch of " + documentsInBatch + " documents" );
		else if ( percAvailableMemory < 10 ) LOGGER.warn( "Available memory below 10%: writing a batch of " + documentsInBatch + " documents" );

		long occurrences = 0;
		for ( int i = 0; i < numberOfIndexedFields; i++ ) {
			switch ( factory.fieldType( indexedField[ i ] ) ) {
			case TEXT:
			case VIRTUAL:
				occurrences += scan[ i ].dumpBatch();
				scan[ i ].openSizeBitStream();
				break;
			default:
				accumulator[ i ].writeData();
			}
		}

		LOGGER.info( "Last set of batches indexed at " + Util.format( ( 1000. * occurrences ) / ( System.currentTimeMillis() - batchStartTime ) ) + " occurrences/s" );
		batchStartTime = System.currentTimeMillis();
		documentsInBatch = 0;
		stopCompaction = outOfMemoryError = false;
	}
}

iterator.close();
if ( builder != null ) BinIO.storeObject( builder.close(), zipCollectionBasename + DocumentCollection.DEFAULT_EXTENSION );

// Close all scanners and accumulators.
for ( int i = 0; i < numberOfIndexedFields; i++ ) {
	switch ( factory.fieldType( indexedField[ i ] ) ) {
	case TEXT:
	case VIRTUAL:
		scan[ i ].close();
		break;
	default:
		accumulator[ i ].close();
		break;
	}
}

pl.done();

if ( numDocuments > 0 && documentPointer != numDocuments )
	LOGGER.error( "The document sequence contains " + documentPointer + " documents, but the ZerothPass property file claims that there are " + numDocuments + " documents" );
if ( map != null && documentPointer != map.length )
	LOGGER.warn( "The document sequence contains " + documentPointer + " documents, but the map contains " + map.length + " integers" );
}

final MutableString word = new MutableString();
final MutableString nonWord = new MutableString();

/** The default delimiter separating two documents read from standard input (a newline). */
public static final int DEFAULT_DELIMITER = 10;
/** The default batch size. */
public static final int DEFAULT_BATCH_SIZE = 100000;
/** The default buffer size. */
public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
/** The default virtual field gap. */
public static final int DEFAULT_VIRTUAL_DOCUMENT_GAP = 64;

/**
 * Processes a document.
 *
 * @param documentPointer the integer pointer associated to the document.
 * @param wordReader the word reader associated to the document.
 */
public void processDocument( final int documentPointer, final WordReader wordReader ) throws IOException {
	int pos = indexingIsVirtual ? currMaxPos[ documentPointer ] : 0;
	final int actualPointer = indexingIsStandard ? documentCount : documentPointer;
	ByteArrayPostingList termBaobs;
	word.length( 0 );
	nonWord.length( 0 );

	while ( wordReader.next( word, nonWord ) ) {
		if ( builder != null ) builder.add( word, nonWord );
		if ( word.length() == 0 ) continue;
		if ( !termProcessor.processTerm( word ) ) {
			pos++; // We do consider the positions of terms canceled out by the term processor.
			continue;
		}

		// We check whether we have already seen this term. If not, we add it to the term map.
		if ( ( termBaobs = termMap.get( word ) ) == null ) {
			termMap.put( word.copy(), termBaobs = new ByteArrayPostingList( new byte[ 32 ], indexingIsStandard ) );
			numTerms++;
			if ( numTerms % TERM_REPORT_STEP == 0 ) LOGGER.info( "[" + Util.format( numTerms ) + " term(s)]" );
		}

		// We now record the occurrence. If a renumbering map has
		// been specified, we have to renumber the document index through it.
		termBaobs.setDocumentPointer( actualPointer );
		termBaobs.addPosition( pos );
		// Record whether this posting list has an out-of-memory-error problem.
		if ( termBaobs.outOfMemoryError ) outOfMemoryError = true;

		occsInCurrDoc++;
		numOccurrences++;
		pos++;
	}

	if ( pos > maxDocSize ) maxDocSize = pos;

	if ( indexingIsStandard ) sizes.writeGamma( pos );
	else if ( indexingIsRemapped ) {
		sizes.writeGamma( actualPointer );
		sizes.writeGamma( pos );
	}
	if ( indexingIsVirtual ) currMaxPos[ documentPointer ] += occsInCurrDoc + virtualDocumentGap;

	pos = occsInCurrDoc = 0;
	documentCount++;
	if ( actualPointer > maxDocInBatch ) maxDocInBatch = actualPointer;
}

private static void makeEmpty( final String filename ) throws IOException {
	final File file = new File( filename );
	if ( file.exists() && !file.delete() ) throw new IOException( "Cannot delete file " + file );
	file.createNewFile();
}

/**
 * Closes this pass, releasing all resources.
 */
public void close() throws ConfigurationException, IOException {
	if ( numOccurrences > 0 ) dumpBatch();

	if ( numOccurrences == 0 && batch == 0 ) {
		// Special case: no term has been indexed. We generate an empty batch.
		final String batchBasename = batchBasename( 0, basename, batchDir );
		LOGGER.debug( "Generating empty index " + batchBasename );
		makeEmpty( batchBasename + DiskBasedIndex.TERMS_EXTENSION );
		makeEmpty( batchBasename + DiskBasedIndex.FREQUENCIES_EXTENSION );
		makeEmpty( batchBasename + DiskBasedIndex.GLOBCOUNTS_EXTENSION );
		makeEmpty( batchBasename + DiskBasedIndex.SIZES_EXTENSION );

		final IndexWriter indexWriter = new BitStreamIndexWriter( batchBasename, totDocuments, true, flags );
		indexWriter.close();
		final Properties properties = indexWriter.properties();
		properties.setProperty( Index.PropertyKeys.TERMPROCESSOR, ObjectParser.toSpec( termProcessor ) );
		properties.setProperty( Index.PropertyKeys.OCCURRENCES, 0 );
		properties.setProperty( Index.PropertyKeys.MAXCOUNT, 0 );
		properties.setProperty( Index.PropertyKeys.MAXDOCSIZE, maxDocSize );
		properties.setProperty( Index.PropertyKeys.SIZE, 0 );
		if ( field != null ) properties.setProperty( Index.PropertyKeys.FIELD, field );
		properties.save( batchBasename + DiskBasedIndex.PROPERTIES_EXTENSION );
		batch = 1;
	}

	termMap = null;

	final Properties properties = new Properties();
	if ( field != null ) properties.setProperty( Index.PropertyKeys.FIELD, field );
	properties.setProperty( Index.PropertyKeys.BATCHES, batch );
	properties.setProperty( Index.PropertyKeys.DOCUMENTS, totDocuments );
	properties.setProperty( Index.PropertyKeys.MAXDOCSIZE, globMaxDocSize );
	properties.setProperty( Index.PropertyKeys.MAXCOUNT, maxCount );
	properties.setProperty( Index.PropertyKeys.OCCURRENCES, totOccurrences );
	properties.setProperty( Index.PropertyKeys.POSTINGS, totPostings );
	properties.setProperty( Index.PropertyKeys.TERMPROCESSOR, termProcessor.getClass().getName() );

	if ( ! indexingIsVirtual ) {
		// This set of batches can be seen as a documental cluster index.
		final Properties clusterProperties = new Properties();
		clusterProperties.addAll( properties );
		clusterProperties.setProperty( Index.PropertyKeys.TERMS, -1 );
		clusterProperties.setProperty( DocumentalCluster.PropertyKeys.BLOOM, false );
		clusterProperties.setProperty( IndexCluster.PropertyKeys.FLAT, false );

		if ( indexingIsStandard ) {
			clusterProperties.setProperty( Index.PropertyKeys.INDEXCLASS, DocumentalConcatenatedCluster.class.getName() );
			BinIO.storeObject( new ContiguousDocumentalStrategy( cutPoints.toIntArray() ), basename + CLUSTER_STRATEGY_EXTENSION );
		}
		else { // Remapped
			clusterProperties.setProperty( Index.PropertyKeys.INDEXCLASS, DocumentalMergedCluster.class.getName() );
			BinIO.storeObject( new IdentityDocumentalStrategy( batch, totDocuments ), basename + CLUSTER_STRATEGY_EXTENSION );
		}
		clusterProperties.setProperty( IndexCluster.PropertyKeys.STRATEGY, basename + CLUSTER_STRATEGY_EXTENSION );
		for ( int i = 0; i < batch; i++ ) clusterProperties.addProperty( IndexCluster.PropertyKeys.LOCALINDEX, batchBasename( i, basename, batchDir ) );
		clusterProperties.save( basename + CLUSTER_PROPERTIES_EXTENSION );
	}

	properties.save( basename + DiskBasedIndex.PROPERTIES_EXTENSION );
}