📄 TestMapRed.java
/**
 * Copyright 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.*;
import junit.framework.TestCase;

import java.io.*;
import java.util.*;

/**********************************************************
 * MapredLoadTest generates a bunch of work that exercises
 * a Hadoop Map-Reduce system (and DFS, too). It goes through
 * the following steps:
 *
 * 1) Take inputs 'range' and 'counts'.
 * 2) Generate 'counts' random integers between 0 and range-1.
 * 3) Create a file that lists each integer between 0 and range-1,
 *    and lists the number of times that integer was generated.
 * 4) Emit a (very large) file that contains all the integers
 *    in the order generated.
 * 5) After the file has been generated, read it back and count
 *    how many times each int was generated.
 * 6) Compare this big count-map against the original one. If
 *    they match, then SUCCESS! Otherwise, FAILURE!
 *
 * OK, that's how we can think about it. What are the map-reduce
 * steps that get the job done?
 *
 * 1) In a non-mapred thread, take the inputs 'range' and 'counts'.
 * 2) In a non-mapred thread, generate the answer-key and write to disk.
 * 3) In a mapred job, divide the answer key into K jobs.
 * 4) A mapred 'generator' task consists of K map jobs. Each reads
 *    an individual "sub-key", and generates integers according
 *    to it (though with a random ordering).
 * 5) The generator's reduce task agglomerates all of those files
 *    into a single one.
 * 6) A mapred 'reader' task consists of M map jobs. The output
 *    file is cut into M pieces. Each of the M jobs counts the
 *    individual ints in its chunk and creates a map of all seen ints.
 * 7) A mapred job integrates all the count files into a single one.
 *
 **********************************************************/
public class TestMapRed extends TestCase {

  /**
   * Modified to make it a junit test.
   * The RandomGen Job does the actual work of creating
   * a huge file of assorted numbers. It receives instructions
   * as to how many times each number should be counted. Then
   * it emits those numbers in a crazy order.
   *
   * The map() function takes a key/val pair that describes
   * a value-to-be-emitted (the key) and how many times it
   * should be emitted (the value), aka "numtimes". map() then
   * emits a series of intermediate key/val pairs. It emits
   * 'numtimes' of these. The key is a random number and the
   * value is the 'value-to-be-emitted'.
   *
   * The system collates and merges these pairs according to
   * the random number. The reduce() function takes in a key/value
   * pair that consists of a crazy random number and a series
   * of values that should be emitted. The random number key
   * is now dropped, and reduce() emits a pair for every intermediate value.
   * The emitted key is an intermediate value. The emitted value
   * is just a blank string. Thus, we've created a huge file
   * of numbers in random order, but where each number appears
   * as many times as we were instructed.
   */
  static class RandomGenMapper implements Mapper {
    public void configure(JobConf job) {
    }

    public void map(WritableComparable key, Writable val,
                    OutputCollector out, Reporter reporter) throws IOException {
      int randomVal = ((IntWritable) key).get();
      int randomCount = ((IntWritable) val).get();
      for (int i = 0; i < randomCount; i++) {
        out.collect(new IntWritable(Math.abs(r.nextInt())),
                    new IntWritable(randomVal));
      }
    }

    public void close() {
    }
  }

  /**
   */
  static class RandomGenReducer implements Reducer {
    public void configure(JobConf job) {
    }

    public void reduce(WritableComparable key, Iterator it,
                       OutputCollector out, Reporter reporter) throws IOException {
      while (it.hasNext()) {
        int val = ((IntWritable) it.next()).get();
        out.collect(new Text("" + val), new Text(""));
      }
    }

    public void close() {
    }
  }

  /**
   * The RandomCheck Job does a lot of our work. It takes
   * in a num/string keyspace, and transforms it into a
   * key/count(int) keyspace.
   *
   * The map() function just emits a num/1 pair for every
   * num/string input pair.
   *
   * The reduce() function sums up all the 1s that were
   * emitted for a single key. It then emits the key/total
   * pair.
   *
   * This is used to regenerate the random number "answer key".
   * Each key here is a random number, and the count is the
   * number of times the number was emitted.
   */
  static class RandomCheckMapper implements Mapper {
    public void configure(JobConf job) {
    }

    public void map(WritableComparable key, Writable val,
                    OutputCollector out, Reporter reporter) throws IOException {
      Text str = (Text) val;
      out.collect(new IntWritable(Integer.parseInt(str.toString().trim())),
                  new IntWritable(1));
    }

    public void close() {
    }
  }

  /**
   */
  static class RandomCheckReducer implements Reducer {
    public void configure(JobConf job) {
    }

    public void reduce(WritableComparable key, Iterator it,
                       OutputCollector out, Reporter reporter) throws IOException {
      int keyint = ((IntWritable) key).get();
      int count = 0;
      while (it.hasNext()) {
        it.next();
        count++;
      }
      out.collect(new IntWritable(keyint), new IntWritable(count));
    }

    public void close() {
    }
  }

  /**
   * The Merge Job is a really simple one. It takes in
   * an int/int key-value set, and emits the same set.
   * But it merges identical keys by adding their values.
   *
   * Thus, the map() function is just the identity function
   * and reduce() just sums. Nothing to see here!
   */
  static class MergeMapper implements Mapper {
    public void configure(JobConf job) {
    }

    public void map(WritableComparable key, Writable val,
                    OutputCollector out, Reporter reporter) throws IOException {
      int keyint = ((IntWritable) key).get();
      int valint = ((IntWritable) val).get();
      out.collect(new IntWritable(keyint), new IntWritable(valint));
    }

    public void close() {
    }
  }

  static class MergeReducer implements Reducer {
    public void configure(JobConf job) {
    }

    public void reduce(WritableComparable key, Iterator it,
                       OutputCollector out, Reporter reporter) throws IOException {
      int keyint = ((IntWritable) key).get();
      int total = 0;
      while (it.hasNext()) {
        total += ((IntWritable) it.next()).get();
      }
      out.collect(new IntWritable(keyint), new IntWritable(total));
    }

    public void close() {
    }
  }

  private static int range = 10;
  private static int counts = 100;
  private static Random r = new Random();

  /**
  public TestMapRed(int range, int counts, Configuration conf) throws IOException {
    this.range = range;
    this.counts = counts;
    this.conf = conf;
  }
  **/

  public void testMapred() throws Exception {
    launch();
  }

  private static class MyMap implements Mapper {
    private JobConf conf;
    private boolean compress;
    private String taskId;

    public void configure(JobConf conf) {
      this.conf = conf;
      compress = conf.getBoolean("mapred.compress.map.output", false);
      taskId = conf.get("mapred.task.id");
    }

    public void map(WritableComparable key, Writable value,
                    OutputCollector output, Reporter reporter) throws IOException {
      String str = ((Text) value).toString().toLowerCase();
      output.collect(new Text(str), value);
    }

    public void close() throws IOException {
      MapOutputFile namer = new MapOutputFile();
      namer.setConf(conf);
      FileSystem fs = FileSystem.get(conf);
      Path output = namer.getOutputFile(taskId, 0);
      assertTrue("map output exists " + output, fs.exists(output));
      SequenceFile.Reader rdr = new SequenceFile.Reader(fs, output, conf);
      assertEquals("is map output compressed " + output, compress,
                   rdr.isCompressed());
      rdr.close();
    }
  }

  private static class MyReduce extends IdentityReducer {
    private JobConf conf;
    private boolean compressInput;
    private String taskId;
    private boolean first = true;

    public void configure(JobConf conf) {
      this.conf = conf;
      compressInput = conf.getBoolean("mapred.compress.map.output", false);
      taskId = conf.get("mapred.task.id");
    }

    public void reduce(WritableComparable key, Iterator values,
                       OutputCollector output, Reporter reporter) throws IOException {
      if (first) {
        first = false;
        Path input = conf.getLocalPath(taskId + "/all.2");
        FileSystem fs = FileSystem.get(conf);
        assertTrue("reduce input exists " + input, fs.exists(input));
        SequenceFile.Reader rdr = new SequenceFile.Reader(fs, input, conf);
        assertEquals("is reduce input compressed " + input, compressInput,
                     rdr.isCompressed());
        rdr.close();
      }
    }
  }

  private void checkCompression(boolean compressMapOutput,
                                boolean compressReduceOutput,
                                boolean includeCombine) throws Exception {
    // ... (remainder of the listing is truncated in the source)
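The listing stops partway through checkCompression, before the launch() driver that chains together the three jobs described in the class comment. Purely as a hedged sketch (not the original driver code), here is roughly how the generator, checker, and merge jobs above could be wired using the old org.apache.hadoop.mapred API, assuming this class's existing imports. The method name runPipelineSketch and the testmapred/... paths are illustrative only, and the static FileInputFormat/FileOutputFormat path helpers shown here appeared in later 0.x releases; earlier versions set paths directly on JobConf.

  // Hypothetical sketch: chain the generator, checker, and merge jobs.
  // Names and paths are illustrative, not from the original source.
  private static void runPipelineSketch(JobConf base) throws IOException {
    // Job 1 (generator): scramble the answer key into one big file of
    // random-order integers, one per line. The answer key is assumed to
    // be a SequenceFile of IntWritable (value) -> IntWritable (count).
    JobConf genJob = new JobConf(base, TestMapRed.class);
    FileInputFormat.setInputPaths(genJob, new Path("testmapred/answer.key"));
    FileOutputFormat.setOutputPath(genJob, new Path("testmapred/random.data"));
    genJob.setInputFormat(SequenceFileInputFormat.class);
    genJob.setMapperClass(RandomGenMapper.class);
    genJob.setReducerClass(RandomGenReducer.class);
    genJob.setMapOutputKeyClass(IntWritable.class);   // random shuffle key
    genJob.setMapOutputValueClass(IntWritable.class); // value to be emitted
    genJob.setOutputKeyClass(Text.class);             // reducer emits text lines
    genJob.setOutputValueClass(Text.class);
    JobClient.runJob(genJob);

    // Job 2 (reader): count how many times each integer occurs in the
    // scrambled text file; the reducer sums the per-record 1s per key.
    JobConf checkJob = new JobConf(base, TestMapRed.class);
    FileInputFormat.setInputPaths(checkJob, new Path("testmapred/random.data"));
    FileOutputFormat.setOutputPath(checkJob, new Path("testmapred/random.check"));
    checkJob.setMapperClass(RandomCheckMapper.class);
    checkJob.setReducerClass(RandomCheckReducer.class);
    checkJob.setOutputKeyClass(IntWritable.class);
    checkJob.setOutputValueClass(IntWritable.class);
    checkJob.setOutputFormat(SequenceFileOutputFormat.class);
    JobClient.runJob(checkJob);

    // Job 3 (merge): fold the per-chunk count files into a single
    // int -> total-count file for comparison with the answer key.
    JobConf mergeJob = new JobConf(base, TestMapRed.class);
    FileInputFormat.setInputPaths(mergeJob, new Path("testmapred/random.check"));
    FileOutputFormat.setOutputPath(mergeJob, new Path("testmapred/answer.check"));
    mergeJob.setInputFormat(SequenceFileInputFormat.class);
    mergeJob.setMapperClass(MergeMapper.class);
    mergeJob.setReducerClass(MergeReducer.class);
    mergeJob.setOutputKeyClass(IntWritable.class);
    mergeJob.setOutputValueClass(IntWritable.class);
    JobClient.runJob(mergeJob);
  }

A final non-mapred step would then read testmapred/answer.check back and compare it against the original answer key, as step 6 of the class comment describes.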