📄 clustertestdfs.java
字号:
byte[] bufferPartial = new byte[pb]; randomDataGenerator.nextBytes(bufferPartial); nos.write(bufferPartial); } else { randomDataGenerator.nextBytes(buffer); nos.write(buffer); } } } finally { nos.flush(); nos.close(); } } // // No need to wait for blocks to be replicated because replication // is supposed to be complete when the file is closed. // // // take one datanode down iDatanodeClosed = currentTestCycleNumber % listOfDataNodeDaemons.size(); DataNode dn = (DataNode) listOfDataNodeDaemons.get(iDatanodeClosed); msg("shutdown datanode daemon " + iDatanodeClosed + " dn=" + dn.data); try { dn.shutdown(); } catch (Exception e) { msg("ignoring datanode shutdown exception=" + e); } // // verify data against a "rewound" randomDataGenerator // that all of the data is intact long lastLong = randomDataGenerator.nextLong(); randomDataGenerator = makeRandomDataGenerator(); // restart (make new) PRNG ListIterator li = testfilesList.listIterator(); while (li.hasNext()) { testFileName = (UTF8) li.next(); FSInputStream nis = dfsClient.open(testFileName); byte[] bufferGolden = new byte[bufferSize]; int m = 42; try { while (m != -1) { m = nis.read(buffer); if (m == buffer.length) { randomDataGenerator.nextBytes(bufferGolden); assertBytesEqual(buffer, bufferGolden, buffer.length); } else if (m > 0) { byte[] bufferGoldenPartial = new byte[m]; randomDataGenerator.nextBytes(bufferGoldenPartial); assertBytesEqual(buffer, bufferGoldenPartial, bufferGoldenPartial.length); } } } finally { nis.close(); } } // verify last randomDataGenerator rand val to ensure last file length was checked long lastLongAgain = randomDataGenerator.nextLong(); assertEquals(lastLong, lastLongAgain); msg("Finished validating all file contents"); // // now delete all the created files msg("Delete all random test files under DFS via remaining datanodes"); li = testfilesList.listIterator(); while (li.hasNext()) { testFileName = (UTF8) li.next(); assertTrue(dfsClient.delete(testFileName)); } // // wait for delete to be propagated // (unlike writing files, delete is lazy) msg("Test thread sleeping while datanodes propagate delete..."); awaitQuiescence(); msg("Test thread awakens to verify file contents"); // // check that the datanode's block directory is empty // (except for datanode that had forced shutdown) checkDataDirsEmpty = true; // do it during finally clause } catch (AssertionFailedError afe) { throw afe; } catch (Throwable t) { msg("Unexpected exception_b: " + t); t.printStackTrace(); } finally { // // shut down datanode daemons (this takes advantage of being same-process) msg("begin shutdown of all datanode daemons for test cycle " + currentTestCycleNumber); for (int i = 0; i < listOfDataNodeDaemons.size(); i++) { DataNode dataNode = (DataNode) listOfDataNodeDaemons.get(i); if (i != iDatanodeClosed) { try { if (checkDataDirsEmpty) { File dataDir = new File(dataNode.data.diskUsage.getDirPath()); assertNoBlocks(dataDir); } dataNode.shutdown(); } catch (Exception e) { msg("ignoring exception during (all) datanode shutdown, e=" + e); } } } } msg("finished shutdown of all datanode daemons for test cycle " + currentTestCycleNumber); if (dfsClient != null) { try { msg("close down subthreads of DFSClient"); dfsClient.close(); } catch (Exception ignored) { } msg("finished close down of DFSClient"); } } catch (AssertionFailedError afe) { throw afe; } catch (Throwable t) { msg("Unexpected exception_a: " + t); t.printStackTrace(); } finally { // shut down namenode daemon (this takes advantage of being same-process) msg("begin shutdown of namenode daemon for test cycle " + currentTestCycleNumber); try { nameNodeDaemon.stop(); } catch (Exception e) { msg("ignoring namenode shutdown exception=" + e); } msg("finished shutdown of namenode daemon for test cycle " + currentTestCycleNumber); } msg("test cycle " + currentTestCycleNumber + " elapsed time=" + (System.currentTimeMillis() - startTime) / 1000. + "sec"); msg("threads still running (look for stragglers): "); msg(summarizeThreadGroup()); } private void assertNoBlocks(File datanodeDir) { File datanodeDataDir = new File(datanodeDir, "data"); String[] blockFilenames = datanodeDataDir.list( new FilenameFilter() { public boolean accept(File dir, String name){ return Block.isBlockFilename(new File(dir, name));}}); // if this fails, the delete did not propagate because either // awaitQuiescence() returned before the disk images were removed // or a real failure was detected. assertTrue(" data dir not empty: " + datanodeDataDir, blockFilenames.length==0); } /** * Make a data generator. * Allows optional use of high quality PRNG by setting property * hadoop.random.class to the full class path of a subclass of * java.util.Random such as "...util.MersenneTwister". * The property test.dfs.random.seed can supply a seed for reproducible * testing (a default is set here if property is not set.) */ private Random makeRandomDataGenerator() { long seed = conf.getLong("test.dfs.random.seed", 0xB437EF); try { if (randomDataGeneratorCtor == null) { // lazy init String rndDataGenClassname = conf.get("hadoop.random.class", "java.util.Random"); Class clazz = Class.forName(rndDataGenClassname); randomDataGeneratorCtor = clazz.getConstructor(new Class[]{Long.TYPE}); } if (randomDataGeneratorCtor != null) { Object arg[] = {new Long(seed)}; return (Random) randomDataGeneratorCtor.newInstance(arg); } } catch (ClassNotFoundException absorb) { } catch (NoSuchMethodException absorb) { } catch (SecurityException absorb) { } catch (InstantiationException absorb) { } catch (IllegalAccessException absorb) { } catch (IllegalArgumentException absorb) { } catch (InvocationTargetException absorb) { } // last resort return new java.util.Random(seed); } /** Wait for the DFS datanodes to become quiescent. * The initial implementation is to sleep for some fixed amount of time, * but a better implementation would be to really detect when distributed * operations are completed. * @throws InterruptedException */ private void awaitQuiescence() throws InterruptedException { // ToDo: Need observer pattern, not static sleep // Doug suggested that the block report interval could be made shorter // and then observing that would be a good way to know when an operation // was complete (quiescence detect). sleepAtLeast(60000); } private void assertBytesEqual(byte[] buffer, byte[] bufferGolden, int len) { for (int i = 0; i < len; i++) { assertEquals(buffer[i], bufferGolden[i]); } } private void msg(String s) { //System.out.println(s); LOG.info(s); } public static void sleepAtLeast(int tmsec) { long t0 = System.currentTimeMillis(); long t1 = t0; long tslept = t1 - t0; while (tmsec > tslept) { try { long tsleep = tmsec - tslept; Thread.sleep(tsleep); t1 = System.currentTimeMillis(); } catch (InterruptedException ie) { t1 = System.currentTimeMillis(); } tslept = t1 - t0; } } public static String summarizeThreadGroup() { int n = 10; int k = 0; Thread[] tarray = null; StringBuffer sb = new StringBuffer(500); do { n = n * 10; tarray = new Thread[n]; k = Thread.enumerate(tarray); } while (k == n); // while array is too small... for (int i = 0; i < k; i++) { Thread thread = tarray[i]; sb.append(thread.toString()); sb.append("\n"); } return sb.toString(); } public static void main(String[] args) throws Exception { String usage = "Usage: ClusterTestDFS (no args)"; if (args.length != 0) { System.err.println(usage); System.exit(-1); } String[] testargs = {"org.apache.hadoop.dfs.ClusterTestDFS"}; junit.textui.TestRunner.main(testargs); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -