distributedfilesystem.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 357 行

JAVA
357
字号
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.dfs;import java.io.*;import java.net.*;import java.util.*;import org.apache.hadoop.io.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.conf.Configuration;/**************************************************************** * Implementation of the abstract FileSystem for the DFS system. * This object is the way end-user code interacts with a Hadoop * DistributedFileSystem. * * @author Mike Cafarella *****************************************************************/public class DistributedFileSystem extends FileSystem {    private File workingDir =       new File("/user", System.getProperty("user.name")).getAbsoluteFile();    private String name;    DFSClient dfs;    /** Construct a client for the filesystem at <code>namenode</code>.     */    public DistributedFileSystem(InetSocketAddress namenode, Configuration conf) throws IOException {      super(conf);      this.dfs = new DFSClient(namenode, conf);      this.name = namenode.getHostName() + ":" + namenode.getPort();    }    public String getName() { return name; }    public File getWorkingDirectory() {      return workingDir;    }        private File makeAbsolute(File f) {      if (isAbsolute(f)) {        return f;      } else {        return new File(workingDir, f.getPath());      }    }        public void setWorkingDirectory(File dir) {      workingDir = makeAbsolute(dir);    }        private UTF8 getPath(File file) {      String path = getDFSPath(makeAbsolute(file));      return new UTF8(path);    }    public String[][] getFileCacheHints(File f, long start, long len) throws IOException {      return dfs.getHints(getPath(f), start, len);    }    public FSInputStream openRaw(File f) throws IOException {      return dfs.open(getPath(f));    }    public FSOutputStream createRaw(File f, boolean overwrite)      throws IOException {      return dfs.create(getPath(f), overwrite);    }    /**     * Rename files/dirs     */    public boolean renameRaw(File src, File dst) throws IOException {      return dfs.rename(getPath(src), getPath(dst));    }    /**     * Get rid of File f, whether a true file or dir.     */    public boolean deleteRaw(File f) throws IOException {        return dfs.delete(getPath(f));    }    public boolean exists(File f) throws IOException {        return dfs.exists(getPath(f));    }    public boolean isDirectory(File f) throws IOException {        if (f instanceof DFSFile) {          return ((DFSFile)f).isDirectory();        }        return dfs.isDirectory(getPath(f));    }    public boolean isAbsolute(File f) {      return f.isAbsolute() ||        f.getPath().startsWith("/") ||        f.getPath().startsWith("\\");    }    public long getLength(File f) throws IOException {        if (f instanceof DFSFile) {          return ((DFSFile)f).length();        }        DFSFileInfo info[] = dfs.listFiles(getPath(f));        return info[0].getLen();    }    public File[] listFilesRaw(File f) throws IOException {        DFSFileInfo info[] = dfs.listFiles(getPath(f));        if (info == null) {            return new File[0];        } else {            File results[] = new DFSFile[info.length];            for (int i = 0; i < info.length; i++) {                results[i] = new DFSFile(info[i]);            }            return results;        }    }    public void mkdirs(File f) throws IOException {        dfs.mkdirs(getPath(f));    }    public void lock(File f, boolean shared) throws IOException {        dfs.lock(getPath(f), ! shared);    }    public void release(File f) throws IOException {        dfs.release(getPath(f));    }    public void moveFromLocalFile(File src, File dst) throws IOException {        doFromLocalFile(src, dst, true);    }    public void copyFromLocalFile(File src, File dst) throws IOException {        doFromLocalFile(src, dst, false);    }    private void doFromLocalFile(File src, File dst, boolean deleteSource) throws IOException {        if (exists(dst)) {            if (! isDirectory(dst)) {                throw new IOException("Target " + dst + " already exists");            } else {                dst = new File(dst, src.getName());                if (exists(dst)) {                    throw new IOException("Target " + dst + " already exists");                }            }        }        FileSystem localFs = getNamed("local", getConf());        if (localFs.isDirectory(src)) {            mkdirs(dst);            File contents[] = localFs.listFiles(src);            for (int i = 0; i < contents.length; i++) {                doFromLocalFile(contents[i], new File(dst, contents[i].getName()), deleteSource);            }        } else {            byte buf[] = new byte[getConf().getInt("io.file.buffer.size", 4096)];            InputStream in = localFs.open(src);            try {                OutputStream out = create(dst);                try {                    int bytesRead = in.read(buf);                    while (bytesRead >= 0) {                        out.write(buf, 0, bytesRead);                        bytesRead = in.read(buf);                    }                } finally {                    out.close();                }            } finally {                in.close();            }         }        if (deleteSource)            localFs.delete(src);    }    public void copyToLocalFile(File src, File dst) throws IOException {        if (dst.exists()) {            if (! dst.isDirectory()) {                throw new IOException("Target " + dst + " already exists");            } else {                dst = new File(dst, src.getName());                if (dst.exists()) {                    throw new IOException("Target " + dst + " already exists");                }            }        }        dst = dst.getCanonicalFile();        FileSystem localFs = getNamed("local", getConf());        if (isDirectory(src)) {            localFs.mkdirs(dst);            File contents[] = listFiles(src);            for (int i = 0; i < contents.length; i++) {                copyToLocalFile(contents[i], new File(dst, contents[i].getName()));            }        } else {            byte buf[] = new byte[getConf().getInt("io.file.buffer.size", 4096)];            InputStream in = open(src);            try {                OutputStream out = localFs.create(dst);                try {                    int bytesRead = in.read(buf);                    while (bytesRead >= 0) {                        out.write(buf, 0, bytesRead);                        bytesRead = in.read(buf);                    }                } finally {                    out.close();                }            } finally {                in.close();            }         }    }    public File startLocalOutput(File fsOutputFile, File tmpLocalFile) throws IOException {        if (exists(fsOutputFile)) {            copyToLocalFile(fsOutputFile, tmpLocalFile);        }        return tmpLocalFile;    }    /**     * Move completed local data to DFS destination     */    public void completeLocalOutput(File fsOutputFile, File tmpLocalFile) throws IOException {        moveFromLocalFile(tmpLocalFile, fsOutputFile);    }    /**     * Fetch remote DFS file, place at tmpLocalFile     */    public File startLocalInput(File fsInputFile, File tmpLocalFile) throws IOException {        copyToLocalFile(fsInputFile, tmpLocalFile);        return tmpLocalFile;    }    /**     * We're done with the local stuff, so delete it     */    public void completeLocalInput(File localFile) throws IOException {        // Get rid of the local copy - we don't need it anymore.        FileUtil.fullyDelete(localFile, getConf());    }    public void close() throws IOException {        dfs.close();    }    public String toString() {        return "DFS[" + dfs + "]";    }    DFSClient getClient() {        return dfs;    }        private String getDFSPath(File f) {      List l = new ArrayList();      l.add(f.getName());      File parent = f.getParentFile();      while (parent != null) {        l.add(parent.getName());        parent = parent.getParentFile();      }      StringBuffer path = new StringBuffer();      path.append(l.get(l.size() - 1));      for (int i = l.size() - 2; i >= 0; i--) {        path.append(DFSFile.DFS_FILE_SEPARATOR);        path.append(l.get(i));      }      if (isAbsolute(f) && path.length() == 0) {        path.append(DFSFile.DFS_FILE_SEPARATOR);      }      return path.toString();    }    public void reportChecksumFailure(File f, FSInputStream in,                                      long start, long length, int crc) {            // ignore for now, causing task to fail, and hope that when task is      // retried it gets a different copy of the block that is not corrupt.      // FIXME: we should move the bad block(s) involved to a bad block      // directory on their datanode, and then re-replicate the blocks, so that      // no data is lost. a task may fail, but on retry it should succeed.    }    public long getBlockSize() {      return dfs.BLOCK_SIZE;    }    /** Return the total raw capacity of the filesystem, disregarding     * replication .*/    public long getRawCapacity() throws IOException{        return dfs.totalRawCapacity();    }    /** Return the total raw used space in the filesystem, disregarding     * replication .*/    public long getRawUsed() throws IOException{        return dfs.totalRawUsed();    }    /** Return the total size of all files in the filesystem.*/    public long getUsed()throws IOException{        long used = 0;        DFSFileInfo dfsFiles[] = dfs.listFiles(getPath(new File("/")));        for(int i=0;i<dfsFiles.length;i++){            used += dfsFiles[i].getContentsLen();        }        return used;    }    /** Return statistics for each datanode.*/    public DataNodeReport[] getDataNodeStats() throws IOException {      DatanodeInfo[]  dnReport = dfs.datanodeReport();      DataNodeReport[] reports = new DataNodeReport[dnReport.length];      for (int i = 0; i < dnReport.length; i++) {        reports[i] = new DataNodeReport();        reports[i].name = dnReport[i].getName().toString();        reports[i].host = dnReport[i].getHost().toString();        reports[i].capacity = dnReport[i].getCapacity();        reports[i].remaining = dnReport[i].getRemaining();        reports[i].lastUpdate = dnReport[i].lastUpdate();      }      return reports;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?