localfilesystem.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 400 行
JAVA
400 行
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.fs;import java.io.*;import java.util.*;import java.nio.channels.*;import org.apache.hadoop.fs.DF;import org.apache.hadoop.conf.Configuration;/**************************************************************** * Implement the FileSystem API for the native filesystem. * * @author Mike Cafarella *****************************************************************/public class LocalFileSystem extends FileSystem { private File workingDir = new File(System.getProperty("user.dir")); TreeMap sharedLockDataSet = new TreeMap(); TreeMap nonsharedLockDataSet = new TreeMap(); TreeMap lockObjSet = new TreeMap(); // by default use copy/delete instead of rename boolean useCopyForRename = true; /** Construct a local filesystem client. */ public LocalFileSystem(Configuration conf) throws IOException { super(conf); // if you find an OS which reliably supports non-POSIX // rename(2) across filesystems / volumes, you can // uncomment this. // String os = System.getProperty("os.name"); // if (os.toLowerCase().indexOf("os-with-super-rename") != -1) // useCopyForRename = false; } /** * Return 1x1 'localhost' cell if the file exists. * Return null if otherwise. */ public String[][] getFileCacheHints(File f, long start, long len) throws IOException { if (! f.exists()) { return null; } else { String result[][] = new String[1][]; result[0] = new String[1]; result[0][0] = "localhost"; return result; } } public String getName() { return "local"; } /******************************************************* * For open()'s FSInputStream *******************************************************/ class LocalFSFileInputStream extends FSInputStream { FileInputStream fis; public LocalFSFileInputStream(File f) throws IOException { this.fis = new FileInputStream(f); } public void seek(long pos) throws IOException { fis.getChannel().position(pos); } public long getPos() throws IOException { return fis.getChannel().position(); } /* * Just forward to the fis */ public int available() throws IOException { return fis.available(); } public void close() throws IOException { fis.close(); } public boolean markSupport() { return false; } public int read() throws IOException { try { return fis.read(); } catch (IOException e) { // unexpected exception throw new FSError(e); // assume native fs error } } public int read(byte[] b, int off, int len) throws IOException { try { return fis.read(b, off, len); } catch (IOException e) { // unexpected exception throw new FSError(e); // assume native fs error } } public long skip(long n) throws IOException { return fis.skip(n); } } public FSInputStream openRaw(File f) throws IOException { f = makeAbsolute(f); if (! f.exists()) { throw new FileNotFoundException(f.toString()); } return new LocalFSFileInputStream(f); } /********************************************************* * For create()'s FSOutputStream. *********************************************************/ class LocalFSFileOutputStream extends FSOutputStream { FileOutputStream fos; public LocalFSFileOutputStream(File f) throws IOException { this.fos = new FileOutputStream(f); } public long getPos() throws IOException { return fos.getChannel().position(); } /* * Just forward to the fos */ public void close() throws IOException { fos.close(); } public void flush() throws IOException { fos.flush(); } public void write(byte[] b, int off, int len) throws IOException { try { fos.write(b, off, len); } catch (IOException e) { // unexpected exception throw new FSError(e); // assume native fs error } } public void write(int b) throws IOException { try { fos.write(b); } catch (IOException e) { // unexpected exception throw new FSError(e); // assume native fs error } } } private File makeAbsolute(File f) { if (isAbsolute(f)) { return f; } else { return new File(workingDir, f.toString()); } } public FSOutputStream createRaw(File f, boolean overwrite) throws IOException { f = makeAbsolute(f); if (f.exists() && ! overwrite) { throw new IOException("File already exists:"+f); } File parent = f.getParentFile(); if (parent != null) parent.mkdirs(); return new LocalFSFileOutputStream(f); } public boolean renameRaw(File src, File dst) throws IOException { src = makeAbsolute(src); dst = makeAbsolute(dst); if (useCopyForRename) { FileUtil.copyContents(this, src, dst, true, getConf()); return fullyDelete(src); } else return src.renameTo(dst); } public boolean deleteRaw(File f) throws IOException { f = makeAbsolute(f); if (f.isFile()) { return f.delete(); } else return fullyDelete(f); } public boolean exists(File f) throws IOException { f = makeAbsolute(f); return f.exists(); } public boolean isDirectory(File f) throws IOException { f = makeAbsolute(f); return f.isDirectory(); } public boolean isAbsolute(File f) { return f.isAbsolute(); } public long getLength(File f) throws IOException { f = makeAbsolute(f); return f.length(); } public File[] listFilesRaw(File f) throws IOException { f = makeAbsolute(f); return f.listFiles(); } public void mkdirs(File f) throws IOException { f = makeAbsolute(f); f.mkdirs(); } /** * Set the working directory to the given directory. * Sets both a local variable and the system property. * Note that the system property is only used if the application explictly * calls java.io.File.getAbsolutePath(). */ public void setWorkingDirectory(File new_dir) { workingDir = makeAbsolute(new_dir); System.setProperty("user.dir", workingDir.toString()); } public File getWorkingDirectory() { return workingDir; } public synchronized void lock(File f, boolean shared) throws IOException { f = makeAbsolute(f); f.createNewFile(); FileLock lockObj = null; if (shared) { FileInputStream lockData = new FileInputStream(f); lockObj = lockData.getChannel().lock(0L, Long.MAX_VALUE, shared); sharedLockDataSet.put(f, lockData); } else { FileOutputStream lockData = new FileOutputStream(f); lockObj = lockData.getChannel().lock(0L, Long.MAX_VALUE, shared); nonsharedLockDataSet.put(f, lockData); } lockObjSet.put(f, lockObj); } public synchronized void release(File f) throws IOException { f = makeAbsolute(f); FileLock lockObj = (FileLock) lockObjSet.get(f); FileInputStream sharedLockData = (FileInputStream) sharedLockDataSet.get(f); FileOutputStream nonsharedLockData = (FileOutputStream) nonsharedLockDataSet.get(f); if (lockObj == null) { throw new IOException("Given target not held as lock"); } if (sharedLockData == null && nonsharedLockData == null) { throw new IOException("Given target not held as lock"); } lockObj.release(); lockObjSet.remove(f); if (sharedLockData != null) { sharedLockData.close(); sharedLockDataSet.remove(f); } else { nonsharedLockData.close(); nonsharedLockDataSet.remove(f); } } // In the case of the local filesystem, we can just rename the file. public void moveFromLocalFile(File src, File dst) throws IOException { if (! src.equals(dst)) { src = makeAbsolute(src); dst = makeAbsolute(dst); if (useCopyForRename) { FileUtil.copyContents(this, src, dst, true, getConf()); fullyDelete(src); } else src.renameTo(dst); } } // Similar to moveFromLocalFile(), except the source is kept intact. public void copyFromLocalFile(File src, File dst) throws IOException { if (! src.equals(dst)) { src = makeAbsolute(src); dst = makeAbsolute(dst); FileUtil.copyContents(this, src, dst, true, getConf()); } } // We can't delete the src file in this case. Too bad. public void copyToLocalFile(File src, File dst) throws IOException { if (! src.equals(dst)) { src = makeAbsolute(src); dst = makeAbsolute(dst); FileUtil.copyContents(this, src, dst, true, getConf()); } } // We can write output directly to the final location public File startLocalOutput(File fsOutputFile, File tmpLocalFile) throws IOException { return makeAbsolute(fsOutputFile); } // It's in the right place - nothing to do. public void completeLocalOutput(File fsWorkingFile, File tmpLocalFile) throws IOException { } // We can read directly from the real local fs. public File startLocalInput(File fsInputFile, File tmpLocalFile) throws IOException { return makeAbsolute(fsInputFile); } // We're done reading. Nothing to clean up. public void completeLocalInput(File localFile) throws IOException { // Ignore the file, it's at the right destination! } public void close() throws IOException {} public String toString() { return "LocalFS"; } /** * Implement our own version instead of using the one in FileUtil, * to avoid infinite recursion. * @param dir * @return * @throws IOException */ private boolean fullyDelete(File dir) throws IOException { dir = makeAbsolute(dir); File contents[] = dir.listFiles(); if (contents != null) { for (int i = 0; i < contents.length; i++) { if (contents[i].isFile()) { if (! contents[i].delete()) { return false; } } else { if (! fullyDelete(contents[i])) { return false; } } } } return dir.delete(); } /** Moves files to a bad file directory on the same device, so that their * storage will not be reused. */ public void reportChecksumFailure(File f, FSInputStream in, long start, long length, int crc) { try { // canonicalize f f = makeAbsolute(f).getCanonicalFile(); // find highest writable parent dir of f on the same device String device = new DF(f.toString(), getConf()).getMount(); File parent = f.getParentFile(); File dir; do { dir = parent; parent = parent.getParentFile(); } while (parent.canWrite() && parent.toString().startsWith(device)); // move the file there File badDir = new File(dir, "bad_files"); badDir.mkdirs(); String suffix = "." + new Random().nextInt(); File badFile = new File(badDir,f.getName()+suffix); LOG.warning("Moving bad file " + f + " to " + badFile); in.close(); // close it first f.renameTo(badFile); // rename it // move checksum file too File checkFile = getChecksumFile(f); checkFile.renameTo(new File(badDir, checkFile.getName()+suffix)); } catch (IOException e) { LOG.warning("Error moving bad file " + f + ": " + e); } } public long getBlockSize() { // default to 32MB: large enough to minimize the impact of seeks return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?