📄 toethread.java
字号:
// Workaround to get cause from BDB if(e.getCause() == null) { e.initCause(e.getCause()); } recoverableProblem(e); } catch (AssertionError ae) { // This risks leaving crawl in fatally inconsistent state, // but is often reasonable for per-Processor assertion problems recoverableProblem(ae); } catch (RuntimeException e) { recoverableProblem(e); } catch (StackOverflowError err) { recoverableProblem(err); } catch (Error err) { // OutOfMemory and any others seriousError(err); } } /** * Handling for exceptions and errors that are possibly recoverable. * * @param e */ private void recoverableProblem(Throwable e) { Object previousStep = step; setStep(STEP_HANDLING_RUNTIME_EXCEPTION); e.printStackTrace(System.err); currentCuri.setFetchStatus(S_RUNTIME_EXCEPTION); // store exception temporarily for logging currentCuri.addAnnotation("err="+e.getClass().getName()); currentCuri.putObject(A_RUNTIME_EXCEPTION, e); String message = "Problem " + e + " occured when trying to process '" + currentCuri.toString() + "' at step " + previousStep + " in " + currentProcessorName +"\n"; logger.log(Level.SEVERE, message.toString(), e); } private Processor getProcessor(Processor processor) { if(!(processor instanceof InstancePerThread)) { // just use the shared Processor return processor; } // must use local copy of processor Processor localProcessor = (Processor) localProcessors.get( processor.getClass().getName()); if (localProcessor == null) { localProcessor = processor.spawn(this.getSerialNumber()); localProcessors.put(processor.getClass().getName(),localProcessor); } return localProcessor; } /** * @return Return toe thread serial number. */ public int getSerialNumber() { return this.serialNumber; } /** * Used to get current threads HttpRecorder instance. * Implementation of the HttpRecorderMarker interface. * @return Returns instance of HttpRecorder carried by this thread. * @see org.archive.util.HttpRecorderMarker#getHttpRecorder() */ public HttpRecorder getHttpRecorder() { return this.httpRecorder; } /** Get the CrawlController acossiated with this thread. * * @return Returns the CrawlController. */ public CrawlController getController() { return controller; } /** * Terminates a thread. * * <p> Calling this method will ensure that the current thread will stop * processing as soon as possible (note: this may be never). Meant to * 'short circuit' hung threads. * * <p> Current crawl uri will have its fetch status set accordingly and * will be immediately returned to the frontier. * * <p> As noted before, this does not ensure that the thread will stop * running (ever). But once evoked it will not try and communicate with * other parts of crawler and will terminate as soon as control is * established. */ protected void kill(){ this.interrupt(); synchronized(this) { if (currentCuri!=null) { currentCuri.setFetchStatus(S_PROCESSING_THREAD_KILLED); controller.getFrontier().finished(currentCuri); } } } /** * @return Current step (For debugging/reporting, give abstract step * where this thread is). */ public Object getStep() { return step; } /** * Is this thread processing a URI, not paused or waiting for a URI? * @return whether thread is actively processing a URI */ public boolean isActive() { // if alive and not waiting in/for frontier.next(), we're 'active' return this.isAlive() && (currentCuri != null); } /** * Request that this thread retire (exit cleanly) at the earliest * opportunity. */ public void retire() { shouldRetire = true; } /** * Whether this thread should cleanly retire at the earliest * opportunity. * * @return True if should retire. */ public boolean shouldRetire() { return shouldRetire; } // // Reporter implementation // /** * Compiles and returns a report on its status. * @param name Report name. * @param pw Where to print. */ public void reportTo(String name, PrintWriter pw) { // name is ignored for now: only one kind of report pw.print("["); pw.println(getName()); // Make a local copy of the currentCuri reference in case it gets // nulled while we're using it. We're doing this because // alternative is synchronizing and we don't want to do this -- // it causes hang ups as controller waits on a lock for this thread, // something it gets easily enough on old threading model but something // it can wait interminably for on NPTL threading model. // See [ 994946 ] Pause/Terminate ignored on 2.6 kernel 1.5 JVM. CrawlURI c = currentCuri; if(c != null) { pw.print(" "); c.singleLineReportTo(pw); pw.print(" "); pw.print(c.getFetchAttempts()); pw.print(" attempts"); pw.println(); pw.print(" "); pw.print("in processor: "); pw.print(currentProcessorName); } else { pw.print(" -no CrawlURI- "); } pw.println(); long now = System.currentTimeMillis(); long time = 0; pw.print(" "); if(lastFinishTime > lastStartTime) { // That means we finished something after we last started something // or in other words we are not working on anything. pw.print("WAITING for "); time = now - lastFinishTime; } else if(lastStartTime > 0) { // We are working on something pw.print("ACTIVE for "); time = now-lastStartTime; } pw.print(ArchiveUtils.formatMillisecondsToConventional(time)); pw.println(); pw.print(" "); pw.print("step: "); pw.print(step); pw.print(" for "); pw.print(ArchiveUtils.formatMillisecondsToConventional(System.currentTimeMillis()-atStepSince)); pw.println(); StackTraceElement[] ste = this.getStackTrace(); for(int i=0;i<ste.length;i++) { pw.print(" "); pw.print(ste[i].toString()); pw.println(); } pw.print("]"); pw.println(); pw.flush(); } /** * @param w PrintWriter to write to. */ public void singleLineReportTo(PrintWriter w) { w.print("#"); w.print(this.serialNumber); // Make a local copy of the currentCuri reference in case it gets // nulled while we're using it. We're doing this because // alternative is synchronizing and we don't want to do this -- // it causes hang ups as controller waits on a lock for this thread, // something it gets easily enough on old threading model but something // it can wait interminably for on NPTL threading model. // See [ 994946 ] Pause/Terminate ignored on 2.6 kernel 1.5 JVM. CrawlURI c = currentCuri; if(c != null) { w.print(" "); w.print(currentProcessorName); w.print(" "); w.print(c.toString()); w.print(" ("); w.print(c.getFetchAttempts()); w.print(") "); } else { w.print(" [no CrawlURI] "); } long now = System.currentTimeMillis(); long time = 0; if(lastFinishTime > lastStartTime) { // That means we finished something after we last started something // or in other words we are not working on anything. w.print("WAITING for "); time = now - lastFinishTime; } else if(lastStartTime > 0) { // We are working on something w.print("ACTIVE for "); time = now-lastStartTime; } w.print(ArchiveUtils.formatMillisecondsToConventional(time)); w.print(" at "); w.print(step); w.print(" for "); w.print(ArchiveUtils.formatMillisecondsToConventional(now-atStepSince)); w.print("\n"); w.flush(); } /* (non-Javadoc) * @see org.archive.util.Reporter#singleLineLegend() */ public String singleLineLegend() { return "#serialNumber processorName currentUri (fetchAttempts) threadState threadStep"; } /* (non-Javadoc) * @see org.archive.util.Reporter#getReports() */ public String[] getReports() { // for now none but the default return new String[] {}; } public void reportTo(PrintWriter writer) { reportTo(null, writer); } /* (non-Javadoc) * @see org.archive.util.Reporter#singleLineReport() */ public String singleLineReport() { return ArchiveUtils.singleLineReport(this); } public void progressStatisticsLine(PrintWriter writer) { writer.print(getController().getStatistics() .getProgressStatisticsLine()); writer.print("\n"); } public void progressStatisticsLegend(PrintWriter writer) { writer.print(getController().getStatistics() .progressStatisticsLegend()); writer.print("\n"); } public String getCurrentProcessorName() { return currentProcessorName; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -