⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dfsck.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/**
 * Copyright 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Random;
import java.util.TreeSet;

import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSOutputStream;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.util.ToolBase;

/**
 * This class provides rudimentary checking of DFS volumes for errors and
 * sub-optimal conditions.
 * <p>The tool scans all files and directories, starting from an indicated
 * root path. The following abnormal conditions are detected and handled:</p>
 * <ul>
 * <li>files with blocks that are completely missing from all datanodes.<br/>
 * In this case the tool can perform one of the following actions:
 *  <ul>
 *      <li>none ({@link #FIXING_NONE})</li>
 *      <li>move corrupted files to /lost+found directory on DFS
 *      ({@link #FIXING_MOVE}). Remaining data blocks are saved as
 *      block chains, representing the longest consecutive series of valid
 *      blocks.</li>
 *      <li>delete corrupted files ({@link #FIXING_DELETE})</li>
 *  </ul>
 *  </li>
 *  <li>detect files with under-replicated or over-replicated blocks</li>
 * </ul>
 * Additionally, the tool collects detailed overall DFS statistics, and
 * optionally can print detailed statistics on block locations and replication
 * factors of each file.
 *
 * @author Andrzej Bialecki
 */
public class DFSck extends ToolBase {
  private static final Log LOG = LogFactory.getLog(DFSck.class.getName());

  /** Don't attempt any fixing . */
  public static final int FIXING_NONE = 0;
  /** Move corrupted files to /lost+found . */
  public static final int FIXING_MOVE = 1;
  /** Delete corrupted files. */
  public static final int FIXING_DELETE = 2;

  // Client handle to the DFS under inspection; created in init().
  private DFSClient dfs;
  // Path prefix of the /lost+found directory. NOTE(review): initialized by
  // lostFoundInit(), which is not visible in this chunk of the file.
  private UTF8 lostFound = null;
  // lfInited: lostFoundInit() has been attempted; lfInitedOk: it succeeded.
  private boolean lfInited = false;
  private boolean lfInitedOk = false;
  // Output verbosity flags, set in init().
  private boolean showFiles = false;
  private boolean showBlocks = false;
  private boolean showLocations = false;
  // One of FIXING_NONE / FIXING_MOVE / FIXING_DELETE.
  private int fixing;

  // No-arg constructor leaves the checker uninitialized; callers must invoke
  // init(...) before fsck(...).
  DFSck() {
  }

  /**
   * Filesystem checker.
   * @param conf current Configuration
   * @param fixing one of the pre-defined FIXING_* values
   * @param showFiles show each file being checked
   * @param showBlocks for each file checked show its block information
   * @param showLocations for each block in each file show block locations
   * @throws Exception if initialization fails (e.g. the configured
   *         filesystem is 'local' rather than DFS)
   */
  public DFSck(Configuration conf, int fixing, boolean showFiles,
      boolean showBlocks, boolean showLocations) throws Exception {
    setConf(conf);
    init(fixing, showFiles, showBlocks, showLocations);
  }

  /**
   * Connects to the namenode named by "fs.default.name" and records the
   * fixing/verbosity options for subsequent fsck() calls.
   * @throws IOException if the configuration points at the local FS,
   *         which this tool cannot check
   */
  public void init(int fixing, boolean showFiles,
           boolean showBlocks, boolean showLocations) throws IOException {
      String fsName = conf.get("fs.default.name", "local");
      if (fsName.equals("local")) {
        throw new IOException("This tool only checks DFS, but your config uses 'local' FS.");
      }
      this.dfs = new DFSClient(DataNode.createSocketAddr(fsName), conf);
      this.fixing = fixing;
      this.showFiles = showFiles;
      this.showBlocks = showBlocks;
      this.showLocations = showLocations;
  }

  /**
   * Check files on DFS, starting from the indicated path.
   * @param path starting point
   * @return result of checking
   * @throws Exception
   */
  public Result fsck(String path) throws Exception {
    DFSFileInfo[] files = dfs.listPaths(new UTF8(path));
    Result res = new Result();
    res.setReplication(dfs.getDefaultReplication());
    // Recursively visit every entry under the starting path.
    for (int i = 0; i < files.length; i++) {
      check(files[i], res);
    }
    return res;
  }

