📄 testmapred.java
字号:
JobConf conf = new JobConf(); Path testdir = new Path("build/test/test.mapred.compress"); Path inDir = new Path(testdir, "in"); Path outDir = new Path(testdir, "out"); FileSystem fs = FileSystem.get(conf); fs.delete(testdir); conf.setInputPath(inDir); conf.setOutputPath(outDir); conf.setMapperClass(MyMap.class); conf.setReducerClass(MyReduce.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); conf.setOutputFormat(SequenceFileOutputFormat.class); if (includeCombine) { conf.setCombinerClass(IdentityReducer.class); } if (compressMapOutput) { conf.setCompressMapOutput(true); } if (compressReduceOutput) { SequenceFileOutputFormat.setCompressOutput(conf, true); } try { fs.mkdirs(testdir); fs.mkdirs(inDir); Path inFile = new Path(inDir, "part0"); DataOutputStream f = fs.create(inFile); f.writeBytes("Owen was here\n"); f.writeBytes("Hadoop is fun\n"); f.writeBytes("Is this done, yet?\n"); f.close(); JobClient.runJob(conf); Path output = new Path(outDir, ReduceTask.getOutputName(0)); assertTrue("reduce output exists " + output, fs.exists(output)); SequenceFile.Reader rdr = new SequenceFile.Reader(fs, output, conf); assertEquals("is reduce output compressed " + output, compressReduceOutput, rdr.isCompressed()); rdr.close(); } finally { fs.delete(testdir); } } public void testCompression() throws Exception { for(int compressMap=0; compressMap < 2; ++compressMap) { for(int compressOut=0; compressOut < 2; ++compressOut) { for(int combine=0; combine < 2; ++combine) { checkCompression(compressMap == 1, compressOut == 1, combine == 1); } } } } /** * */ public static void launch() throws Exception { // // Generate distribution of ints. This is the answer key. // JobConf conf = new JobConf(); int countsToGo = counts; int dist[] = new int[range]; for (int i = 0; i < range; i++) { double avgInts = (1.0 * countsToGo) / (range - i); dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian()))); countsToGo -= dist[i]; } if (countsToGo > 0) { dist[dist.length-1] += countsToGo; } // // Write the answer key to a file. // FileSystem fs = FileSystem.get(conf); Path testdir = new Path("mapred.loadtest"); fs.mkdirs(testdir); Path randomIns = new Path(testdir, "genins"); fs.mkdirs(randomIns); Path answerkey = new Path(randomIns, "answer.key"); SequenceFile.Writer out = SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class, IntWritable.class, SequenceFile.CompressionType.NONE); try { for (int i = 0; i < range; i++) { out.append(new IntWritable(i), new IntWritable(dist[i])); } } finally { out.close(); } // // Now we need to generate the random numbers according to // the above distribution. // // We create a lot of map tasks, each of which takes at least // one "line" of the distribution. (That is, a certain number // X is to be generated Y number of times.) // // A map task emits Y key/val pairs. The val is X. The key // is a randomly-generated number. // // The reduce task gets its input sorted by key. That is, sorted // in random order. It then emits a single line of text that // for the given values. It does not emit the key. // // Because there's just one reduce task, we emit a single big // file of random numbers. // Path randomOuts = new Path(testdir, "genouts"); fs.delete(randomOuts); JobConf genJob = new JobConf(conf); genJob.setInputPath(randomIns); genJob.setInputFormat(SequenceFileInputFormat.class); genJob.setMapperClass(RandomGenMapper.class); genJob.setOutputPath(randomOuts); genJob.setOutputKeyClass(IntWritable.class); genJob.setOutputValueClass(IntWritable.class); genJob.setOutputFormat(TextOutputFormat.class); genJob.setReducerClass(RandomGenReducer.class); genJob.setNumReduceTasks(1); JobClient.runJob(genJob); // // Next, we read the big file in and regenerate the // original map. It's split into a number of parts. // (That number is 'intermediateReduces'.) // // We have many map tasks, each of which read at least one // of the output numbers. For each number read in, the // map task emits a key/value pair where the key is the // number and the value is "1". // // We have a single reduce task, which receives its input // sorted by the key emitted above. For each key, there will // be a certain number of "1" values. The reduce task sums // these values to compute how many times the given key was // emitted. // // The reduce task then emits a key/val pair where the key // is the number in question, and the value is the number of // times the key was emitted. This is the same format as the // original answer key (except that numbers emitted zero times // will not appear in the regenerated key.) The answer set // is split into a number of pieces. A final MapReduce job // will merge them. // // There's not really a need to go to 10 reduces here // instead of 1. But we want to test what happens when // you have multiple reduces at once. // int intermediateReduces = 10; Path intermediateOuts = new Path(testdir, "intermediateouts"); fs.delete(intermediateOuts); JobConf checkJob = new JobConf(conf); checkJob.setInputPath(randomOuts); checkJob.setInputFormat(TextInputFormat.class); checkJob.setMapperClass(RandomCheckMapper.class); checkJob.setOutputPath(intermediateOuts); checkJob.setOutputKeyClass(IntWritable.class); checkJob.setOutputValueClass(IntWritable.class); checkJob.setOutputFormat(SequenceFileOutputFormat.class); checkJob.setReducerClass(RandomCheckReducer.class); checkJob.setNumReduceTasks(intermediateReduces); JobClient.runJob(checkJob); // // OK, now we take the output from the last job and // merge it down to a single file. The map() and reduce() // functions don't really do anything except reemit tuples. // But by having a single reduce task here, we end up merging // all the files. // Path finalOuts = new Path(testdir, "finalouts"); fs.delete(finalOuts); JobConf mergeJob = new JobConf(conf); mergeJob.setInputPath(intermediateOuts); mergeJob.setInputFormat(SequenceFileInputFormat.class); mergeJob.setMapperClass(MergeMapper.class); mergeJob.setOutputPath(finalOuts); mergeJob.setOutputKeyClass(IntWritable.class); mergeJob.setOutputValueClass(IntWritable.class); mergeJob.setOutputFormat(SequenceFileOutputFormat.class); mergeJob.setReducerClass(MergeReducer.class); mergeJob.setNumReduceTasks(1); JobClient.runJob(mergeJob); // // Finally, we compare the reconstructed answer key with the // original one. Remember, we need to ignore zero-count items // in the original key. // boolean success = true; Path recomputedkey = new Path(finalOuts, "part-00000"); SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf); int totalseen = 0; try { IntWritable key = new IntWritable(); IntWritable val = new IntWritable(); for (int i = 0; i < range; i++) { if (dist[i] == 0) { continue; } if (! in.next(key, val)) { System.err.println("Cannot read entry " + i); success = false; break; } else { if ( !((key.get() == i ) && (val.get() == dist[i]))) { System.err.println("Mismatch! Pos=" + key.get() + ", i=" + i + ", val=" + val.get() + ", dist[i]=" + dist[i]); success = false; } totalseen += val.get(); } } if (success) { if (in.next(key, val)) { System.err.println("Unnecessary lines in recomputed key!"); success = false; } } } finally { in.close(); } int originalTotal = 0; for (int i = 0; i < dist.length; i++) { originalTotal += dist[i]; } System.out.println("Original sum: " + originalTotal); System.out.println("Recomputed sum: " + totalseen); // // Write to "results" whether the test succeeded or not. // Path resultFile = new Path(testdir, "results"); BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile))); try { bw.write("Success=" + success + "\n"); System.out.println("Success=" + success); } finally { bw.close(); } fs.delete(testdir); } /** * Launches all the tasks in order. */ public static void main(String[] argv) throws Exception { if (argv.length < 2) { System.err.println("Usage: TestMapRed <range> <counts>"); System.err.println(); System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>"); return; } int i = 0; range = Integer.parseInt(argv[i++]); counts = Integer.parseInt(argv[i++]); launch(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -