
📄 ClusterTestDFS.java

📁 Hadoop is a framework for running applications on large clusters of inexpensive commodity hardware. It transparently provides applications with a set of stable, reliable interfaces and handles data movement for them. Hadoop implements Google's MapReduce algorithm (a minimal plain-Java sketch of the idea follows below).
💻 JAVA
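Before the test code itself, the MapReduce idea mentioned above can be made concrete with a small word-count example. The following is a minimal sketch in plain Java that only illustrates the programming model (a map step emits key/value pairs, a reduce step aggregates them per key); it deliberately does not use Hadoop's own MapReduce API, and the class and method names in it are hypothetical.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical word-count illustration of the map/reduce model,
// not Hadoop's actual API.
public class WordCountSketch {
  // "map" step: turn one input line into (word, 1) pairs
  static List<Map.Entry<String, Integer>> mapWords(String line) {
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    for (String w : line.split("\\s+")) {
      if (!w.isEmpty()) pairs.add(Map.entry(w, 1));
    }
    return pairs;
  }

  // "reduce" step: sum all counts emitted for the same word
  static Map<String, Integer> reduceCounts(List<Map.Entry<String, Integer>> pairs) {
    Map<String, Integer> totals = new HashMap<>();
    for (Map.Entry<String, Integer> p : pairs) {
      totals.merge(p.getKey(), p.getValue(), Integer::sum);
    }
    return totals;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    for (String line : new String[]{"a b a", "b c"}) {
      pairs.addAll(mapWords(line));
    }
    // prints the per-word totals, e.g. {a=2, b=2, c=1} (HashMap order varies)
    System.out.println(reduceCounts(pairs));
  }
}

In Hadoop proper, the same two steps run in parallel across a cluster, with the framework handling the shuffle of pairs between map and reduce. The excerpt below exercises the DFS layer that such jobs read from and write to.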
                byte[] bufferPartial = new byte[pb];
                randomDataGenerator.nextBytes(bufferPartial);
                nos.write(bufferPartial);
              } else {
                randomDataGenerator.nextBytes(buffer);
                nos.write(buffer);
              }
            }
          } finally {
            nos.flush();
            nos.close();
          }
        }
        //
        // No need to wait for blocks to be replicated because replication
        //  is supposed to be complete when the file is closed.
        //
        //
        //                     take one datanode down
        iDatanodeClosed =
            currentTestCycleNumber % listOfDataNodeDaemons.size();
        DataNode dn = (DataNode) listOfDataNodeDaemons.get(iDatanodeClosed);
        msg("shutdown datanode daemon " + iDatanodeClosed +
            " dn=" + dn.data);
        try {
          dn.shutdown();
        } catch (Exception e) {
          msg("ignoring datanode shutdown exception=" + e);
        }
        //
        //          verify data against a "rewound" randomDataGenerator
        //               that all of the data is intact
        long lastLong = randomDataGenerator.nextLong();
        randomDataGenerator = makeRandomDataGenerator(); // restart (make new) PRNG
        ListIterator li = testfilesList.listIterator();
        while (li.hasNext()) {
          testFileName = (UTF8) li.next();
          FSInputStream nis = dfsClient.open(testFileName);
          byte[] bufferGolden = new byte[bufferSize];
          int m = 42;
          try {
            while (m != -1) {
              m = nis.read(buffer);
              if (m == buffer.length) {
                randomDataGenerator.nextBytes(bufferGolden);
                assertBytesEqual(buffer, bufferGolden, buffer.length);
              } else if (m > 0) {
                byte[] bufferGoldenPartial = new byte[m];
                randomDataGenerator.nextBytes(bufferGoldenPartial);
                assertBytesEqual(buffer, bufferGoldenPartial,
                                 bufferGoldenPartial.length);
              }
            }
          } finally {
            nis.close();
          }
        }
        // verify last randomDataGenerator rand val to ensure last file length was checked
        long lastLongAgain = randomDataGenerator.nextLong();
        assertEquals(lastLong, lastLongAgain);
        msg("Finished validating all file contents");
        //
        //                    now delete all the created files
        msg("Delete all random test files under DFS via remaining datanodes");
        li = testfilesList.listIterator();
        while (li.hasNext()) {
          testFileName = (UTF8) li.next();
          assertTrue(dfsClient.delete(testFileName));
        }
        //
        //                   wait for delete to be propagated
        //                  (unlike writing files, delete is lazy)
        msg("Test thread sleeping while datanodes propagate delete...");
        awaitQuiescence();
        msg("Test thread awakens to verify file contents");
        //
        //             check that the datanode's block directory is empty
        //                (except for datanode that had forced shutdown)
        checkDataDirsEmpty = true; // do it during finally clause
      } catch (AssertionFailedError afe) {
        throw afe;
      } catch (Throwable t) {
        msg("Unexpected exception_b: " + t);
        t.printStackTrace();
      } finally {
        //
        // shut down datanode daemons (this takes advantage of being same-process)
        msg("begin shutdown of all datanode daemons for test cycle " +
            currentTestCycleNumber);
        for (int i = 0; i < listOfDataNodeDaemons.size(); i++) {
          DataNode dataNode = (DataNode) listOfDataNodeDaemons.get(i);
          if (i != iDatanodeClosed) {
            try {
              if (checkDataDirsEmpty) {
                File dataDir = new File(dataNode.data.diskUsage.getDirPath());
                assertNoBlocks(dataDir);
              }
              dataNode.shutdown();
            } catch (Exception e) {
              msg("ignoring exception during (all) datanode shutdown, e=" + e);
            }
          }
        }
      }
      msg("finished shutdown of all datanode daemons for test cycle " +
          currentTestCycleNumber);
      if (dfsClient != null) {
        try {
          msg("close down subthreads of DFSClient");
          dfsClient.close();
        } catch (Exception ignored) { }
        msg("finished close down of DFSClient");
      }
    } catch (AssertionFailedError afe) {
      throw afe;
    } catch (Throwable t) {
      msg("Unexpected exception_a: " + t);
      t.printStackTrace();
    } finally {
      // shut down namenode daemon (this takes advantage of being same-process)
      msg("begin shutdown of namenode daemon for test cycle " +
          currentTestCycleNumber);
      try {
        nameNodeDaemon.stop();
      } catch (Exception e) {
        msg("ignoring namenode shutdown exception=" + e);
      }
      msg("finished shutdown of namenode daemon for test cycle " +
          currentTestCycleNumber);
    }
    msg("test cycle " + currentTestCycleNumber + " elapsed time=" +
        (System.currentTimeMillis() - startTime) / 1000. + "sec");
    msg("threads still running (look for stragglers): ");
    msg(summarizeThreadGroup());
  }

  private void assertNoBlocks(File datanodeDir) {
    File datanodeDataDir = new File(datanodeDir, "data");
    String[] blockFilenames =
        datanodeDataDir.list(
            new FilenameFilter() {
              public boolean accept(File dir, String name) {
                return Block.isBlockFilename(new File(dir, name));
              }
            });
    // if this fails, the delete did not propagate because either
    //   awaitQuiescence() returned before the disk images were removed
    //   or a real failure was detected.
    assertTrue(" data dir not empty: " + datanodeDataDir,
               blockFilenames.length == 0);
  }

  /**
   * Make a data generator.
   * Allows optional use of high quality PRNG by setting property
   * hadoop.random.class to the full class path of a subclass of
   * java.util.Random such as "...util.MersenneTwister".
   * The property test.dfs.random.seed can supply a seed for reproducible
   * testing (a default is set here if property is not set.)
   */
  private Random makeRandomDataGenerator() {
    long seed = conf.getLong("test.dfs.random.seed", 0xB437EF);
    try {
      if (randomDataGeneratorCtor == null) {
        // lazy init
        String rndDataGenClassname =
            conf.get("hadoop.random.class", "java.util.Random");
        Class clazz = Class.forName(rndDataGenClassname);
        randomDataGeneratorCtor = clazz.getConstructor(new Class[]{Long.TYPE});
      }
      if (randomDataGeneratorCtor != null) {
        Object arg[] = {new Long(seed)};
        return (Random) randomDataGeneratorCtor.newInstance(arg);
      }
    } catch (ClassNotFoundException absorb) {
    } catch (NoSuchMethodException absorb) {
    } catch (SecurityException absorb) {
    } catch (InstantiationException absorb) {
    } catch (IllegalAccessException absorb) {
    } catch (IllegalArgumentException absorb) {
    } catch (InvocationTargetException absorb) {
    }
    // last resort
    return new java.util.Random(seed);
  }

  /** Wait for the DFS datanodes to become quiescent.
   * The initial implementation is to sleep for some fixed amount of time,
   * but a better implementation would be to really detect when distributed
   * operations are completed.
   * @throws InterruptedException
   */
  private void awaitQuiescence() throws InterruptedException {
    // ToDo: Need observer pattern, not static sleep
    // Doug suggested that the block report interval could be made shorter
    //   and then observing that would be a good way to know when an operation
    //   was complete (quiescence detect).
    sleepAtLeast(60000);
  }

  private void assertBytesEqual(byte[] buffer, byte[] bufferGolden, int len) {
    for (int i = 0; i < len; i++) {
      assertEquals(buffer[i], bufferGolden[i]);
    }
  }

  private void msg(String s) {
    //System.out.println(s);
    LOG.info(s);
  }

  public static void sleepAtLeast(int tmsec) {
    long t0 = System.currentTimeMillis();
    long t1 = t0;
    long tslept = t1 - t0;
    while (tmsec > tslept) {
      try {
        long tsleep = tmsec - tslept;
        Thread.sleep(tsleep);
        t1 = System.currentTimeMillis();
      } catch (InterruptedException ie) {
        t1 = System.currentTimeMillis();
      }
      tslept = t1 - t0;
    }
  }

  public static String summarizeThreadGroup() {
    int n = 10;
    int k = 0;
    Thread[] tarray = null;
    StringBuffer sb = new StringBuffer(500);
    do {
      n = n * 10;
      tarray = new Thread[n];
      k = Thread.enumerate(tarray);
    } while (k == n); // while array is too small...
    for (int i = 0; i < k; i++) {
      Thread thread = tarray[i];
      sb.append(thread.toString());
      sb.append("\n");
    }
    return sb.toString();
  }

  public static void main(String[] args) throws Exception {
    String usage = "Usage: ClusterTestDFS (no args)";
    if (args.length != 0) {
      System.err.println(usage);
      System.exit(-1);
    }
    String[] testargs = {"org.apache.hadoop.dfs.ClusterTestDFS"};
    junit.textui.TestRunner.main(testargs);
  }
}
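As the javadoc on makeRandomDataGenerator explains, the test's data generator is driven by two configuration properties. Here is a rough sketch of overriding them, assuming a standard org.apache.hadoop.conf.Configuration instance; how ClusterTestDFS actually constructs its conf field is not shown in this excerpt, so treat this as illustrative only.

import org.apache.hadoop.conf.Configuration;

public class ConfigurePrngSketch {
  public static void main(String[] args) {
    // Illustrative only: the property names come from makeRandomDataGenerator
    // above, but this standalone Configuration object is an assumption.
    Configuration conf = new Configuration();
    // fixed seed => reproducible test data across runs
    conf.set("test.dfs.random.seed", "12345");
    // any java.util.Random subclass works, provided it has a (long) constructor,
    // since makeRandomDataGenerator looks up getConstructor(Long.TYPE)
    conf.set("hadoop.random.class", "java.util.Random");
  }
}

Note that makeRandomDataGenerator absorbs every reflection exception and silently falls back to new java.util.Random(seed), so a typo in hadoop.random.class degrades quietly rather than failing the test.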
