
📄 mapredloadtest.java

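Hadoop is a framework for running applications on large clusters of inexpensive commodity hardware. It transparently provides applications with a set of stable, reliable interfaces and data motion. Hadoop implements Google's MapReduce algorithm.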
Page 1 of 2
/**
 * Copyright 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.*;
import java.io.*;
import java.util.*;

/**********************************************************
 * MapredLoadTest generates a bunch of work that exercises
 * a Hadoop Map-Reduce system (and DFS, too).  It goes through
 * the following steps:
 *
 * 1) Take inputs 'range' and 'counts'.
 * 2) Generate 'counts' random integers between 0 and range-1.
 * 3) Create a file that lists each integer between 0 and range-1,
 *    and lists the number of times that integer was generated.
 * 4) Emit a (very large) file that contains all the integers
 *    in the order generated.
 * 5) After the file has been generated, read it back and count
 *    how many times each int was generated.
 * 6) Compare this big count-map against the original one.  If
 *    they match, then SUCCESS!  Otherwise, FAILURE!
 *
 * OK, that's how we can think about it.  What are the map-reduce
 * steps that get the job done?
 *
 * 1) In a non-mapred thread, take the inputs 'range' and 'counts'.
 * 2) In a non-mapred thread, generate the answer-key and write to disk.
 * 3) In a mapred job, divide the answer key into K jobs.
 * 4) A mapred 'generator' task consists of K map jobs.  Each reads
 *    an individual "sub-key", and generates integers according
 *    to it (though with a random ordering).
 * 5) The generator's reduce task agglomerates all of those files
 *    into a single one.
 * 6) A mapred 'reader' task consists of M map jobs.  The output
 *    file is cut into M pieces. Each of the M jobs counts the
 *    individual ints in its chunk and creates a map of all seen ints.
 * 7) A mapred job integrates all the count files into a single one.
 *
 **********************************************************/
public class MapredLoadTest {
    /**
     * The RandomGen Job does the actual work of creating
     * a huge file of assorted numbers.  It receives instructions
     * as to how many times each number should be counted.  Then
     * it emits those numbers in a crazy order.
     *
     * The map() function takes a key/val pair that describes
     * a value-to-be-emitted (the key) and how many times it
     * should be emitted (the value), aka "numtimes".  map() then
     * emits a series of intermediate key/val pairs.  It emits
     * 'numtimes' of these.  The key is a random number and the
     * value is the 'value-to-be-emitted'.
     *
     * The system collates and merges these pairs according to
     * the random number.  reduce() function takes in a key/value
     * pair that consists of a crazy random number and a series
     * of values that should be emitted.  The random number key
     * is now dropped, and reduce() emits a pair for every intermediate value.
     * The emitted key is an intermediate value.  The emitted value
     * is just a blank string.  Thus, we've created a huge file
     * of numbers in random order, but where each number appears
     * as many times as we were instructed.
     */
    static class RandomGenMapper implements Mapper {
        Random r = new Random();

        public void configure(JobConf job) {
        }

        public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
            int randomVal = ((IntWritable) key).get();
            int randomCount = ((IntWritable) val).get();

            for (int i = 0; i < randomCount; i++) {
                out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
            }
        }

        public void close() {
        }
    }

    /**
     */
    static class RandomGenReducer implements Reducer {
        public void configure(JobConf job) {
        }

        public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
            int keyint = ((IntWritable) key).get();
            while (it.hasNext()) {
                int val = ((IntWritable) it.next()).get();
                out.collect(new UTF8("" + val), new UTF8(""));
            }
        }

        public void close() {
        }
    }

    /**
     * The RandomCheck Job does a lot of our work.  It takes
     * in a num/string keyspace, and transforms it into a
     * key/count(int) keyspace.
     *
     * The map() function just emits a num/1 pair for every
     * num/string input pair.
     *
     * The reduce() function sums up all the 1s that were
     * emitted for a single key.  It then emits the key/total
     * pair.
     *
     * This is used to regenerate the random number "answer key".
     * Each key here is a random number, and the count is the
     * number of times the number was emitted.
     */
    static class RandomCheckMapper implements Mapper {
        public void configure(JobConf job) {
        }

        public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
            long pos = ((LongWritable) key).get();
            UTF8 str = (UTF8) val;

            out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
        }

        public void close() {
        }
    }

    /**
     */
    static class RandomCheckReducer implements Reducer {
        public void configure(JobConf job) {
        }

        public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
            int keyint = ((IntWritable) key).get();
            int count = 0;
            while (it.hasNext()) {
                it.next();
                count++;
            }
            out.collect(new IntWritable(keyint), new IntWritable(count));
        }

        public void close() {
        }
    }

    /**
     * The Merge Job is a really simple one.  It takes in
     * an int/int key-value set, and emits the same set.
     * But it merges identical keys by adding their values.
     *
     * Thus, the map() function is just the identity function
     * and reduce() just sums.  Nothing to see here!
     */
    static class MergeMapper implements Mapper {
        public void configure(JobConf job) {
        }

        public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
            int keyint = ((IntWritable) key).get();
            int valint = ((IntWritable) val).get();

            out.collect(new IntWritable(keyint), new IntWritable(valint));
        }

        public void close() {
        }
    }

    static class MergeReducer implements Reducer {
        public void configure(JobConf job) {
        }

        public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
            int keyint = ((IntWritable) key).get();
            int total = 0;
            while (it.hasNext()) {
                total += ((IntWritable) it.next()).get();
            }
            out.collect(new IntWritable(keyint), new IntWritable(total));
        }

        public void close() {
        }
    }

    int range;
    int counts;
    Random r = new Random();
    Configuration conf;

    /**
     * MapredLoadTest
     */
    public MapredLoadTest(int range, int counts, Configuration conf) throws IOException {
        this.range = range;
        this.counts = counts;
        this.conf = conf;
    }

    /**
     *
     */
    public void launch() throws IOException {
        //
        // Generate distribution of ints.  This is the answer key.
        //
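
The six steps listed in the class comment can be tried out without a cluster. What follows is a minimal in-memory sketch, not part of the Hadoop source: `LoadTestSketch` and its method names are hypothetical, plain arrays and lists stand in for the DFS files, and `Collections.shuffle` is assumed as a stand-in for the random-key sort that the RandomGen job relies on to scramble the output order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical, single-process illustration of the load test's logic.
public class LoadTestSketch {

    // Steps 1-3: draw 'counts' random ints in [0, range) and tally them.
    // The tally is the "answer key".
    static int[] makeAnswerKey(int range, int counts, Random r) {
        int[] dist = new int[range];
        for (int i = 0; i < counts; i++) {
            dist[r.nextInt(range)]++;
        }
        return dist;
    }

    // Step 4 (RandomGen analogue): emit each value dist[value] times,
    // then scramble the order. The real job scrambles by sorting on random keys.
    static List<Integer> generate(int[] dist, Random r) {
        List<Integer> emitted = new ArrayList<>();
        for (int value = 0; value < dist.length; value++) {
            for (int i = 0; i < dist[value]; i++) {
                emitted.add(value);
            }
        }
        Collections.shuffle(emitted, r);
        return emitted;
    }

    // Step 5 (RandomCheck and Merge analogue): count occurrences of each value.
    static int[] recount(List<Integer> emitted, int range) {
        int[] seen = new int[range];
        for (int value : emitted) {
            seen[value]++;
        }
        return seen;
    }

    public static void main(String[] args) {
        Random r = new Random(42);
        int range = 10;
        int counts = 1000;
        int[] key = makeAnswerKey(range, counts, r);
        int[] seen = recount(generate(key, r), range);
        // Step 6: compare the recount against the answer key.
        System.out.println(Arrays.equals(key, seen) ? "SUCCESS" : "FAILURE");
    }
}
```

Because the recount only depends on how often each value appears, the comparison passes for any shuffle order, which is the property the map-reduce version checks at scale.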
