📄 recoveryjournal.java
字号:
e.printStackTrace(); } }

    /**
     * Import just the SUCCESS (and possibly FAILURE) URIs from the given
     * recovery log into the frontier as considered included.
     *
     * Each accepted URI is also re-recorded in the frontier's own journal
     * (when the frontier has one) so the information carries forward.
     *
     * @param source recovery log file to use
     * @param frontier frontier to update
     * @param retainFailures whether failure ('Ff') URIs should count as done
     * @return number of lines in recovery log (for reference)
     * @throws IOException if the log cannot be opened, read or closed
     */
    private static int importCompletionInfoFromLog(File source,
            Frontier frontier, boolean retainFailures) throws IOException {
        // Scan log for all 'Fs' lines: add as 'alreadyIncluded'
        BufferedInputStream is = getBufferedInput(source);
        // create MutableString of good starting size (will grow if necessary)
        MutableString read = new MutableString(UURI.MAX_URL_LENGTH);
        int lines = 0;
        try {
            while (readLine(is, read)) {
                lines++;
                boolean wasSuccess = read.startsWith(F_SUCCESS);
                if (wasSuccess
                        || (retainFailures && read.startsWith(F_FAILURE))) {
                    // retrieve first (only) URL on line; everything after
                    // the first three characters is taken as the URI
                    String s = read.subSequence(3, read.length()).toString();
                    try {
                        UURI u = UURIFactory.getInstance(s);
                        frontier.considerIncluded(u);
                        if (wasSuccess) {
                            if (frontier.getFrontierJournal() != null) {
                                frontier.getFrontierJournal()
                                    .finishedSuccess(u);
                            }
                        } else {
                            // carry forward the failure, in case a future
                            // recovery wants not to retain failures as
                            // finished
                            if (frontier.getFrontierJournal() != null) {
                                frontier.getFrontierJournal()
                                    .finishedFailure(u);
                            }
                        }
                    } catch (URIException e) {
                        // an unparseable URI only costs us this one line
                        e.printStackTrace();
                    }
                }
                if ((lines % PROGRESS_INTERVAL) == 0) {
                    // every PROGRESS_INTERVAL lines, print progress
                    LOGGER.info(
                        "at line " + lines
                        + " alreadyIncluded count = "
                        + frontier.discoveredUriCount());
                }
            }
        } catch (EOFException e) {
            // expected in some uncleanly-closed recovery logs; ignore
        } finally {
            is.close();
        }
        return lines;
    }

    /**
     * Read a line from the given bufferedinputstream into the MutableString.
     * Return true if a line was read; false if EOF.
     *
     * A line ends at LF, at CR, or at end of stream. A CR LF pair counts as a
     * single terminator, as does the LF CR pair this method has always
     * accepted. The terminator itself is never stored in <code>read</code>.
     * Each byte read is appended as one char.
     *
     * @param is stream to read from; mark/reset is used to peek one byte
     * @param read buffer that is cleared and then filled with the line
     * @return True if we read a line.
     * @throws IOException
     */
    private static boolean readLine(BufferedInputStream is, MutableString read)
    throws IOException {
        read.length(0);
        int c = is.read();
        while ((c != -1) && c != '\n' && c != '\r') {
            read.append((char) c);
            c = is.read();
        }
        if (c == -1 && read.length() == 0) {
            // EOF and none read; return false
            return false;
        }
        if (c == '\r' || c == '\n') {
            // Swallow the second half of a two-character terminator.
            // Previously only LF-then-CR was handled, so every CR LF pair
            // produced an extra blank line and inflated the line counts.
            int partner = (c == '\r') ? '\n' : '\r';
            is.mark(1);
            if (is.read() != partner) {
                is.reset();
            }
        }
        // a line (possibly blank) was read
        return true;
    }

    /**
     * Import all ADDs from given recovery log into the frontier's queues
     * (excepting those the frontier drops as already having been included)
     *
     * The latch is counted down once when the import ends, however it ends,
     * so that a thread waiting on it is never left blocked. It is also
     * counted down whenever (URIs queued so far + 1) is a multiple of
     * ENOUGH_TO_START_CRAWLING.
     *
     * @param source recovery log file to use
     * @param frontier frontier to update
     * @param lines total lines noted in recovery log earlier
     * @param enough latch signalling 'enough' URIs queued to begin crawling
     */
    private static void importQueuesFromLog(File source, Frontier frontier,
            int lines, CountDownLatch enough) {
        // create MutableString of good starting size (will grow if necessary)
        MutableString read = new MutableString(UURI.MAX_URL_LENGTH);
        long queuedAtStart = frontier.queuedUriCount();
        long queuedDuringRecovery = 0;
        int qLines = 0;
        try {
            try {
                // Scan log for all 'F+' lines: if not alreadyIncluded,
                // schedule for visitation
                BufferedInputStream is = getBufferedInput(source);
                try {
                    while (readLine(is, read)) {
                        qLines++;
                        if (read.startsWith(F_ADD)) {
                            // fields: tag, URI, then optionally
                            // pathFromSeed, via, viaContext
                            CharSequence[] args = splitOnSpaceRuns(read);
                            // A line carrying the tag but no URI is skipped;
                            // indexing args[1] would otherwise abort the
                            // whole import.
                            if (args.length > 1) {
                                try {
                                    UURI u = UURIFactory.getInstance(
                                        args[1].toString());
                                    String pathFromSeed = (args.length > 2)
                                        ? args[2].toString() : "";
                                    UURI via = (args.length > 3)
                                        ? UURIFactory.getInstance(
                                            args[3].toString())
                                        : null;
                                    String viaContext = (args.length > 4)
                                        ? args[4].toString() : "";
                                    CandidateURI caUri = new CandidateURI(u,
                                        pathFromSeed, via, viaContext);
                                    frontier.schedule(caUri);

                                    queuedDuringRecovery =
                                        frontier.queuedUriCount()
                                        - queuedAtStart;
                                    if (((queuedDuringRecovery + 1)
                                            % ENOUGH_TO_START_CRAWLING) == 0) {
                                        enough.countDown();
                                    }
                                } catch (URIException e) {
                                    // bad URI: drop this line, keep going
                                    e.printStackTrace();
                                }
                            }
                        }
                        if ((qLines % PROGRESS_INTERVAL) == 0) {
                            // every PROGRESS_INTERVAL lines, print progress
                            LOGGER.info(
                                "through line "
                                + qLines + "/" + lines
                                + " queued count = "
                                + frontier.queuedUriCount());
                        }
                    }
                } catch (EOFException e) {
                    // no problem: untidy end of recovery journal
                } finally {
                    is.close();
                }
            } catch (IOException e) {
                // Could not open, read or close the log; whatever was
                // queued before the failure stays queued.
                e.printStackTrace();
            }
            LOGGER.info("finished recovering frontier from "+source+" "
                +qLines+" lines processed");
        } finally {
            // Always release waiters, even if an unexpected runtime
            // exception ends the import early.
            enough.countDown();
        }
    }

    /**
     * Return an array of the subsequences of the passed-in sequence,
     * split on space runs.
     *
     * Only the space character (' ') separates fields. A leading space
     * yields an empty first element; trailing spaces yield no trailing
     * empty element.
     *
     * @param read sequence to split
     * @return CharSequence.
     */
    private static CharSequence[] splitOnSpaceRuns(CharSequence read) {
        int lastStart = 0;
        ArrayList<CharSequence> segs = new ArrayList<CharSequence>(5);
        for (int i = 0; i < read.length(); i++) {
            if (read.charAt(i) == ' ') {
                segs.add(read.subSequence(lastStart, i));
                i++;
                while (i < read.length() && read.charAt(i) == ' ') {
                    // skip any space runs
                    i++;
                }
                // i is now the first character of the next field (or the
                // end); the loop increment steps past that non-space char
                lastStart = i;
            }
        }
        if (lastStart < read.length()) {
            // final field runs to the end of the sequence
            segs.add(read.subSequence(lastStart, read.length()));
        }
        return segs.toArray(new CharSequence[segs.size()]);
    }

    /**
     * Open a reader on the given recovery log, decompressing it when the
     * file name ends (case-insensitively) with GZIP_SUFFIX.
     *
     * The reader decodes bytes with the platform default charset.
     *
     * @param source file to open
     * @return Recover log buffered reader.
     * @throws IOException if the file cannot be opened or, for a gzipped
     * file, its gzip header cannot be read
     */
    public static BufferedReader getBufferedReader(File source)
    throws IOException {
        boolean isGzipped = source.getName().toLowerCase().
            endsWith(GZIP_SUFFIX);
        FileInputStream fis = new FileInputStream(source);
        try {
            return new BufferedReader(isGzipped
                ? new InputStreamReader(new GZIPInputStream(fis))
                : new InputStreamReader(fis));
        } catch (IOException e) {
            // GZIPInputStream reads the header in its constructor; do not
            // leak the file handle when that fails.
            try {
                fis.close();
            } catch (IOException ignored) {
                // the original failure is the one worth reporting
            }
            throw e;
        }
    }

    /**
     * Get a BufferedInputStream on the recovery file given, decompressing
     * it when the file name ends (case-insensitively) with GZIP_SUFFIX.
     *
     * @param source file to open
     * @return Recover log buffered input stream.
     * @throws IOException if the file cannot be opened or, for a gzipped
     * file, its gzip header cannot be read
     */
    public static BufferedInputStream getBufferedInput(File source)
    throws IOException {
        boolean isGzipped = source.getName().toLowerCase().
            endsWith(GZIP_SUFFIX);
        FileInputStream fis = new FileInputStream(source);
        try {
            return isGzipped
                ? new BufferedInputStream(new GZIPInputStream(fis))
                : new BufferedInputStream(fis);
        } catch (IOException e) {
            // GZIPInputStream reads the header in its constructor; do not
            // leak the file handle when that fails.
            try {
                fis.close();
            } catch (IOException ignored) {
                // the original failure is the one worth reporting
            }
            throw e;
        }
    }

    /**
     * Flush and close the underlying IO objects.
     *
     * Does nothing when there is no open output. On success the output
     * reference is cleared; if flushing or closing fails the stack trace is
     * printed and the reference is left in place.
     */
    public void close() {
        if (this.out == null) {
            return;
        }
        try {
            try {
                this.out.flush();
            } finally {
                // attempt the close even when the flush failed
                this.out.close();
            }
            this.out = null;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Write an error entry through writeLine: a newline, the LOG_ERROR tag,
     * the current ArchiveUtils 14-digit log date, a space, and the message.
     *
     * @param err text describing the error
     */
    public void seriousError(String err) {
        writeLine("\n"+LOG_ERROR+ArchiveUtils.getLog14Date()+" "+err);
    }

    /**
     * Set the current journal file aside for a checkpoint and start a new
     * one.
     *
     * The output is closed, the journal file is renamed by appending "."
     * plus the checkpoint directory's name, and output is reopened on the
     * original file name. Nothing happens when there is no open output or
     * the journal file does not exist.
     *
     * @param checkpointDir checkpoint directory; only its name is used
     * @throws IOException if the journal cannot be reopened, or if the
     * journal file could not be renamed (output is reopened first in that
     * case)
     */
    public synchronized void checkpoint(final File checkpointDir)
    throws IOException {
        if (this.out == null || !this.gzipFile.exists()) {
            return;
        }
        close();
        // Rename gzipFile with the checkpoint name as suffix.
        File renamed = new File(this.gzipFile.getParentFile(),
            this.gzipFile.getName() + "." + checkpointDir.getName());
        boolean moved = this.gzipFile.renameTo(renamed);
        // Open new gzip file.
        this.out = initialize(this.gzipFile);
        if (!moved) {
            // renameTo reports failure only through its return value; tell
            // the caller rather than pretending the file was set aside
            throw new IOException("Failed to rename " + this.gzipFile
                + " to " + renamed);
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -