⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 testmapred.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
      // NOTE(review): this is the tail of a method whose header lies above the
      // visible part of the file; testCompression() below calls
      // checkCompression(...) with three booleans, so this is presumably that
      // method's body -- confirm against the full file.

      // Build the job configuration and start from an empty scratch directory.
      JobConf conf = new JobConf();
      Path testdir = new Path("build/test/test.mapred.compress");
      Path inDir = new Path(testdir, "in");
      Path outDir = new Path(testdir, "out");
      FileSystem fs = FileSystem.get(conf);
      fs.delete(testdir);
      conf.setInputPath(inDir);
      conf.setOutputPath(outDir);
      conf.setMapperClass(MyMap.class);
      conf.setReducerClass(MyReduce.class);
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(Text.class);
      conf.setOutputFormat(SequenceFileOutputFormat.class);
      // Each of the three flags independently switches one optional feature on.
      if (includeCombine) {
        conf.setCombinerClass(IdentityReducer.class);
      }
      if (compressMapOutput) {
        conf.setCompressMapOutput(true);
      }
      if (compressReduceOutput) {
        SequenceFileOutputFormat.setCompressOutput(conf, true);
      }
      try {
        // Create a small three-line text input file for the job.
        fs.mkdirs(testdir);
        fs.mkdirs(inDir);
        Path inFile = new Path(inDir, "part0");
        DataOutputStream f = fs.create(inFile);
        f.writeBytes("Owen was here\n");
        f.writeBytes("Hadoop is fun\n");
        f.writeBytes("Is this done, yet?\n");
        f.close();
        JobClient.runJob(conf);
        // Check that the output file named by ReduceTask.getOutputName(0)
        // exists and that its compression state matches the requested flag.
        Path output = new Path(outDir,
                               ReduceTask.getOutputName(0));
        assertTrue("reduce output exists " + output, fs.exists(output));
        SequenceFile.Reader rdr = 
            new SequenceFile.Reader(fs, output, conf);
        assertEquals("is reduce output compressed " + output, 
                     compressReduceOutput, 
                     rdr.isCompressed());
        rdr.close();
      } finally {
        // Remove the scratch directory whether or not the checks passed.
        fs.delete(testdir);
      }
    }
    
    /**
     * Calls {@code checkCompression} once for each of the 2 x 2 x 2 = 8
     * combinations of its three boolean arguments.
     *
     * Judging by the loop variable names, the arguments select map-output
     * compression, reduce-output compression and use of a combiner, in that
     * order (the {@code checkCompression} signature is outside this view --
     * confirm).
     *
     * @throws Exception whatever {@code checkCompression} throws
     */
    public void testCompression() throws Exception {
      // Each counter takes the values 0 and 1; "== 1" turns it into a flag.
      for(int compressMap=0; compressMap < 2; ++compressMap) {
        for(int compressOut=0; compressOut < 2; ++compressOut) {
          for(int combine=0; combine < 2; ++combine) {
            checkCompression(compressMap == 1, compressOut == 1,
                             combine == 1);
          }
        }
      }
    }
          /**     *      */    public static void launch() throws Exception {        //        // Generate distribution of ints.  This is the answer key.        //        JobConf conf = new JobConf();        int countsToGo = counts;        int dist[] = new int[range];        for (int i = 0; i < range; i++) {            double avgInts = (1.0 * countsToGo) / (range - i);            dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian())));            countsToGo -= dist[i];        }        if (countsToGo > 0) {            dist[dist.length-1] += countsToGo;        }        //        // Write the answer key to a file.          //        FileSystem fs = FileSystem.get(conf);        Path testdir = new Path("mapred.loadtest");        fs.mkdirs(testdir);        Path randomIns = new Path(testdir, "genins");        fs.mkdirs(randomIns);        Path answerkey = new Path(randomIns, "answer.key");        SequenceFile.Writer out =           SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class,                                    IntWritable.class,                                     SequenceFile.CompressionType.NONE);        try {            for (int i = 0; i < range; i++) {                out.append(new IntWritable(i), new IntWritable(dist[i]));            }        } finally {            out.close();        }        //        // Now we need to generate the random numbers according to        // the above distribution.        //        // We create a lot of map tasks, each of which takes at least        // one "line" of the distribution.  (That is, a certain number        // X is to be generated Y number of times.)        //        // A map task emits Y key/val pairs.  The val is X.  The key        // is a randomly-generated number.        //        // The reduce task gets its input sorted by key.  That is, sorted        // in random order.  It then emits a single line of text that        // for the given values.  
It does not emit the key.        //        // Because there's just one reduce task, we emit a single big        // file of random numbers.        //        Path randomOuts = new Path(testdir, "genouts");        fs.delete(randomOuts);        JobConf genJob = new JobConf(conf);        genJob.setInputPath(randomIns);        genJob.setInputFormat(SequenceFileInputFormat.class);        genJob.setMapperClass(RandomGenMapper.class);        genJob.setOutputPath(randomOuts);        genJob.setOutputKeyClass(IntWritable.class);        genJob.setOutputValueClass(IntWritable.class);        genJob.setOutputFormat(TextOutputFormat.class);        genJob.setReducerClass(RandomGenReducer.class);        genJob.setNumReduceTasks(1);        JobClient.runJob(genJob);        //        // Next, we read the big file in and regenerate the         // original map.  It's split into a number of parts.        // (That number is 'intermediateReduces'.)        //        // We have many map tasks, each of which read at least one        // of the output numbers.  For each number read in, the        // map task emits a key/value pair where the key is the        // number and the value is "1".        //        // We have a single reduce task, which receives its input        // sorted by the key emitted above.  For each key, there will        // be a certain number of "1" values.  The reduce task sums        // these values to compute how many times the given key was        // emitted.        //        // The reduce task then emits a key/val pair where the key        // is the number in question, and the value is the number of        // times the key was emitted.  This is the same format as the        // original answer key (except that numbers emitted zero times        // will not appear in the regenerated key.)  The answer set        // is split into a number of pieces.  A final MapReduce job        // will merge them.        
//        // There's not really a need to go to 10 reduces here         // instead of 1.  But we want to test what happens when        // you have multiple reduces at once.        //        int intermediateReduces = 10;        Path intermediateOuts = new Path(testdir, "intermediateouts");        fs.delete(intermediateOuts);        JobConf checkJob = new JobConf(conf);        checkJob.setInputPath(randomOuts);        checkJob.setInputFormat(TextInputFormat.class);        checkJob.setMapperClass(RandomCheckMapper.class);        checkJob.setOutputPath(intermediateOuts);        checkJob.setOutputKeyClass(IntWritable.class);        checkJob.setOutputValueClass(IntWritable.class);        checkJob.setOutputFormat(SequenceFileOutputFormat.class);        checkJob.setReducerClass(RandomCheckReducer.class);        checkJob.setNumReduceTasks(intermediateReduces);        JobClient.runJob(checkJob);        //        // OK, now we take the output from the last job and        // merge it down to a single file.  The map() and reduce()        // functions don't really do anything except reemit tuples.        // But by having a single reduce task here, we end up merging        // all the files.        //        Path finalOuts = new Path(testdir, "finalouts");                fs.delete(finalOuts);        JobConf mergeJob = new JobConf(conf);        mergeJob.setInputPath(intermediateOuts);        mergeJob.setInputFormat(SequenceFileInputFormat.class);        mergeJob.setMapperClass(MergeMapper.class);                mergeJob.setOutputPath(finalOuts);        mergeJob.setOutputKeyClass(IntWritable.class);        mergeJob.setOutputValueClass(IntWritable.class);        mergeJob.setOutputFormat(SequenceFileOutputFormat.class);        mergeJob.setReducerClass(MergeReducer.class);        mergeJob.setNumReduceTasks(1);                JobClient.runJob(mergeJob);                 //        // Finally, we compare the reconstructed answer key with the        // original one.  
Remember, we need to ignore zero-count items        // in the original key.        //        boolean success = true;        Path recomputedkey = new Path(finalOuts, "part-00000");        SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf);        int totalseen = 0;        try {            IntWritable key = new IntWritable();            IntWritable val = new IntWritable();                        for (int i = 0; i < range; i++) {                if (dist[i] == 0) {                    continue;                }                if (! in.next(key, val)) {                    System.err.println("Cannot read entry " + i);                    success = false;                    break;                } else {                    if ( !((key.get() == i ) && (val.get() == dist[i]))) {                        System.err.println("Mismatch!  Pos=" + key.get() + ", i=" + i + ", val=" + val.get() + ", dist[i]=" + dist[i]);                        success = false;                    }                    totalseen += val.get();                }            }            if (success) {                if (in.next(key, val)) {                    System.err.println("Unnecessary lines in recomputed key!");                    success = false;                }            }        } finally {            in.close();        }        int originalTotal = 0;        for (int i = 0; i < dist.length; i++) {            originalTotal += dist[i];        }        System.out.println("Original sum: " + originalTotal);        System.out.println("Recomputed sum: " + totalseen);        //        // Write to "results" whether the test succeeded or not.        
//        Path resultFile = new Path(testdir, "results");        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));        try {            bw.write("Success=" + success + "\n");            System.out.println("Success=" + success);                    } finally {            bw.close();        }	fs.delete(testdir);    }    /**     * Launches all the tasks in order.     */    public static void main(String[] argv) throws Exception {        if (argv.length < 2) {            System.err.println("Usage: TestMapRed <range> <counts>");            System.err.println();            System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");            return;        }        int i = 0;        range = Integer.parseInt(argv[i++]);        counts = Integer.parseInt(argv[i++]);	      launch();    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -