
dfsciotest.java

Project: hadoop / Nutch cluster platform
Language: Java
    job.setInputKeyClass(UTF8.class);
    job.setInputValueClass(LongWritable.class);
    job.setMapperClass(mapperClass);
    job.setReducerClass(AccumulatingReducer.class);

    job.setOutputPath(outputDir);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(UTF8.class);
    job.setNumReduceTasks(1);
    JobClient.runJob(job);
  }

  /**
   * Read mapper class.
   */
  public static class ReadMapper extends IOStatMapper {

    public ReadMapper() {
      super();
    }

    public Object doIO(Reporter reporter,
                       String name,
                       long totalSize
                       ) throws IOException {
      totalSize *= MEGA;

      // create instance of local filesystem
      FileSystem localFS = FileSystem.getNamed("local", fsConfig);

      try {
        // native runtime
        Runtime runTime = Runtime.getRuntime();

        // copy the dso and executable from dfs
        synchronized (this) {
          localFS.delete(HDFS_TEST_DIR);
          if (!(localFS.mkdirs(HDFS_TEST_DIR))) {
            throw new IOException("Failed to create " + HDFS_TEST_DIR +
                                  " on local filesystem");
          }
        }

        synchronized (this) {
          if (!localFS.exists(HDFS_SHLIB)) {
            if (!FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig)) {
              throw new IOException("Failed to copy " + HDFS_SHLIB +
                                    " to local filesystem");
            }
            String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB);
            Process process = runTime.exec(chmodCmd);
            int exitStatus = process.waitFor();
            if (exitStatus != 0) {
              throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus);
            }
          }
        }

        synchronized (this) {
          if (!localFS.exists(HDFS_READ)) {
            if (!FileUtil.copy(fs, HDFS_READ, localFS, HDFS_READ, false, fsConfig)) {
              throw new IOException("Failed to copy " + HDFS_READ +
                                    " to local filesystem");
            }
            String chmodCmd = new String(CHMOD + " a+x " + HDFS_READ);
            Process process = runTime.exec(chmodCmd);
            int exitStatus = process.waitFor();
            if (exitStatus != 0) {
              throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus);
            }
          }
        }

        // exec the C program
        Path inFile = new Path(DATA_DIR, name);
        String readCmd = new String(HDFS_READ + " " + inFile + " " + totalSize + " " +
                                    bufferSize);
        Process process = runTime.exec(readCmd, null, new File(HDFS_TEST_DIR.toString()));
        int exitStatus = process.waitFor();
        if (exitStatus != 0) {
          throw new IOException(HDFS_READ + ": Failed with exitStatus: " + exitStatus);
        }
      } catch (InterruptedException interruptedException) {
        reporter.setStatus(interruptedException.toString());
      } finally {
        localFS.close();
      }
      return new Long(totalSize);
    }
  }

  private static void readTest(FileSystem fs) throws IOException {
    fs.delete(READ_DIR);
    runIOTest(ReadMapper.class, READ_DIR);
  }
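  /**
   * Run the chosen mapper's doIO() in-process, once per file, with a no-op
   * Reporter instead of launching a MapReduce job. Presumably intended as a
   * quick single-node sanity check of the libhdfs binaries; it is triggered
   * by the -seq flag parsed in main() below.
   */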
  private static void sequentialTest(FileSystem fs,
                                     int testType,
                                     int fileSize,
                                     int nrFiles
                                     ) throws Exception {
    IOStatMapper ioer = null;
    if (testType == TEST_TYPE_READ)
      ioer = new ReadMapper();
    else if (testType == TEST_TYPE_WRITE)
      ioer = new WriteMapper();
    else
      return;
    for (int i = 0; i < nrFiles; i++)
      ioer.doIO(new Reporter() {
          public void setStatus(String status) throws IOException {}
          public void progress() throws IOException {}
        },
                BASE_FILE_NAME + Integer.toString(i),
                MEGA * fileSize);
  }
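  /*
   * Illustrative invocations, assembled from the usage string and the
   * argument parser in main() below. The launcher itself depends on how the
   * test jar is deployed, so treat these as flag examples only, and the
   * result-file name is a placeholder:
   *
   *   DFSCIOTest -write -nrFiles 10 -fileSize 100
   *   DFSCIOTest -read -nrFiles 10 -fileSize 100 -resFile result.txt
   *   DFSCIOTest -clean
   *
   * Note that -seq is accepted by the parser even though the usage string
   * does not list it.
   */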
  public static void main(String[] args) {
    int testType = TEST_TYPE_READ;
    int bufferSize = DEFAULT_BUFFER_SIZE;
    int fileSize = 1;
    int nrFiles = 1;
    String resFileName = DEFAULT_RES_FILE_NAME;
    boolean isSequential = false;

    String version = "DFSCIOTest.0.0.1";
    String usage = "Usage: DFSCIOTest -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";

    System.out.println(version);
    if (args.length == 0) {
      System.err.println(usage);
      System.exit(-1);
    }
    for (int i = 0; i < args.length; i++) {  // parse command line
      if (args[i].startsWith("-r")) {
        testType = TEST_TYPE_READ;
      } else if (args[i].startsWith("-w")) {
        testType = TEST_TYPE_WRITE;
      } else if (args[i].startsWith("-clean")) {
        testType = TEST_TYPE_CLEANUP;
      } else if (args[i].startsWith("-seq")) {
        isSequential = true;
      } else if (args[i].equals("-nrFiles")) {
        nrFiles = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-fileSize")) {
        fileSize = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-bufferSize")) {
        bufferSize = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-resFile")) {
        resFileName = args[++i];
      }
    }

    LOG.info("nrFiles = " + nrFiles);
    LOG.info("fileSize (MB) = " + fileSize);
    LOG.info("bufferSize = " + bufferSize);

    try {
      fsConfig.setInt("test.io.file.buffer.size", bufferSize);
      FileSystem fs = FileSystem.get(fsConfig);

      if (testType != TEST_TYPE_CLEANUP) {
        fs.delete(HDFS_TEST_DIR);
        fs.mkdirs(HDFS_TEST_DIR);

        // Copy the executables over to the remote filesystem
        String hadoopHome = System.getenv("HADOOP_HOME");
        fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/libhdfs.so." + HDFS_LIB_VERSION),
                             HDFS_SHLIB);
        fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_read"), HDFS_READ);
        fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_write"), HDFS_WRITE);
      }

      if (isSequential) {
        long tStart = System.currentTimeMillis();
        sequentialTest(fs, testType, fileSize, nrFiles);
        long execTime = System.currentTimeMillis() - tStart;
        String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
        LOG.info(resultLine);
        return;
      }
      if (testType == TEST_TYPE_CLEANUP) {
        cleanup(fs);
        return;
      }
      createControlFile(fs, fileSize, nrFiles);
      long tStart = System.currentTimeMillis();
      if (testType == TEST_TYPE_WRITE)
        writeTest(fs);
      if (testType == TEST_TYPE_READ)
        readTest(fs);
      long execTime = System.currentTimeMillis() - tStart;

      analyzeResult(fs, testType, execTime, resFileName);
    } catch (Exception e) {
      System.err.print(e.getLocalizedMessage());
      System.exit(-1);
    }
  }
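  /*
   * analyzeResult() reads the single reduce output (part-00000) and expects
   * one "<prefix>:<attr> <value>" pair per line, with attrs ending in :tasks,
   * :size, :time, :rate and :sqrate; presumably these are the sums
   * accumulated by AccumulatingReducer across all map tasks. From those
   * sums, med = rate / 1000 / tasks is the mean per-task IO rate (despite
   * the name it is an average, not a median; it is printed as "Average IO
   * rate"), and stdDev applies the identity Var(X) = E[X^2] - E[X]^2:
   * stdDev = sqrt(sqrate / 1000 / tasks - med * med). The division by 1000
   * presumably undoes a fixed-point scaling applied when the per-task rates
   * were emitted on the map side.
   */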
"read" :                                     "unknown"),      "           Date & time: " + new Date(System.currentTimeMillis()),      "       Number of files: " + tasks,      "Total MBytes processed: " + size/MEGA,      "     Throughput mb/sec: " + size * 1000.0 / (time * MEGA),      "Average IO rate mb/sec: " + med,      " Std IO rate deviation: " + stdDev,      "    Test exec time sec: " + (float)execTime / 1000,      "" };    PrintStream res = new PrintStream(                                       new FileOutputStream(                                                            new File(resFileName), true ));     for( int i = 0; i < resultLines.length; i++ ) {      LOG.info( resultLines[i] );      res.println( resultLines[i] );    }  }  private static void cleanup( FileSystem fs ) throws Exception {    LOG.info( "Cleaning up test files" );    fs.delete(new Path(TEST_ROOT_DIR));    fs.delete(HDFS_TEST_DIR);  }}
