📄 dfsciotest.java
字号:
job.setInputKeyClass(UTF8.class); job.setInputValueClass(LongWritable.class); job.setMapperClass(mapperClass); job.setReducerClass(AccumulatingReducer.class); job.setOutputPath(outputDir); job.setOutputKeyClass(UTF8.class); job.setOutputValueClass(UTF8.class); job.setNumReduceTasks(1); JobClient.runJob(job); } /** * Read mapper class. */ public static class ReadMapper extends IOStatMapper { public ReadMapper() { super(); } public Object doIO( Reporter reporter, String name, long totalSize ) throws IOException { totalSize *= MEGA; // create instance of local filesystem FileSystem localFS = FileSystem.getNamed("local", fsConfig); try { // native runtime Runtime runTime = Runtime.getRuntime(); // copy the dso and executable from dfs synchronized (this) { localFS.delete(HDFS_TEST_DIR); if (!(localFS.mkdirs(HDFS_TEST_DIR))) { throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem"); } } synchronized (this) { if (!localFS.exists(HDFS_SHLIB)) { if (!FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig)) { throw new IOException("Failed to copy " + HDFS_SHLIB + " to local filesystem"); } String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB); Process process = runTime.exec(chmodCmd); int exitStatus = process.waitFor(); if (exitStatus != 0) { throw new IOException( chmodCmd + ": Failed with exitStatus: " + exitStatus ); } } } synchronized (this) { if (!localFS.exists(HDFS_READ)) { if (!FileUtil.copy(fs, HDFS_READ, localFS, HDFS_READ, false, fsConfig)) { throw new IOException("Failed to copy " + HDFS_READ + " to local filesystem"); } String chmodCmd = new String(CHMOD + " a+x " + HDFS_READ); Process process = runTime.exec(chmodCmd); int exitStatus = process.waitFor(); if (exitStatus != 0) { throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); } } } // exec the C program Path inFile = new Path(DATA_DIR, name); String readCmd = new String(HDFS_READ + " " + inFile + " " + totalSize + " " + bufferSize); Process process = runTime.exec(readCmd, null, new File(HDFS_TEST_DIR.toString())); int exitStatus = process.waitFor(); if (exitStatus != 0) { throw new IOException(HDFS_READ + ": Failed with exitStatus: " + exitStatus); } } catch (InterruptedException interruptedException) { reporter.setStatus(interruptedException.toString()); } finally { localFS.close(); } return new Long(totalSize); } } private static void readTest(FileSystem fs) throws IOException { fs.delete(READ_DIR); runIOTest( ReadMapper.class, READ_DIR ); } private static void sequentialTest( FileSystem fs, int testType, int fileSize, int nrFiles ) throws Exception { IOStatMapper ioer = null; if( testType == TEST_TYPE_READ ) ioer = new ReadMapper(); else if( testType == TEST_TYPE_WRITE ) ioer = new WriteMapper(); else return; for( int i=0; i < nrFiles; i++) ioer.doIO(new Reporter() { public void setStatus(String status) throws IOException {} public void progress() throws IOException {} }, BASE_FILE_NAME+Integer.toString(i), MEGA*fileSize ); } public static void main(String[] args) { int testType = TEST_TYPE_READ; int bufferSize = DEFAULT_BUFFER_SIZE; int fileSize = 1; int nrFiles = 1; String resFileName = DEFAULT_RES_FILE_NAME; boolean isSequential = false; String version="DFSCIOTest.0.0.1"; String usage = "Usage: DFSCIOTest -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] "; System.out.println(version); if (args.length == 0) { System.err.println(usage); System.exit(-1); } for (int i = 0; i < args.length; i++) { // parse command line if (args[i].startsWith("-r")) { testType = TEST_TYPE_READ; } else if (args[i].startsWith("-w")) { testType = TEST_TYPE_WRITE; } else if (args[i].startsWith("-clean")) { testType = TEST_TYPE_CLEANUP; } else if (args[i].startsWith("-seq")) { isSequential = true; } else if (args[i].equals("-nrFiles")) { nrFiles = Integer.parseInt(args[++i]); } else if (args[i].equals("-fileSize")) { fileSize = Integer.parseInt(args[++i]); } else if (args[i].equals("-bufferSize")) { bufferSize = Integer.parseInt(args[++i]); } else if (args[i].equals("-resFile")) { resFileName = args[++i]; } } LOG.info("nrFiles = " + nrFiles); LOG.info("fileSize (MB) = " + fileSize); LOG.info("bufferSize = " + bufferSize); try { fsConfig.setInt("test.io.file.buffer.size", bufferSize); FileSystem fs = FileSystem.get(fsConfig); if (testType != TEST_TYPE_CLEANUP) { fs.delete(HDFS_TEST_DIR); fs.mkdirs(HDFS_TEST_DIR); //Copy the executables over to the remote filesystem String hadoopHome = System.getenv("HADOOP_HOME"); fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/libhdfs.so." + HDFS_LIB_VERSION), HDFS_SHLIB); fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_read"), HDFS_READ); fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_write"), HDFS_WRITE); } if( isSequential ) { long tStart = System.currentTimeMillis(); sequentialTest( fs, testType, fileSize, nrFiles ); long execTime = System.currentTimeMillis() - tStart; String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000; LOG.info( resultLine ); return; } if( testType == TEST_TYPE_CLEANUP ) { cleanup( fs ); return; } createControlFile(fs, fileSize, nrFiles); long tStart = System.currentTimeMillis(); if( testType == TEST_TYPE_WRITE ) writeTest(fs); if( testType == TEST_TYPE_READ ) readTest(fs); long execTime = System.currentTimeMillis() - tStart; analyzeResult( fs, testType, execTime, resFileName ); } catch( Exception e ) { System.err.print( e.getLocalizedMessage()); System.exit(-1); } } private static void analyzeResult( FileSystem fs, int testType, long execTime, String resFileName ) throws IOException { Path reduceFile; if( testType == TEST_TYPE_WRITE ) reduceFile = new Path( WRITE_DIR, "part-00000" ); else reduceFile = new Path( READ_DIR, "part-00000" ); DataInputStream in; in = new DataInputStream(fs.open( reduceFile )); BufferedReader lines; lines = new BufferedReader(new InputStreamReader(in)); long tasks = 0; long size = 0; long time = 0; float rate = 0; float sqrate = 0; String line; while( (line = lines.readLine()) != null ) { StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); String attr = tokens.nextToken(); if( attr.endsWith(":tasks") ) tasks = Long.parseLong( tokens.nextToken() ); else if( attr.endsWith(":size") ) size = Long.parseLong( tokens. nextToken() ); else if( attr.endsWith(":time") ) time = Long.parseLong( tokens.nextToken() ); else if( attr.endsWith(":rate") ) rate = Float.parseFloat( tokens.nextToken() ); else if( attr.endsWith(":sqrate") ) sqrate = Float.parseFloat( tokens.nextToken() ); } double med = rate / 1000 / tasks; double stdDev = Math.sqrt( Math.abs(sqrate / 1000 / tasks - med*med )); String resultLines[] = { "----- DFSCIOTest ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" : (testType == TEST_TYPE_READ) ? "read" : "unknown"), " Date & time: " + new Date(System.currentTimeMillis()), " Number of files: " + tasks, "Total MBytes processed: " + size/MEGA, " Throughput mb/sec: " + size * 1000.0 / (time * MEGA), "Average IO rate mb/sec: " + med, " Std IO rate deviation: " + stdDev, " Test exec time sec: " + (float)execTime / 1000, "" }; PrintStream res = new PrintStream( new FileOutputStream( new File(resFileName), true )); for( int i = 0; i < resultLines.length; i++ ) { LOG.info( resultLines[i] ); res.println( resultLines[i] ); } } private static void cleanup( FileSystem fs ) throws Exception { LOG.info( "Cleaning up test files" ); fs.delete(new Path(TEST_ROOT_DIR)); fs.delete(HDFS_TEST_DIR); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -