📄 concurrentmergescheduler.java
字号:
package org.apache.lucene.index;/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */import org.apache.lucene.store.Directory;import java.io.IOException;import java.util.List;import java.util.ArrayList;/** A {@link MergeScheduler} that runs each merge using a * separate thread, up until a maximum number of threads * ({@link #setMaxThreadCount}) at which points merges are * run in the foreground, serially. This is a simple way * to use concurrency in the indexing process without * having to create and manage application level * threads. */public class ConcurrentMergeScheduler extends MergeScheduler { private int mergeThreadPriority = -1; private List mergeThreads = new ArrayList(); private int maxThreadCount = 3; private List exceptions = new ArrayList(); private Directory dir; private boolean closed; private IndexWriter writer; public ConcurrentMergeScheduler() { if (allInstances != null) { // Only for testing addMyself(); } } /** Sets the max # simultaneous threads that may be * running. If a merge is necessary yet we already have * this many threads running, the merge is returned back * to IndexWriter so that it runs in the "foreground". */ public void setMaxThreadCount(int count) { if (count < 1) throw new IllegalArgumentException("count should be at least 1"); maxThreadCount = count; } /** Get the max # simultaneous threads that may be * running. @see #setMaxThreadCount. */ public int getMaxThreadCount() { return maxThreadCount; } /** Return the priority that merge threads run at. By * default the priority is 1 plus the priority of (ie, * slightly higher priority than) the first thread that * calls merge. */ public synchronized int getMergeThreadPriority() { initMergeThreadPriority(); return mergeThreadPriority; } /** Return the priority that merge threads run at. */ public synchronized void setMergeThreadPriority(int pri) { if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY) throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive"); mergeThreadPriority = pri; final int numThreads = mergeThreadCount(); for(int i=0;i<numThreads;i++) { MergeThread merge = (MergeThread) mergeThreads.get(i); merge.setThreadPriority(pri); } } private void message(String message) { if (writer != null) writer.message("CMS: " + message); } private synchronized void initMergeThreadPriority() { if (mergeThreadPriority == -1) { // Default to slightly higher priority than our // calling thread mergeThreadPriority = 1+Thread.currentThread().getPriority(); if (mergeThreadPriority > Thread.MAX_PRIORITY) mergeThreadPriority = Thread.MAX_PRIORITY; } } public void close() { closed = true; } public synchronized void sync() { while(mergeThreadCount() > 0) { message("now wait for threads; currently " + mergeThreads.size() + " still running"); final int count = mergeThreads.size(); for(int i=0;i<count;i++) message(" " + i + ": " + ((MergeThread) mergeThreads.get(i))); try { wait(); } catch (InterruptedException e) { } } } private synchronized int mergeThreadCount() { int count = 0; final int numThreads = mergeThreads.size(); for(int i=0;i<numThreads;i++) if (((MergeThread) mergeThreads.get(i)).isAlive()) count++; return count; } public void merge(IndexWriter writer) throws CorruptIndexException, IOException { this.writer = writer; initMergeThreadPriority(); dir = writer.getDirectory(); // First, quickly run through the newly proposed merges // and add any orthogonal merges (ie a merge not // involving segments already pending to be merged) to // the queue. If we are way behind on merging, many of // these newly proposed merges will likely already be // registered. message("now merge"); message(" index: " + writer.segString()); // Iterate, pulling from the IndexWriter's queue of // pending merges, until its empty: while(true) { // TODO: we could be careful about which merges to do in // the BG (eg maybe the "biggest" ones) vs FG, which // merges to do first (the easiest ones?), etc. MergePolicy.OneMerge merge = writer.getNextMerge(); if (merge == null) { message(" no more merges pending; now return"); return; } // We do this w/ the primary thread to keep // deterministic assignment of segment names writer.mergeInit(merge); message(" consider merge " + merge.segString(dir)); if (merge.isExternal) { message(" merge involves segments from an external directory; now run in foreground"); } else { synchronized(this) { if (mergeThreadCount() < maxThreadCount) { // OK to spawn a new merge thread to handle this // merge: MergeThread merger = new MergeThread(writer, merge); mergeThreads.add(merger); message(" launch new thread [" + merger.getName() + "]"); merger.setThreadPriority(mergeThreadPriority); merger.setDaemon(true); merger.start(); continue; } else message(" too many merge threads running; run merge in foreground"); } } // Too many merge threads already running, so we do // this in the foreground of the calling thread writer.merge(merge); } } private class MergeThread extends Thread { IndexWriter writer; MergePolicy.OneMerge startMerge; MergePolicy.OneMerge runningMerge; public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) throws IOException { this.writer = writer; this.startMerge = startMerge; } public synchronized void setRunningMerge(MergePolicy.OneMerge merge) { runningMerge = merge; } public synchronized MergePolicy.OneMerge getRunningMerge() { return runningMerge; } public void setThreadPriority(int pri) { try { setPriority(pri); } catch (NullPointerException npe) { // Strangely, Sun's JDK 1.5 on Linux sometimes // throws NPE out of here... } catch (SecurityException se) { // Ignore this because we will still run fine with // normal thread priority } } public void run() { // First time through the while loop we do the merge // that we were started with: MergePolicy.OneMerge merge = this.startMerge; try { message(" merge thread: start"); while(true) { setRunningMerge(merge); writer.merge(merge); // Subsequent times through the loop we do any new // merge that writer says is necessary: merge = writer.getNextMerge(); if (merge != null) { writer.mergeInit(merge); message(" merge thread: do another merge " + merge.segString(dir)); } else break; } message(" merge thread: done"); } catch (Throwable exc) { if (merge != null) { merge.setException(exc); writer.addMergeException(merge); } // Ignore the exception if it was due to abort: if (!(exc instanceof MergePolicy.MergeAbortedException)) { synchronized(ConcurrentMergeScheduler.this) { exceptions.add(exc); } if (!suppressExceptions) { // suppressExceptions is normally only set during // testing. anyExceptions = true; throw new MergePolicy.MergeException(exc); } } } finally { synchronized(ConcurrentMergeScheduler.this) { mergeThreads.remove(this); ConcurrentMergeScheduler.this.notifyAll(); } } } public String toString() { MergePolicy.OneMerge merge = getRunningMerge(); if (merge == null) merge = startMerge; return "merge thread: " + merge.segString(dir); } } static boolean anyExceptions = false; /** Used for testing */ public static boolean anyUnhandledExceptions() { synchronized(allInstances) { final int count = allInstances.size(); // Make sure all outstanding threads are done so we see // any exceptions they may produce: for(int i=0;i<count;i++) ((ConcurrentMergeScheduler) allInstances.get(i)).sync(); return anyExceptions; } } /** Used for testing */ private void addMyself() { synchronized(allInstances) { final int size=0; int upto = 0; for(int i=0;i<size;i++) { final ConcurrentMergeScheduler other = (ConcurrentMergeScheduler) allInstances.get(i); if (!(other.closed && 0 == other.mergeThreadCount())) // Keep this one for now: it still has threads or // may spawn new threads allInstances.set(upto++, other); } allInstances.subList(upto, allInstances.size()).clear(); allInstances.add(this); } } private boolean suppressExceptions; /** Used for testing */ void setSuppressExceptions() { suppressExceptions = true; } /** Used for testing */ void clearSuppressExceptions() { suppressExceptions = false; } /** Used for testing */ private static List allInstances; public static void setTestMode() { allInstances = new ArrayList(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -