fsdatainputstream.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 251 行
JAVA
251 行
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.fs;import java.io.*;import java.util.Arrays;import java.util.logging.*;import java.util.zip.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.LogFormatter;/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream} * and buffers input through a {@link BufferedInputStream}. */public class FSDataInputStream extends DataInputStream { private static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.DataInputStream"); private static final byte[] VERSION = FSDataOutputStream.CHECKSUM_VERSION; private static final int HEADER_LENGTH = 8; private int bytesPerSum = 1; /** Verify that data matches checksums. */ private class Checker extends FilterInputStream implements Seekable { private FileSystem fs; private File file; private FSDataInputStream sums; private Checksum sum = new CRC32(); private int inSum; public Checker(FileSystem fs, File file, Configuration conf) throws IOException { super(fs.openRaw(file)); this.fs = fs; this.file = file; File sumFile = fs.getChecksumFile(file); try { this.sums = new FSDataInputStream(fs.openRaw(sumFile), conf); byte[] version = new byte[VERSION.length]; sums.readFully(version); if (!Arrays.equals(version, VERSION)) throw new IOException("Not a checksum file: "+sumFile); bytesPerSum = sums.readInt(); } catch (FileNotFoundException e) { // quietly ignore stopSumming(); } catch (IOException e) { // loudly ignore LOG.warning("Problem opening checksum file: "+ file + ". Ignoring with exception " + e + "."); stopSumming(); } } public void seek(long desired) throws IOException { ((Seekable)in).seek(desired); if (sums != null) { if (desired % bytesPerSum != 0) throw new IOException("Seek to non-checksummed position."); try { sums.seek(HEADER_LENGTH + 4*(desired/bytesPerSum)); } catch (IOException e) { LOG.warning("Problem seeking checksum file: "+e+". Ignoring."); stopSumming(); } sum.reset(); inSum = 0; } } public int read(byte b[], int off, int len) throws IOException { int read = in.read(b, off, len); if (sums != null) { int summed = 0; while (summed < read) { int goal = bytesPerSum - inSum; int inBuf = read - summed; int toSum = inBuf <= goal ? inBuf : goal; sum.update(b, off+summed, toSum); summed += toSum; inSum += toSum; if (inSum == bytesPerSum) { verifySum(read-(summed-bytesPerSum)); } } } return read; } private void verifySum(int delta) throws IOException { int crc; try { crc = sums.readInt(); } catch (IOException e) { LOG.warning("Problem reading checksum file: "+e+". Ignoring."); stopSumming(); return; } int sumValue = (int)sum.getValue(); sum.reset(); inSum = 0; if (crc != sumValue) { long pos = getPos() - delta; fs.reportChecksumFailure(file, (FSInputStream)in, pos, bytesPerSum, crc); throw new ChecksumException("Checksum error: "+file+" at "+pos); } } public long getPos() throws IOException { return ((FSInputStream)in).getPos(); } public void close() throws IOException { super.close(); stopSumming(); } private void stopSumming() { if (sums != null) { try { sums.close(); } catch (IOException f) {} sums = null; bytesPerSum = 1; } } } /** Cache the file position. This improves performance significantly.*/ private static class PositionCache extends FilterInputStream { long position; public PositionCache(InputStream in) throws IOException { super(in); } // This is the only read() method called by BufferedInputStream, so we trap // calls to it in order to cache the position. public int read(byte b[], int off, int len) throws IOException { int result; if( (result = in.read(b, off, len)) > 0 ) position += result; return result; } public void seek(long desired) throws IOException { ((Seekable)in).seek(desired); // seek underlying stream position = desired; // update position } public long getPos() throws IOException { return position; // return cached position } } /** Buffer input. This improves performance significantly.*/ private class Buffer extends BufferedInputStream { public Buffer(PositionCache in, int bufferSize) throws IOException { super(in, bufferSize); } public void seek(long desired) throws IOException { long end = ((PositionCache)in).getPos(); long start = end - this.count; if (desired >= start && desired < end) { this.pos = (int)(desired - start); // can position within buffer } else { this.count = 0; // invalidate buffer this.pos = 0; long delta = desired % bytesPerSum; // seek to last checksummed point, if any ((PositionCache)in).seek(desired - delta); // scan to desired position for (int i = 0; i < delta; i++) { read(); } } } public long getPos() throws IOException { // adjust for buffer return ((PositionCache)in).getPos() - (this.count - this.pos); } // optimized version of read() public int read() throws IOException { if (pos >= count) return super.read(); return buf[pos++] & 0xff; }} public FSDataInputStream(FileSystem fs, File file, int bufferSize, Configuration conf) throws IOException { super(null); this.in = new Buffer(new PositionCache(new Checker(fs, file, conf)), bufferSize); } public FSDataInputStream(FileSystem fs, File file, Configuration conf) throws IOException { super(null); int bufferSize = conf.getInt("io.file.buffer.size", 4096); this.in = new Buffer(new PositionCache(new Checker(fs, file, conf)), bufferSize); } /** Construct without checksums. */ public FSDataInputStream(FSInputStream in, Configuration conf) throws IOException { this(in, conf.getInt("io.file.buffer.size", 4096)); } /** Construct without checksums. */ public FSDataInputStream(FSInputStream in, int bufferSize) throws IOException { super(null); this.in = new Buffer(new PositionCache(in), bufferSize); } public void seek(long desired) throws IOException { ((Buffer)in).seek(desired); } public long getPos() throws IOException { return ((Buffer)in).getPos(); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?