📄 CrawlDbReader.java (excerpt from Apache Nutch's org.apache.nutch.crawl.CrawlDbReader; imports, fields, and the inner mapper/reducer classes are not shown)
  public void processStatJob(String crawlDb, Configuration config) throws IOException {
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb statistics start: " + crawlDb);
    }
    Path tmpFolder = new Path(crawlDb, "stat_tmp" + System.currentTimeMillis());

    JobConf job = new NutchJob(config);
    job.setJobName("stats " + crawlDb);

    job.addInputPath(new Path(crawlDb, CrawlDatum.DB_DIR_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setInputKeyClass(UTF8.class);
    job.setInputValueClass(CrawlDatum.class);

    job.setMapperClass(CrawlDbStatMapper.class);
    job.setCombinerClass(CrawlDbStatCombiner.class);
    job.setReducerClass(CrawlDbStatReducer.class);

    job.setOutputPath(tmpFolder);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(LongWritable.class);

    JobClient.runJob(job);

    // reading the result
    FileSystem fileSystem = FileSystem.get(config);
    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(config, tmpFolder);
    UTF8 key = new UTF8();
    LongWritable value = new LongWritable();
    TreeMap stats = new TreeMap();
    for (int i = 0; i < readers.length; i++) {
      SequenceFile.Reader reader = readers[i];
      while (reader.next(key, value)) {
        String k = key.toString();
        LongWritable val = (LongWritable) stats.get(k);
        if (val == null) {
          val = new LongWritable();
          // "scx" tracks the maximum score and "scn" the minimum; seed them so
          // the first value observed always wins the comparison below.
          if (k.equals("scx")) val.set(Long.MIN_VALUE);
          if (k.equals("scn")) val.set(Long.MAX_VALUE);
          stats.put(k, val);
        }
        if (k.equals("scx")) {
          if (val.get() < value.get()) val.set(value.get());
        } else if (k.equals("scn")) {
          if (val.get() > value.get()) val.set(value.get());
        } else {
          // all other counters are simple sums
          val.set(val.get() + value.get());
        }
      }
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("Statistics for CrawlDb: " + crawlDb);
      LongWritable totalCnt = (LongWritable) stats.get("T");
      stats.remove("T");
      LOG.info("TOTAL urls:\t" + totalCnt.get());
      Iterator it = stats.keySet().iterator();
      while (it.hasNext()) {
        String k = (String) it.next();
        LongWritable val = (LongWritable) stats.get(k);
        // scores are aggregated as longs scaled by 1000, hence the divisions
        if (k.equals("scn")) {
          LOG.info("min score:\t" + (float) (val.get() / 1000.0f));
        } else if (k.equals("scx")) {
          LOG.info("max score:\t" + (float) (val.get() / 1000.0f));
        } else if (k.equals("sct")) {
          LOG.info("avg score:\t" + (float) ((float) (val.get() / (float) totalCnt.get()) / 1000.0f));
        } else if (k.startsWith("status")) {
          int code = Integer.parseInt(k.substring(k.indexOf(' ') + 1));
          LOG.info(k + " (" + CrawlDatum.statNames[code] + "):\t" + val);
        } else {
          LOG.info(k + ":\t" + val);
        }
      }
    }
    // removing the tmp folder
    fileSystem.delete(tmpFolder);
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb statistics: done");
    }
  }

  public CrawlDatum get(String crawlDb, String url, Configuration config) throws IOException {
    UTF8 key = new UTF8(url);
    CrawlDatum val = new CrawlDatum();
    openReaders(crawlDb, config);
    CrawlDatum res = (CrawlDatum) MapFileOutputFormat.getEntry(readers,
        new HashPartitioner(), key, val);
    return res;
  }

  public void readUrl(String crawlDb, String url, Configuration config) throws IOException {
    CrawlDatum res = get(crawlDb, url, config);
    System.out.println("URL: " + url);
    if (res != null) {
      System.out.println(res);
    } else {
      System.out.println("not found");
    }
  }
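  // The readers field and openReaders() used by get() above are not part of
  // this excerpt. A minimal sketch of what they presumably look like, assuming
  // the old org.apache.hadoop.mapred.MapFileOutputFormat API already used
  // elsewhere in this class -- illustrative, not the original source:
  //
  //   private MapFile.Reader[] readers;
  //
  //   private synchronized void openReaders(String crawlDb, Configuration config)
  //       throws IOException {
  //     if (readers != null) return; // reuse already-opened readers
  //     FileSystem fs = FileSystem.get(config);
  //     readers = MapFileOutputFormat.getReaders(fs,
  //         new Path(crawlDb, CrawlDatum.DB_DIR_NAME), config);
  //   }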
  public void processDumpJob(String crawlDb, String output, Configuration config) throws IOException {
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb dump: starting");
      LOG.info("CrawlDb db: " + crawlDb);
    }
    Path outFolder = new Path(output);

    JobConf job = new NutchJob(config);
    job.setJobName("dump " + crawlDb);

    job.addInputPath(new Path(crawlDb, CrawlDatum.DB_DIR_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setInputKeyClass(UTF8.class);
    job.setInputValueClass(CrawlDatum.class);

    job.setOutputPath(outFolder);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(CrawlDatum.class);

    JobClient.runJob(job);
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb dump: done");
    }
  }

  public void processTopNJob(String crawlDb, long topN, float min, String output,
      Configuration config) throws IOException {
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
      LOG.info("CrawlDb db: " + crawlDb);
    }
    Path outFolder = new Path(output);
    Path tempDir = new Path(config.get("mapred.temp.dir", ".") +
        "/readdb-topN-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    // Pass 1: map each record to (score, url), dropping entries below the minimum score.
    JobConf job = new NutchJob(config);
    job.setJobName("topN prepare " + crawlDb);
    job.addInputPath(new Path(crawlDb, CrawlDatum.DB_DIR_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setInputKeyClass(UTF8.class);
    job.setInputValueClass(CrawlDatum.class);

    job.setMapperClass(CrawlDbTopNMapper.class);
    job.setReducerClass(IdentityReducer.class);

    job.setOutputPath(tempDir);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(FloatWritable.class);
    job.setOutputValueClass(UTF8.class);

    // XXX hmmm, no setFloat() in the API ... :(
    job.setLong("CrawlDbReader.topN.min", Math.round(1000000.0 * min));
    JobClient.runJob(job);

    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb topN: collecting topN scores.");
    }
    // Pass 2: a single reducer keeps only the topN highest-scoring entries.
    job = new NutchJob(config);
    job.setJobName("topN collect " + crawlDb);
    job.setLong("CrawlDbReader.topN", topN);

    job.addInputPath(tempDir);
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setInputKeyClass(FloatWritable.class);
    job.setInputValueClass(UTF8.class);

    job.setMapperClass(IdentityMapper.class);
    job.setReducerClass(CrawlDbTopNReducer.class);

    job.setOutputPath(outFolder);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(FloatWritable.class);
    job.setOutputValueClass(UTF8.class);

    // XXX *sigh* this apparently doesn't work ... :-((
    job.setNumReduceTasks(1); // create a single file.

    JobClient.runJob(job);
    FileSystem fs = FileSystem.get(config);
    fs.delete(tempDir);
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb topN: done");
    }
  }
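  // Note on the setLong() workaround above: since JobConf has no setFloat(),
  // the minimum score travels to the tasks as a long scaled by 1e6. The
  // consumer (CrawlDbTopNMapper, not shown in this excerpt) would recover it
  // roughly like this -- an assumption about code outside the excerpt:
  //
  //   float min = job.getLong("CrawlDbReader.topN.min", 0) / 1000000.0f;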
  public static void main(String[] args) throws IOException {
    CrawlDbReader dbr = new CrawlDbReader();

    if (args.length < 1) {
      System.err.println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
      System.err.println("\t<crawldb>\tdirectory name where crawldb is located");
      System.err.println("\t-stats\tprint overall statistics to System.out");
      System.err.println("\t-dump <out_dir>\tdump the whole db to a text file in <out_dir>");
      System.err.println("\t-url <url>\tprint information on <url> to System.out");
      System.err.println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
      System.err.println("\t\t[<min>]\tskip records with scores below this value.");
      System.err.println("\t\t\tThis can significantly improve performance.");
      return;
    }
    String param = null;
    String crawlDb = args[0];
    Configuration conf = NutchConfiguration.create();
    for (int i = 1; i < args.length; i++) {
      if (args[i].equals("-stats")) {
        dbr.processStatJob(crawlDb, conf);
      } else if (args[i].equals("-dump")) {
        param = args[++i];
        dbr.processDumpJob(crawlDb, param, conf);
      } else if (args[i].equals("-url")) {
        param = args[++i];
        dbr.readUrl(crawlDb, param, conf);
      } else if (args[i].equals("-topN")) {
        param = args[++i];
        long topN = Long.parseLong(param);
        param = args[++i];
        float min = 0.0f;
        if (i < args.length - 1) {
          min = Float.parseFloat(args[++i]);
        }
        dbr.processTopNJob(crawlDb, topN, min, param, conf);
      } else {
        System.err.println("\nError: wrong argument " + args[i]);
      }
    }
    return;
  }
}
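For context, the same operations can also be driven programmatically rather than through main(). A minimal sketch, assuming a crawldb at the illustrative path crawl/crawldb (in a stock Nutch distribution the command-line equivalent is usually reached via bin/nutch readdb):

    Configuration conf = NutchConfiguration.create();
    CrawlDbReader dbr = new CrawlDbReader();
    dbr.processStatJob("crawl/crawldb", conf);                    // overall statistics
    dbr.readUrl("crawl/crawldb", "http://example.com/", conf);    // single record lookup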