⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nnbench.java

📁 hadoop:Nutch集群平台
💻 JAVA
字号:
/** * Copyright 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.examples;import java.io.IOException;import java.util.Date;import java.util.Iterator;import java.util.Random;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.SequenceFile.CompressionType;import org.apache.hadoop.mapred.ClusterStatus;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.SequenceFileInputFormat;import org.apache.hadoop.util.Progressable;/** * This program uses map/reduce to just run a distributed job where there is * no interaction between the tasks and each task creates 1M/NTasks files * of 8 bytes each, closes them. Opens those files again, and reads them, * and closes them. It is meant as a stress-test and benchmark for namenode. *  * @author Owen O'Malley */public class NNBench extends MapReduceBase implements Reducer {    public static class Map extends MapReduceBase implements Mapper {    private FileSystem fileSys = null;    private int numBytesToWrite;    private Random random = new Random();    private String taskId = null;    private Path topDir = null;        private void randomizeBytes(byte[] data, int offset, int length) {      for(int i=offset + length - 1; i >= offset; --i) {        data[i] = (byte) random.nextInt(256);      }    }        /**     * Given a number of files to create, create and open those files.     */    public void map(WritableComparable key,                     Writable value,                    OutputCollector output,                     Reporter reporter) throws IOException {      int nFiles = ((IntWritable) value).get();      Path taskDir = new Path(topDir, taskId);      fileSys.mkdirs(taskDir);      byte[] buffer = new byte[32768];      for (int index = 0; index < nFiles; index++) {        FSDataOutputStream out = fileSys.create(            new Path(taskDir, Integer.toString(index)));        int toBeWritten = numBytesToWrite;        while (toBeWritten > 0) {          int nbytes = Math.min(buffer.length, toBeWritten);          randomizeBytes(buffer, 0, nbytes);          toBeWritten -= nbytes;          out.write(buffer, 0, nbytes);          reporter.setStatus("wrote " + (numBytesToWrite-toBeWritten) +              " bytes for "+ index +"th file.");        }        out.close();      }      for (int index = 0; index < nFiles; index++) {        FSDataInputStream in = fileSys.open(            new Path(taskDir, Integer.toString(index)));        int toBeRead = numBytesToWrite;        while (toBeRead > 0) {          int nbytes = Math.min(buffer.length, toBeRead);          randomizeBytes(buffer, 0, nbytes);          toBeRead -= nbytes;          in.read(buffer, 0, nbytes);          reporter.setStatus("read " + (numBytesToWrite-toBeRead) +              " bytes for "+ index +"th file.");        }        in.close();      }      fileSys.delete(taskDir); // clean up after yourself     }        /**     * Save the values out of the configuaration that we need to write     * the data.     */    public void configure(JobConf job) {      try {        fileSys = FileSystem.get(job);      } catch (IOException e) {        throw new RuntimeException("Can't get default file system", e);      }      numBytesToWrite = job.getInt("test.nnbench.bytes_per_file", 0);      topDir = new Path(job.get("test.nnbench.topdir", "/nnbench"));      taskId = job.get("mapred.task.id", (new Long(random.nextLong())).toString());    }      }    public void reduce(WritableComparable key,                      Iterator values,                     OutputCollector output,                      Reporter reporter) throws IOException {    // nothing  }    /**   * This is the main routine for launching a distributed namenode stress test.   * It runs 10 maps/node and each node creates 1M/nMaps DFS files.   * The reduce doesn't do anything.   *    * @throws IOException    */  public static void main(String[] args) throws IOException {    Configuration defaults = new Configuration();    if (args.length != 3) {      System.out.println("Usage: nnbench <out-dir> <filesPerMap> <bytesPerFile>");      return;    }    Path outDir = new Path(args[0]);    int filesPerMap = Integer.parseInt(args[1]);    int numBytesPerFile = Integer.parseInt(args[2]);        JobConf jobConf = new JobConf(defaults, NNBench.class);    jobConf.setJobName("nnbench");    jobConf.setInt("test.nnbench.bytes_per_file", numBytesPerFile);    jobConf.set("test.nnbench.topdir", args[0]);        // turn off speculative execution, because DFS doesn't handle    // multiple writers to the same file.    jobConf.setSpeculativeExecution(false);    jobConf.setInputFormat(SequenceFileInputFormat.class);    jobConf.setOutputKeyClass(BytesWritable.class);    jobConf.setOutputValueClass(BytesWritable.class);        jobConf.setMapperClass(Map.class);            jobConf.setReducerClass(NNBench.class);        JobClient client = new JobClient(jobConf);    ClusterStatus cluster = client.getClusterStatus();    int numMaps = cluster.getTaskTrackers() *          jobConf.getInt("test.nnbench.maps_per_host", 10);    jobConf.setNumMapTasks(numMaps);    System.out.println("Running " + numMaps + " maps.");    jobConf.setNumReduceTasks(1);        Path tmpDir = new Path("random-work");    Path inDir = new Path(tmpDir, "in");    Path fakeOutDir = new Path(tmpDir, "out");    FileSystem fileSys = FileSystem.get(jobConf);    if (fileSys.exists(outDir)) {      System.out.println("Error: Output directory " + outDir +                          " already exists.");      return;    }    fileSys.delete(tmpDir);    fileSys.mkdirs(inDir);    for(int i=0; i < numMaps; ++i) {      Path file = new Path(inDir, "part"+i);      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys,                                jobConf, file,                                IntWritable.class, IntWritable.class,                                CompressionType.NONE,                                (Progressable)null);      writer.append(new IntWritable(0), new IntWritable(filesPerMap));      writer.close();    }    jobConf.setInputPath(inDir);    jobConf.setOutputPath(fakeOutDir);        // Uncomment to run locally in a single process    //job_conf.set("mapred.job.tracker", "local");        Date startTime = new Date();    System.out.println("Job started: " + startTime);    try {      JobClient.runJob(jobConf);      Date endTime = new Date();      System.out.println("Job ended: " + endTime);      System.out.println("The job took " +          (endTime.getTime() - startTime.getTime()) /1000 + " seconds.");    } finally {      fileSys.delete(tmpDir);    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -