📄 dfsck.java
字号:
/** * Copyright 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.dfs;import java.io.BufferedInputStream;import java.io.BufferedOutputStream;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;import java.net.InetSocketAddress;import java.net.Socket;import java.util.ArrayList;import java.util.Random;import java.util.TreeSet;import org.apache.commons.logging.*;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSOutputStream;import org.apache.hadoop.io.UTF8;import org.apache.hadoop.util.ToolBase;/** * This class provides rudimentary checking of DFS volumes for errors and * sub-optimal conditions. * <p>The tool scans all files and directories, starting from an indicated * root path. The following abnormal conditions are detected and handled:</p> * <ul> * <li>files with blocks that are completely missing from all datanodes.<br/> * In this case the tool can perform one of the following actions: * <ul> * <li>none ({@link #FIXING_NONE})</li> * <li>move corrupted files to /lost+found directory on DFS * ({@link #FIXING_MOVE}). Remaining data blocks are saved as a * block chains, representing longest consecutive series of valid blocks.</li> * <li>delete corrupted files ({@link #FIXING_DELETE})</li> * </ul> * </li> * <li>detect files with under-replicated or over-replicated blocks</li> * </ul> * Additionally, the tool collects a detailed overall DFS statistics, and * optionally can print detailed statistics on block locations and replication * factors of each file. * * @author Andrzej Bialecki */public class DFSck extends ToolBase { private static final Log LOG = LogFactory.getLog(DFSck.class.getName()); /** Don't attempt any fixing . */ public static final int FIXING_NONE = 0; /** Move corrupted files to /lost+found . */ public static final int FIXING_MOVE = 1; /** Delete corrupted files. */ public static final int FIXING_DELETE = 2; private DFSClient dfs; private UTF8 lostFound = null; private boolean lfInited = false; private boolean lfInitedOk = false; private boolean showFiles = false; private boolean showBlocks = false; private boolean showLocations = false; private int fixing; DFSck() { } /** * Filesystem checker. * @param conf current Configuration * @param fixing one of pre-defined values * @param showFiles show each file being checked * @param showBlocks for each file checked show its block information * @param showLocations for each block in each file show block locations * @throws Exception */ public DFSck(Configuration conf, int fixing, boolean showFiles, boolean showBlocks, boolean showLocations) throws Exception { setConf(conf); init(fixing, showFiles, showBlocks, showLocations); } public void init(int fixing, boolean showFiles, boolean showBlocks, boolean showLocations) throws IOException { String fsName = conf.get("fs.default.name", "local"); if (fsName.equals("local")) { throw new IOException("This tool only checks DFS, but your config uses 'local' FS."); } this.dfs = new DFSClient(DataNode.createSocketAddr(fsName), conf); this.fixing = fixing; this.showFiles = showFiles; this.showBlocks = showBlocks; this.showLocations = showLocations; } /** * Check files on DFS, starting from the indicated path. * @param path starting point * @return result of checking * @throws Exception */ public Result fsck(String path) throws Exception { DFSFileInfo[] files = dfs.listPaths(new UTF8(path)); Result res = new Result(); res.setReplication(dfs.getDefaultReplication()); for (int i = 0; i < files.length; i++) { check(files[i], res); } return res; } private void check(DFSFileInfo file, Result res) throws Exception { if (file.isDir()) { if (showFiles) System.out.println(file.getPath() + " <dir>"); res.totalDirs++; DFSFileInfo[] files = dfs.listPaths(new UTF8(file.getPath())); for (int i = 0; i < files.length; i++) { check(files[i], res); } return; } res.totalFiles++; res.totalSize += file.getLen(); LocatedBlock[] blocks = dfs.namenode.open(file.getPath()); res.totalBlocks += blocks.length; if (showFiles) { System.out.print(file.getPath() + " " + file.getLen() + ", " + blocks.length + " block(s): "); } else { System.out.print('.'); System.out.flush(); if (res.totalFiles % 100 == 0) System.out.println(); } int missing = 0; long missize = 0; StringBuffer report = new StringBuffer(); for (int i = 0; i < blocks.length; i++) { Block block = blocks[i].getBlock(); long id = block.getBlockId(); DatanodeInfo[] locs = blocks[i].getLocations(); short targetFileReplication = file.getReplication(); if (locs.length > targetFileReplication) res.overReplicatedBlocks += (locs.length - targetFileReplication); if (locs.length < targetFileReplication && locs.length > 0) res.underReplicatedBlocks += (targetFileReplication - locs.length); report.append(i + ". " + id + " len=" + block.getNumBytes()); if (locs == null || locs.length == 0) { report.append(" MISSING!"); res.addMissing(block.getBlockName(), block.getNumBytes()); missing++; missize += block.getNumBytes(); } else { report.append(" repl=" + locs.length); if (showLocations) { StringBuffer sb = new StringBuffer("["); for (int j = 0; j < locs.length; j++) { if (j > 0) sb.append(", "); sb.append(locs[j]); } sb.append(']'); report.append(" " + sb.toString()); } } report.append('\n'); } if (missing > 0) { if (!showFiles) System.out.println("\nMISSING " + missing + " blocks of total size " + missize + " B"); res.corruptFiles++; switch (fixing) { case FIXING_NONE: // do nothing System.err.println("\n - ignoring corrupted " + file.getPath()); break; case FIXING_MOVE: System.err.println("\n - moving to /lost+found: " + file.getPath()); lostFoundMove(file, blocks); break; case FIXING_DELETE: System.err.println("\n - deleting corrupted " + file.getPath()); dfs.delete(new UTF8(file.getPath())); } } if (showFiles) { if (missing > 0) { System.out.println(" MISSING " + missing + " blocks of total size " + missize + " B"); } else System.out.println(" OK"); if (showBlocks) System.out.println(report.toString()); } } private void lostFoundMove(DFSFileInfo file, LocatedBlock[] blocks) { if (!lfInited) { lostFoundInit(); } if (!lfInitedOk) { return; } UTF8 target = new UTF8(lostFound.toString() + file.getPath()); String errmsg = "Failed to move " + file.getPath() + " to /lost+found"; try { if (!dfs.mkdirs(target)) { System.err.println(errmsg); return; } // create chains int chain = 0; FSOutputStream fos = null; for (int i = 0; i < blocks.length; i++) { LocatedBlock lblock = blocks[i]; DatanodeInfo[] locs = lblock.getLocations(); if (locs == null || locs.length == 0) { if (fos != null) { fos.flush(); fos.close(); fos = null; } continue; } if (fos == null) { fos = dfs.create(new UTF8(target.toString() + "/" + chain), true); if (fos != null) chain++; } if (fos == null) { System.err.println(errmsg + ": could not store chain " + chain); // perhaps we should bail out here... // return; continue; } // copy the block. It's a pity it's not abstracted from DFSInputStream ... try { copyBlock(lblock, fos); } catch (Exception e) { e.printStackTrace(); // something went wrong copying this block... System.err.println(" - could not copy block " + lblock.getBlock().getBlockName() + " to " + target); fos.flush(); fos.close(); fos = null; } } if (fos != null) fos.close(); System.err.println("\n - moved corrupted file " + file.getPath() + " to /lost+found"); dfs.delete(new UTF8(file.getPath())); } catch (Exception e) { e.printStackTrace(); System.err.println(errmsg + ": " + e.getMessage()); } } /* * XXX (ab) Bulk of this method is copied verbatim from {@link DFSClient}, which is * bad. Both places should be refactored to provide a method to copy blocks * around. */ private void copyBlock(LocatedBlock lblock, FSOutputStream fos) throws Exception { int failures = 0; InetSocketAddress targetAddr = null; TreeSet deadNodes = new TreeSet(); Socket s = null; DataInputStream in = null; DataOutputStream out = null; while (s == null) { DatanodeInfo chosenNode; try { chosenNode = bestNode(lblock.getLocations(), deadNodes); targetAddr = DataNode.createSocketAddr(chosenNode.getName()); } catch (IOException ie) { if (failures >= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES) { throw new IOException("Could not obtain block " + lblock); } LOG.info("Could not obtain block from any node: " + ie); try { Thread.sleep(10000); } catch (InterruptedException iex) { } deadNodes.clear(); failures++; continue; } try { s = new Socket(); s.connect(targetAddr, FSConstants.READ_TIMEOUT); s.setSoTimeout(FSConstants.READ_TIMEOUT); // // Xmit header info to datanode // out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); out.write(FSConstants.OP_READSKIP_BLOCK); lblock.getBlock().write(out); out.writeLong(0L); out.flush(); // // Get bytes in block, set streams
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -