📄 ClusterTestDFS.java
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import junit.framework.TestCase;
import junit.framework.AssertionFailedError;

import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FSOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.util.LogFormatter;
import org.apache.hadoop.conf.Configuration;

import java.io.File;
import java.io.FilenameFilter;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.ListIterator;
import java.util.logging.Logger;
import java.util.Random;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

/**
 * Test DFS.
 * ClusterTestDFS is a JUnit test for DFS using "pseudo multiprocessing" (or
 * more strictly, pseudo distributed), meaning all daemons run in one process
 * and sockets are used to communicate between daemons.  The test permutes
 * various block sizes, number of files, file sizes, and number of
 * datanodes.  After creating 1 or more files and filling them with random
 * data, one datanode is shut down, and then the files are verified.
 * Next, all the random test files are deleted and we test for leakage
 * (non-deletion) by directly checking the real directories corresponding
 * to the datanodes still running.
 * <p>
 * Usage notes: TEST_PERMUTATION_MAX can be adjusted to perform more or
 * less testing of permutations.  The ceiling of useful permutations is
 * TEST_PERMUTATION_MAX_CEILING.
 * <p>
 * DFSClient emits many messages that can be ignored, like:
 * "Failed to connect to *:7000:java.net.ConnectException: Connection refused: connect"
 * because a datanode is forced to close during testing.
 * <p>
 * Warnings about "Zero targets found" can be ignored (these are naggingly
 * emitted even though it is not possible to achieve the desired replication
 * level with the number of active datanodes.)
 * <p>
 * Possible Extensions:
 * <p>Bring a datanode down and restart it to verify reconnection to namenode.
 * <p>Simulate running out of disk space on one datanode only.
 * <p>Bring the namenode down and restart it to verify that datanodes reconnect.
 * <p>
 * <p>For another approach to filesystem testing, see the high level
 * (HadoopFS level) test {@link org.apache.hadoop.fs.TestFileSystem}.
 * @author Paul Baclace
 */
public class ClusterTestDFS extends TestCase implements FSConstants {
  private static final Logger LOG =
      LogFormatter.getLogger("org.apache.hadoop.dfs.ClusterTestDFS");

  private static Configuration conf = new Configuration();
  private static int BUFFER_SIZE = conf.getInt("io.file.buffer.size", 4096);

  private static int testCycleNumber = 0;

  /** all DFS test files go under this base directory */
  private static String baseDirSpecified;

  /** base dir as File */
  private static File baseDir;

  /** DFS block sizes to permute over in multiple test cycles
   * (array length should be prime).
   */
  private static final int[] BLOCK_SIZES = {100000, 4096};

  /** DFS file sizes to permute over in multiple test cycles
   * (array length should be prime).
   */
  private static final int[] FILE_SIZES =
      {100000, 100001, 4095, 4096, 4097, 1000000, 1000001};

  /** DFS file counts to permute over in multiple test cycles
   * (array length should be prime).
   */
  private static final int[] FILE_COUNTS = {1, 10, 100};

  /** Number of useful permutations or test cycles.
   * (The 2 factor represents the alternating 2 or 3 number of datanodes
   * started.)
   */
  private static final int TEST_PERMUTATION_MAX_CEILING =
      BLOCK_SIZES.length * FILE_SIZES.length * FILE_COUNTS.length * 2;

  /** Number of permutations of DFS test parameters to perform.
   * If this is greater than ceiling TEST_PERMUTATION_MAX_CEILING, then the
   * ceiling value is used.
   */
  private static final int TEST_PERMUTATION_MAX = 3;

  private Constructor randomDataGeneratorCtor = null;

  static {
    baseDirSpecified = System.getProperty("test.dfs.data", "/tmp/dfs_test");
    baseDir = new File(baseDirSpecified);
  }

  protected void setUp() throws Exception {
    super.setUp();
    conf.setBoolean("test.dfs.same.host.targets.allowed", true);
  }

  /**
   * Remove old files from temp area used by this test case and be sure
   * base temp directory can be created.
   */
  protected void prepareTempFileSpace() {
    if (baseDir.exists()) {
      try { // start from a blank slate
        FileUtil.fullyDelete(baseDir, conf);
      } catch (Exception ignored) {
      }
    }
    baseDir.mkdirs();
    if (!baseDir.isDirectory()) {
      throw new RuntimeException("Value of root directory property test.dfs.data for dfs test is not a directory: "
          + baseDirSpecified);
    }
  }

  /**
   * Pseudo Distributed FS Test.
   * Test DFS by running all the necessary daemons in one process.
   * Test various block sizes, number of files, disk space consumption,
   * and leakage.
   *
   * @throws Exception
   */
  public void testFsPseudoDistributed() throws Exception {
    while (testCycleNumber < TEST_PERMUTATION_MAX &&
           testCycleNumber < TEST_PERMUTATION_MAX_CEILING) {
      int blockSize = BLOCK_SIZES[testCycleNumber % BLOCK_SIZES.length];
      int numFiles = FILE_COUNTS[testCycleNumber % FILE_COUNTS.length];
      int fileSize = FILE_SIZES[testCycleNumber % FILE_SIZES.length];
      prepareTempFileSpace();
      testFsPseudoDistributed(fileSize, numFiles, blockSize,
          (testCycleNumber % 2) + 2);
    }
  }

  /**
   * Pseudo Distributed FS Testing.
   * Do one test cycle with given parameters.
   *
   * @param nBytes number of bytes to write to each file.
   * @param numFiles number of files to create.
   * @param blockSize block size to use for this test cycle.
   * @param initialDNcount number of datanodes to create
   * @throws Exception
   */
  public void testFsPseudoDistributed(long nBytes, int numFiles,
      int blockSize, int initialDNcount) throws Exception {
    long startTime = System.currentTimeMillis();
    int bufferSize = Math.min(BUFFER_SIZE, blockSize);
    boolean checkDataDirsEmpty = false;
    int iDatanodeClosed = 0;
    Random randomDataGenerator = makeRandomDataGenerator();
    final int currentTestCycleNumber = testCycleNumber;
    msg("using randomDataGenerator=" +
        randomDataGenerator.getClass().getName());

    //
    //     modify config for test
    //

    // set given config param to override other config settings
    conf.setInt("test.dfs.block_size", blockSize);
    // verify that config changed
    assertTrue(blockSize == conf.getInt("test.dfs.block_size", 2)); // 2 is an intentional obviously-wrong block size
    // downsize for testing (just to save resources)
    conf.setInt("dfs.namenode.handler.count", 3);
    if (false) { // use MersenneTwister, if present
      conf.set("hadoop.random.class", "org.apache.hadoop.util.MersenneTwister");
    }
    conf.setLong("dfs.blockreport.intervalMsec", 50*1000L);
    conf.setLong("dfs.datanode.startupMsec", 15*1000L);

    String nameFSDir = baseDirSpecified + "/name";
    msg("----Start Test Cycle=" + currentTestCycleNumber +
        " test.dfs.block_size=" + blockSize +
        " nBytes=" + nBytes +
        " numFiles=" + numFiles +
        " initialDNcount=" + initialDNcount);

    //
    //          start a NameNode
    int nameNodePort = 9000 + testCycleNumber++; // ToDo: settable base port
    String nameNodeSocketAddr = "localhost:" + nameNodePort;
    NameNode nameNodeDaemon = new NameNode(new File(nameFSDir), nameNodePort, conf);
    DFSClient dfsClient = null;
    try {
      //
      //        start some DataNodes
      //
      ArrayList listOfDataNodeDaemons = new ArrayList();
      conf.set("fs.default.name", nameNodeSocketAddr);
      for (int i = 0; i < initialDNcount; i++) {
        // uniquely config real fs path for data storage for this datanode
        String dataDir = baseDirSpecified + "/datanode" + i;
        conf.set("dfs.data.dir", dataDir);
        DataNode dn = DataNode.makeInstanceForDir(dataDir, conf);
        if (dn != null) {
          listOfDataNodeDaemons.add(dn);
          (new Thread(dn, "DataNode" + i + ": " + dataDir)).start();
        }
      }
      try {
        assertTrue("insufficient datanodes for test to continue",
            (listOfDataNodeDaemons.size() >= 2));

        //
        //          wait for datanodes to report in
        awaitQuiescence();

        //  act as if namenode is a remote process
        dfsClient = new DFSClient(new InetSocketAddress("localhost", nameNodePort), conf);

        //
        //           write nBytes of data using randomDataGenerator to numFiles
        //
        ArrayList testfilesList = new ArrayList();
        byte[] buffer = new byte[bufferSize];
        UTF8 testFileName = null;
        for (int iFileNumber = 0; iFileNumber < numFiles; iFileNumber++) {
          testFileName = new UTF8("/f" + iFileNumber);
          testfilesList.add(testFileName);
          FSOutputStream nos = dfsClient.create(testFileName, false);
          try {
            for (long nBytesWritten = 0L;
                 nBytesWritten < nBytes;
                 nBytesWritten += buffer.length) {
              if ((nBytesWritten + buffer.length) > nBytes) {
                // calculate byte count needed to exactly hit nBytes in length
                //  to keep randomDataGenerator in sync during the verify step
                int pb = (int) (nBytes - nBytesWritten);
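The listing above breaks off inside testFsPseudoDistributed(long, int, int, int); the remainder of the file is not reproduced on this page. Because the class is a plain JUnit 3 TestCase, it can be driven by the standard JUnit 3 console runner. The launcher below is only an illustrative sketch and is not part of the original file; it assumes the Hadoop classes and a JUnit 3 jar are on the classpath, and the scratch path is just an example of overriding the test.dfs.data system property read in the class's static initializer.

// Illustrative launcher only -- not part of ClusterTestDFS.java.
// Assumes Hadoop and JUnit 3 are on the classpath.
public class RunClusterTestDFS {
  public static void main(String[] args) {
    // Override the scratch directory before the test class's static
    // initializer reads the "test.dfs.data" system property.
    // ("/tmp/dfs_test_run" is an example path, not a required location.)
    System.setProperty("test.dfs.data", "/tmp/dfs_test_run");
    // junit.textui.TestRunner is the standard JUnit 3 console runner;
    // run(Class) builds a suite from the TestCase subclass and executes it.
    junit.textui.TestRunner.run(org.apache.hadoop.dfs.ClusterTestDFS.class);
  }
}

Roughly the same effect can be had from the command line with
java -Dtest.dfs.data=/tmp/dfs_test_run junit.textui.TestRunner org.apache.hadoop.dfs.ClusterTestDFS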