  /**
   * Checks a single file or directory, accumulating statistics into res.
   * Directories are recursed into; for regular files every block is examined
   * for missing replicas and over-/under-replication, and the configured
   * fixing action (none/move/delete) is applied to corrupted files.
   */
  private void check(DFSFileInfo file, Result res) throws Exception {
    if (file.isDir()) {
      if (showFiles)
        System.out.println(file.getPath() + " <dir>");
      res.totalDirs++;
      DFSFileInfo[] files = dfs.listPaths(new UTF8(file.getPath()));
      for (int i = 0; i < files.length; i++) {
        check(files[i], res);
      }
      return;
    }
    res.totalFiles++;
    res.totalSize += file.getLen();
    // Ask the namenode for the located blocks of this file.
    LocatedBlock[] blocks = dfs.namenode.open(file.getPath());
    res.totalBlocks += blocks.length;
    if (showFiles) {
      System.out.print(file.getPath() + " " + file.getLen() + ", " + blocks.length + " block(s): ");
    } else {
      // Progress indicator: one dot per file, newline every 100 files.
      System.out.print('.');
      System.out.flush();
      if (res.totalFiles % 100 == 0) System.out.println();
    }
    int missing = 0;
    long missize = 0;
    // Per-file block report, printed only when showFiles && showBlocks.
    StringBuffer report = new StringBuffer();
    for (int i = 0; i < blocks.length; i++) {
      Block block = blocks[i].getBlock();
      long id = block.getBlockId();
      DatanodeInfo[] locs = blocks[i].getLocations();
      short targetFileReplication = file.getReplication();
      // NOTE(review): locs.length is dereferenced here before the
      // "locs == null" check below — if locs can ever be null this throws
      // NPE first; verify the contract of LocatedBlock.getLocations().
      if (locs.length > targetFileReplication) res.overReplicatedBlocks += (locs.length - targetFileReplication);
      if (locs.length < targetFileReplication && locs.length > 0) res.underReplicatedBlocks += (targetFileReplication - locs.length);
      report.append(i + ". " + id + " len=" + block.getNumBytes());
      if (locs == null || locs.length == 0) {
        // No datanode holds a replica of this block: the file is corrupted.
        report.append(" MISSING!");
        res.addMissing(block.getBlockName(), block.getNumBytes());
        missing++;
        missize += block.getNumBytes();
      } else {
        report.append(" repl=" + locs.length);
        if (showLocations) {
          StringBuffer sb = new StringBuffer("[");
          for (int j = 0; j < locs.length; j++) {
            if (j > 0) sb.append(", ");
            sb.append(locs[j]);
          }
          sb.append(']');
          report.append(" " + sb.toString());
        }
      }
      report.append('\n');
    }
    if (missing > 0) {
      if (!showFiles)
        System.out.println("\nMISSING " + missing + " blocks of total size " + missize + " B");
      res.corruptFiles++;
      // Apply the configured action for corrupted files.
      switch (fixing) {
        case FIXING_NONE: // do nothing
          System.err.println("\n - ignoring corrupted " + file.getPath());
          break;
        case FIXING_MOVE:
          System.err.println("\n - moving to /lost+found: " + file.getPath());
          lostFoundMove(file, blocks);
          break;
        case FIXING_DELETE:
          System.err.println("\n - deleting corrupted " + file.getPath());
          dfs.delete(new UTF8(file.getPath()));
      }
    }
    if (showFiles) {
      if (missing > 0) {
        System.out.println(" MISSING " + missing + " blocks of total size " + missize + " B");
      } else System.out.println(" OK");
      if (showBlocks) System.out.println(report.toString());
    }
  }

