📄 PiBenchmark.java
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.*;

/**
 * A Map-Reduce program to estimate the value of Pi using the Monte Carlo
 * method.
 *
 * @author Milind Bhandarkar
 */
public class PiBenchmark {

  /**
   * Mapper class for Pi estimation.
   */
  public static class PiMapper extends MapReduceBase implements Mapper {

    /** Mapper configuration (no-op). */
    public void configure(JobConf job) {
    }

    static Random r = new Random();
    long numInside = 0L;
    long numOutside = 0L;

    /** Map method.
     * @param key the number of samples to generate
     * @param val not used
     * @param out output collector
     * @param reporter progress reporter
     */
    public void map(WritableComparable key,
                    Writable val,
                    OutputCollector out,
                    Reporter reporter) throws IOException {
      long nSamples = ((LongWritable) key).get();
      for (long idx = 0; idx < nSamples; idx++) {
        // Sample a uniformly random point in the unit square and test whether
        // it falls inside the inscribed circle of radius 0.5.
        double x = r.nextDouble();
        double y = r.nextDouble();
        double d = (x - 0.5) * (x - 0.5) + (y - 0.5) * (y - 0.5);
        if (d > 0.25) {
          numOutside++;
        } else {
          numInside++;
        }
        if (idx % 1000 == 1) {
          reporter.setStatus("Generated " + idx + " samples.");
        }
      }
      // Key 0 carries the outside count, key 1 the inside count.
      out.collect(new LongWritable(0), new LongWritable(numOutside));
      out.collect(new LongWritable(1), new LongWritable(numInside));
    }

    public void close() {
      // nothing
    }
  }

  public static class PiReducer extends MapReduceBase implements Reducer {
    long numInside = 0;
    long numOutside = 0;
    JobConf conf;

    /** Reducer configuration. */
    public void configure(JobConf job) {
      conf = job;
    }

    /** Reduce method.
     * @param key 1 for the inside count, 0 for the outside count
     * @param values partial counts emitted by the mappers
     * @param output not used
     * @param reporter progress reporter
     */
    public void reduce(WritableComparable key,
                       Iterator values,
                       OutputCollector output,
                       Reporter reporter) throws IOException {
      if (((LongWritable) key).get() == 1) {
        while (values.hasNext()) {
          long num = ((LongWritable) values.next()).get();
          numInside += num;
        }
      } else {
        while (values.hasNext()) {
          long num = ((LongWritable) values.next()).get();
          numOutside += num;
        }
      }
    }

    public void close() throws IOException {
      // Write the accumulated counts where the driver can read them back.
      Path tmpDir = new Path("test-mini-mr");
      Path outDir = new Path(tmpDir, "out");
      Path outFile = new Path(outDir, "reduce-out");
      FileSystem fileSys = FileSystem.get(conf);
      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
          outFile, LongWritable.class, LongWritable.class,
          CompressionType.NONE);
      writer.append(new LongWritable(numInside), new LongWritable(numOutside));
      writer.close();
    }
  }

  /**
   * This is the main driver for computing the value of Pi using
   * the Monte Carlo method.
   */
  static double launch(int numMaps, long numPoints, String jt, String dfs)
      throws IOException {
    Configuration conf = new Configuration();
    JobConf jobConf = new JobConf(conf, PiBenchmark.class);
    if (jt != null) {
      jobConf.set("mapred.job.tracker", jt);
    }
    if (dfs != null) {
      jobConf.set("fs.default.name", dfs);
    }
    jobConf.setJobName("test-mini-mr");

    // Turn off speculative execution, because DFS doesn't handle
    // multiple writers to the same file.
    jobConf.setSpeculativeExecution(false);
    jobConf.setInputFormat(SequenceFileInputFormat.class);
    jobConf.setOutputKeyClass(LongWritable.class);
    jobConf.setOutputValueClass(LongWritable.class);
    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
    jobConf.setMapperClass(PiMapper.class);
    jobConf.setReducerClass(PiReducer.class);
    jobConf.setNumReduceTasks(1);

    Path tmpDir = new Path("test-mini-mr");
    Path inDir = new Path(tmpDir, "in");
    Path outDir = new Path(tmpDir, "out");
    FileSystem fileSys = FileSystem.get(jobConf);
    fileSys.delete(tmpDir);
    fileSys.mkdirs(inDir);
    jobConf.setInputPath(inDir);
    jobConf.setOutputPath(outDir);
    jobConf.setNumMapTasks(numMaps);

    // Write one single-record input file per map task; the record's key is
    // the number of samples that map should generate.
    for (int idx = 0; idx < numMaps; ++idx) {
      Path file = new Path(inDir, "part" + idx);
      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, jobConf,
          file, LongWritable.class, LongWritable.class, CompressionType.NONE);
      writer.append(new LongWritable(numPoints), new LongWritable(0));
      writer.close();
      System.out.println("Wrote input for Map #" + idx);
    }

    double estimate = 0.0;
    try {
      System.out.println("Starting Job");
      long startTime = System.currentTimeMillis();
      JobClient.runJob(jobConf);
      System.out.println("Job Finished in " +
          (double) (System.currentTimeMillis() - startTime) / 1000.0 +
          " seconds");
      Path inFile = new Path(outDir, "reduce-out");
      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
          jobConf);
      LongWritable numInside = new LongWritable();
      LongWritable numOutside = new LongWritable();
      reader.next(numInside, numOutside);
      reader.close();
      estimate = (double) (numInside.get() * 4.0) / (numMaps * numPoints);
    } finally {
      fileSys.delete(tmpDir);
    }
    return estimate;
  }

  /**
   * Launches all the tasks in order.
   */
  public static void main(String[] argv) throws Exception {
    if (argv.length < 2) {
      System.err.println("Usage: PiBenchmark <nMaps> <nSamples>");
      return;
    }
    int nMaps = Integer.parseInt(argv[0]);
    long nSamples = Long.parseLong(argv[1]);

    System.out.println("Number of Maps = " + nMaps +
        " Samples per Map = " + nSamples);
    System.out.println("Estimated value of PI is " +
        launch(nMaps, nSamples, null, null));
  }
}
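
For context on the arithmetic in PiMapper and launch(): each sample is a uniformly random point in the unit square, which lies inside the inscribed circle of radius 0.5 exactly when (x-0.5)^2 + (y-0.5)^2 <= 0.25. The circle covers pi/4 of the square's area, so pi is approximately 4 * numInside / totalSamples, which is the expression estimate = numInside * 4.0 / (numMaps * numPoints) in the driver. The following is a minimal single-JVM sketch of the same estimate without MapReduce, useful as a sanity check; the class name PiLocalSketch is hypothetical and not part of the Hadoop example.

import java.util.Random;

/** Hypothetical standalone sketch of the same Monte Carlo estimate (no MapReduce). */
public class PiLocalSketch {
  public static void main(String[] args) {
    long nSamples = args.length > 0 ? Long.parseLong(args[0]) : 1_000_000L;
    Random r = new Random();
    long numInside = 0;
    for (long i = 0; i < nSamples; i++) {
      double x = r.nextDouble();
      double y = r.nextDouble();
      // Inside the circle of radius 0.5 centered at (0.5, 0.5)?
      if ((x - 0.5) * (x - 0.5) + (y - 0.5) * (y - 0.5) <= 0.25) {
        numInside++;
      }
    }
    // The circle covers pi/4 of the unit square, so pi ~= 4 * inside / total.
    System.out.println("Estimated value of PI is " + 4.0 * numInside / nSamples);
  }
}

To run the actual MapReduce job, the class would need to be packaged into a job jar and submitted with the standard "hadoop jar <jar> <main-class> <args>" invocation, e.g. something like "bin/hadoop jar examples.jar org.apache.hadoop.examples.PiBenchmark 10 100000"; the jar name here is a placeholder, not taken from the source.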