
📄 mapredloadtest.java

📁 Hadoop is a framework for running applications on large clusters of commodity hardware. It transparently provides applications with a set of stable, reliable interfaces and handles the movement of data for them. Hadoop implements Google's MapReduce algorithm (a short sketch of the map/reduce pattern follows, just before the listing).
💻 JAVA
📖 Page 1 of 2
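The jobs in the listing below plug in mapper and reducer classes (RandomGenMapper/RandomGenReducer, RandomCheckMapper/RandomCheckReducer, MergeMapper/MergeReducer) whose definitions are on the part of the file not shown on this page. For orientation, here is a minimal sketch of the map/reduce pattern the check stage relies on: the map phase emits (number, 1) pairs, and the reduce phase sums the 1s for each number. It is written against the early, non-generic org.apache.hadoop.mapred API that this file targets; the class names are illustrative, the exact interface signatures varied slightly between early Hadoop releases, and for simplicity the sketch assumes the numbers already arrive as IntWritable values rather than being parsed out of text lines.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Illustrative sketch only; not part of mapredloadtest.java.
public class CountPatternSketch {

    // Map phase: emit each observed number with a count of 1.
    public static class EmitOneMapper implements Mapper {
        public void configure(JobConf job) { }
        public void close() { }
        public void map(WritableComparable key, Writable val,
                        OutputCollector out, Reporter reporter) throws IOException {
            // Assumes the value is already an IntWritable holding the number.
            out.collect((IntWritable) val, new IntWritable(1));
        }
    }

    // Reduce phase: all the 1s for one number arrive together; their sum
    // is how many times that number was generated.
    public static class SumReducer implements Reducer {
        public void configure(JobConf job) { }
        public void close() { }
        public void reduce(WritableComparable key, Iterator values,
                           OutputCollector out, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += ((IntWritable) values.next()).get();
            }
            out.collect(key, new IntWritable(sum));
        }
    }
}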
        int countsToGo = counts;
        int dist[] = new int[range];
        for (int i = 0; i < range; i++) {
            double avgInts = (1.0 * countsToGo) / (range - i);
            dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian())));
            countsToGo -= dist[i];
        }
        if (countsToGo > 0) {
            dist[dist.length-1] += countsToGo;
        }

        //
        // Write the answer key to a file.
        //
        FileSystem fs = FileSystem.get(conf);
        File testdir = new File("mapred.loadtest");
        fs.mkdirs(testdir);

        File randomIns = new File(testdir, "genins");
        fs.mkdirs(randomIns);

        File answerkey = new File(randomIns, "answer.key");
        SequenceFile.Writer out = new SequenceFile.Writer(fs, answerkey.getPath(), IntWritable.class, IntWritable.class);
        try {
            for (int i = 0; i < range; i++) {
                out.append(new IntWritable(i), new IntWritable(dist[i]));
            }
        } finally {
            out.close();
        }

        //
        // Now we need to generate the random numbers according to
        // the above distribution.
        //
        // We create a lot of map tasks, each of which takes at least
        // one "line" of the distribution.  (That is, a certain number
        // X is to be generated Y number of times.)
        //
        // A map task emits Y key/val pairs.  The val is X.  The key
        // is a randomly-generated number.
        //
        // The reduce task gets its input sorted by key.  That is, sorted
        // in random order.  It then emits a single line of text that
        // for the given values.  It does not emit the key.
        //
        // Because there's just one reduce task, we emit a single big
        // file of random numbers.
        //
        File randomOuts = new File(testdir, "genouts");
        fs.mkdirs(randomOuts);

        JobConf genJob = new JobConf(conf);
        genJob.setInputDir(randomIns);
        genJob.setInputKeyClass(IntWritable.class);
        genJob.setInputValueClass(IntWritable.class);
        genJob.setInputFormat(SequenceFileInputFormat.class);
        genJob.setMapperClass(RandomGenMapper.class);

        genJob.setOutputDir(randomOuts);
        genJob.setOutputKeyClass(IntWritable.class);
        genJob.setOutputValueClass(IntWritable.class);
        genJob.setOutputFormat(TextOutputFormat.class);
        genJob.setReducerClass(RandomGenReducer.class);
        genJob.setNumReduceTasks(1);

        JobClient.runJob(genJob);

        //
        // Next, we read the big file in and regenerate the
        // original map.  It's split into a number of parts.
        // (That number is 'intermediateReduces'.)
        //
        // We have many map tasks, each of which read at least one
        // of the output numbers.  For each number read in, the
        // map task emits a key/value pair where the key is the
        // number and the value is "1".
        //
        // We have a single reduce task, which receives its input
        // sorted by the key emitted above.  For each key, there will
        // be a certain number of "1" values.  The reduce task sums
        // these values to compute how many times the given key was
        // emitted.  This is the same format as the
        // original answer key (except that numbers emitted zero times
        // will not appear in the regenerated key.)  The answer set
        // is split into a number of pieces.  A final MapReduce job
        // will merge them.
        //
        // There's not really a need to go to 10 reduces here
        // instead of 1.  But we want to test what happens when
        // you have multiple reduces at once.
        //
        int intermediateReduces = 10;
        File intermediateOuts = new File(testdir, "intermediateouts");
        fs.mkdirs(intermediateOuts);

        JobConf checkJob = new JobConf(conf);
        checkJob.setInputDir(randomOuts);
        checkJob.setInputKeyClass(LongWritable.class);
        checkJob.setInputValueClass(UTF8.class);
        checkJob.setInputFormat(TextInputFormat.class);
        checkJob.setMapperClass(RandomCheckMapper.class);

        checkJob.setOutputDir(intermediateOuts);
        checkJob.setOutputKeyClass(IntWritable.class);
        checkJob.setOutputValueClass(IntWritable.class);
        checkJob.setOutputFormat(SequenceFileOutputFormat.class);
        checkJob.setReducerClass(RandomCheckReducer.class);
        checkJob.setNumReduceTasks(intermediateReduces);

        JobClient.runJob(checkJob);

        //
        // OK, now we take the output from the last job and
        // merge it down to a single file.  The map() and reduce()
        // functions don't really do anything except reemit tuples.
        // But by having a single reduce task here, we end up merging
        // all the files.
        //
        File finalOuts = new File(testdir, "finalouts");
        fs.mkdirs(finalOuts);

        JobConf mergeJob = new JobConf(conf);
        mergeJob.setInputDir(intermediateOuts);
        mergeJob.setInputKeyClass(IntWritable.class);
        mergeJob.setInputValueClass(IntWritable.class);
        mergeJob.setInputFormat(SequenceFileInputFormat.class);
        mergeJob.setMapperClass(MergeMapper.class);

        mergeJob.setOutputDir(finalOuts);
        mergeJob.setOutputKeyClass(IntWritable.class);
        mergeJob.setOutputValueClass(IntWritable.class);
        mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
        mergeJob.setReducerClass(MergeReducer.class);
        mergeJob.setNumReduceTasks(1);

        JobClient.runJob(mergeJob);

        //
        // Finally, we compare the reconstructed answer key with the
        // original one.  Remember, we need to ignore zero-count items
        // in the original key.
        //
        boolean success = true;
        File recomputedkey = new File(finalOuts, "part-00000");
        SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey.getPath(), conf);
        int totalseen = 0;
        try {
            IntWritable key = new IntWritable();
            IntWritable val = new IntWritable();

            for (int i = 0; i < range; i++) {
                if (dist[i] == 0) {
                    continue;
                }
                if (! in.next(key, val)) {
                    System.err.println("Cannot read entry " + i);
                    success = false;
                    break;
                } else {
                    if ( !((key.get() == i ) && (val.get() == dist[i]))) {
                        System.err.println("Mismatch!  Pos=" + key.get() + ", i=" + i + ", val=" + val.get() + ", dist[i]=" + dist[i]);
                        success = false;
                    }
                    totalseen += val.get();
                }
            }
            if (success) {
                if (in.next(key, val)) {
                    System.err.println("Unnecessary lines in recomputed key!");
                    success = false;
                }
            }
        } finally {
            in.close();
        }

        int originalTotal = 0;
        for (int i = 0; i < dist.length; i++) {
            originalTotal += dist[i];
        }
        System.out.println("Original sum: " + originalTotal);
        System.out.println("Recomputed sum: " + totalseen);

        //
        // Write to "results" whether the test succeeded or not.
        //
        File resultFile = new File(testdir, "results");
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
        try {
            bw.write("Success=" + success + "\n");
            System.out.println("Success=" + success);
        } finally {
            bw.close();
        }
    }

    /**
     * Launches all the tasks in order.
     */
    public static void main(String[] argv) throws Exception {
        if (argv.length < 2) {
            System.err.println("Usage: MapredLoadTest <range> <counts>");
            System.err.println();
            System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
            return;
        }

        int i = 0;
        int range = Integer.parseInt(argv[i++]);
        int counts = Integer.parseInt(argv[i++]);

        MapredLoadTest mlt = new MapredLoadTest(range, counts, new Configuration());
        mlt.launch();
    }
}
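A usage note (not part of the original source): main() takes two integers, <range> and <counts>. range is the number of distinct values [0, range) in the answer key, and counts is the total number of random values to generate; as the usage message says, counts should be substantially larger than range so that most values end up with a non-zero count. Assuming the class is compiled with Hadoop libraries of the same vintage on the classpath (the jar name and the package prefix are not shown on this page, so the command below is only illustrative), a run looks roughly like:

    java -cp hadoop.jar:. MapredLoadTest 100 100000

The verdict is printed to standard output and also written to the results file under the mapred.loadtest directory on whichever FileSystem the Configuration points at, so the outcome can be checked interactively or by a follow-up script.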