  /**
   * Salvages the readable blocks of a corrupted file into /lost+found.
   * Consecutive runs of available blocks are copied into numbered "chain"
   * files under a directory mirroring the original path; the original file
   * is deleted afterwards. A run is terminated (and a new chain started)
   * whenever a missing block is encountered.
   */
  private void lostFoundMove(DFSFileInfo file, LocatedBlock[] blocks) {
    if (!lfInited) {
      lostFoundInit();
    }
    if (!lfInitedOk) {
      // /lost+found could not be set up; nothing we can do.
      return;
    }
    UTF8 target = new UTF8(lostFound.toString() + file.getPath());
    String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
    try {
      if (!dfs.mkdirs(target)) {
        System.err.println(errmsg);
        return;
      }
      // create chains
      int chain = 0;
      FSOutputStream fos = null;
      for (int i = 0; i < blocks.length; i++) {
        LocatedBlock lblock = blocks[i];
        DatanodeInfo[] locs = lblock.getLocations();
        if (locs == null || locs.length == 0) {
          // Missing block: close the current chain (if any) so the next
          // available block starts a fresh one.
          if (fos != null) {
            fos.flush();
            fos.close();
            fos = null;
          }
          continue;
        }
        if (fos == null) {
          fos = dfs.create(new UTF8(target.toString() + "/" + chain), true);
          if (fos != null) chain++;
        }
        if (fos == null) {
          System.err.println(errmsg + ": could not store chain " + chain);
          // perhaps we should bail out here...
          // return;
          continue;
        }

        // copy the block. It's a pity it's not abstracted from DFSInputStream ...
        try {
          copyBlock(lblock, fos);
        } catch (Exception e) {
          e.printStackTrace();
          // something went wrong copying this block...
          System.err.println(" - could not copy block " + lblock.getBlock().getBlockName() + " to " + target);
          fos.flush();
          fos.close();
          fos = null;
        }
      }
      if (fos != null) fos.close();
      System.err.println("\n - moved corrupted file " + file.getPath() + " to /lost+found");
      // Only delete the original once the salvage pass has completed.
      dfs.delete(new UTF8(file.getPath()));
    } catch (Exception e) {
      e.printStackTrace();
      System.err.println(errmsg + ": " + e.getMessage());
    }
  }

  /*
   * XXX (ab) Bulk of this method is copied verbatim from {@link DFSClient}, which is
   * bad. Both places should be refactored to provide a method to copy blocks
   * around.
   */
  // Copies one block's bytes from a datanode into fos, retrying across
  // datanodes. NOTE(review): this method continues on page 2 of the listing
  // and is truncated here.
  private void copyBlock(LocatedBlock lblock, FSOutputStream fos) throws Exception {
    int failures = 0;
    InetSocketAddress targetAddr = null;
    // Datanodes that already failed for this block; avoided by bestNode().
    // NOTE(review): raw TreeSet — element type presumably DatanodeInfo;
    // bestNode() is defined later in the file and not visible here.
    TreeSet deadNodes = new TreeSet();
    Socket s = null;
    DataInputStream in = null;
    DataOutputStream out = null;
    // Keep trying datanodes until a connection is established.
    while (s == null) {
        DatanodeInfo chosenNode;
        try {
            chosenNode = bestNode(lblock.getLocations(), deadNodes);
            targetAddr = DataNode.createSocketAddr(chosenNode.getName());
        } catch (IOException ie) {
            if (failures >= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES) {
                throw new IOException("Could not obtain block " + lblock);
            }
            LOG.info("Could not obtain block from any node:  " + ie);
            try {
                // Back off before retrying the full set of nodes.
                Thread.sleep(10000);
            } catch (InterruptedException iex) {
                // NOTE(review): interrupt status is swallowed here; modern
                // practice would re-interrupt the thread.
            }
            deadNodes.clear();
            failures++;
            continue;
        }
        try {
            s = new Socket();
            s.connect(targetAddr, FSConstants.READ_TIMEOUT);
            s.setSoTimeout(FSConstants.READ_TIMEOUT);
            //
            // Xmit header info to datanode
            //
            out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
            out.write(FSConstants.OP_READSKIP_BLOCK);
            lblock.getBlock().write(out);
            out.writeLong(0L);
            out.flush();
            //
            // Get bytes in block, set streams

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -