📄 sequencefile.java
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.io;import java.io.*;import java.util.*;import java.util.zip.*;import java.util.logging.*;import java.net.InetAddress;import java.rmi.server.UID;import java.security.MessageDigest;import org.apache.lucene.util.PriorityQueue;import org.apache.hadoop.fs.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.LogFormatter;/** Support for flat files of binary key/value pairs. */public class SequenceFile { public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.io.SequenceFile"); private SequenceFile() {} // no public ctor private static byte[] VERSION = new byte[] { (byte)'S', (byte)'E', (byte)'Q', 3 }; private static final int SYNC_ESCAPE = -1; // "length" of sync entries private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash /** The number of bytes between sync points.*/ public static final int SYNC_INTERVAL = 100*SYNC_SIZE; /** Write key/value pairs to a sequence-format file. 
*/ public static class Writer { private FSDataOutputStream out; private DataOutputBuffer buffer = new DataOutputBuffer(); private FileSystem fs = null; private File target = null; private Class keyClass; private Class valClass; private boolean deflateValues; private Deflater deflater = new Deflater(Deflater.BEST_SPEED); private DeflaterOutputStream deflateFilter = new DeflaterOutputStream(buffer, deflater); private DataOutputStream deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter)); // Insert a globally unique 16-byte value every few entries, so that one // can seek into the middle of a file and then synchronize with record // starts and ends by scanning for this value. private long lastSyncPos; // position of last sync private byte[] sync; // 16 random bytes { try { // use hash of uid + host MessageDigest digester = MessageDigest.getInstance("MD5"); digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes()); sync = digester.digest(); } catch (Exception e) { throw new RuntimeException(e); } } /** Create the named file. */ public Writer(FileSystem fs, String name, Class keyClass, Class valClass) throws IOException { this(fs, name, keyClass, valClass, false); } /** Create the named file. * @param compress if true, values are compressed. */ public Writer(FileSystem fs, String name, Class keyClass, Class valClass, boolean compress) throws IOException { this.fs = fs; this.target = new File(name); init(fs.create(target), keyClass, valClass, compress); } /** Write to an arbitrary stream using a specified buffer size. */ private Writer(FSDataOutputStream out, Class keyClass, Class valClass, boolean compress) throws IOException { init(out, keyClass, valClass, compress); } /** Write and flush the file header. 
*/ private void init(FSDataOutputStream out, Class keyClass, Class valClass, boolean compress) throws IOException { this.out = out; this.out.write(VERSION); this.keyClass = keyClass; this.valClass = valClass; this.deflateValues = compress; new UTF8(WritableName.getName(keyClass)).write(this.out); new UTF8(WritableName.getName(valClass)).write(this.out); this.out.writeBoolean(deflateValues); out.write(sync); // write the sync bytes this.out.flush(); // flush header } /** Returns the class of keys in this file. */ public Class getKeyClass() { return keyClass; } /** Returns the class of values in this file. */ public Class getValueClass() { return valClass; } /** Close the file. */ public synchronized void close() throws IOException { if (out != null) { out.close(); out = null; } } /** Append a key/value pair. */ public synchronized void append(Writable key, Writable val) throws IOException { if (key.getClass() != keyClass) throw new IOException("wrong key class: "+key+" is not "+keyClass); if (val.getClass() != valClass) throw new IOException("wrong value class: "+val+" is not "+valClass); buffer.reset(); key.write(buffer); int keyLength = buffer.getLength(); if (keyLength == 0) throw new IOException("zero length keys not allowed: " + key); if (deflateValues) { deflater.reset(); val.write(deflateOut); deflateOut.flush(); deflateFilter.finish(); } else { val.write(buffer); } append(buffer.getData(), 0, buffer.getLength(), keyLength); } /** Append a key/value pair. 
*/ public synchronized void append(byte[] data, int start, int length, int keyLength) throws IOException { if (keyLength == 0) throw new IOException("zero length keys not allowed"); if (sync != null && out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync lastSyncPos = out.getPos(); // update lastSyncPos //LOG.info("sync@"+lastSyncPos); out.writeInt(SYNC_ESCAPE); // escape it out.write(sync); // write sync } out.writeInt(length); // total record length out.writeInt(keyLength); // key portion length out.write(data, start, length); // data } /** Returns the current length of the output file. */ public synchronized long getLength() throws IOException { return out.getPos(); } } /** Writes key/value pairs from a sequence-format file. */ public static class Reader { private String file; private FSDataInputStream in; private DataOutputBuffer outBuf = new DataOutputBuffer(); private DataInputBuffer inBuf = new DataInputBuffer(); private FileSystem fs = null; private byte[] version = new byte[VERSION.length]; private Class keyClass; private Class valClass; private byte[] sync = new byte[SYNC_HASH_SIZE]; private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; private boolean syncSeen; private long end; private int keyLength; private boolean inflateValues; private byte[] inflateIn = new byte[8192]; private DataOutputBuffer inflateOut = new DataOutputBuffer(); private Inflater inflater = new Inflater(); private Configuration conf; /** Open the named file. 
*/ public Reader(FileSystem fs, String file, Configuration conf) throws IOException { this(fs, file, conf.getInt("io.file.buffer.size", 4096)); this.conf = conf; } private Reader(FileSystem fs, String name, int bufferSize) throws IOException { this.fs = fs; this.file = name; File file = new File(name); this.in = fs.open(file, bufferSize); this.end = fs.getLength(file); init(); } private Reader(FileSystem fs, String file, int bufferSize, long start, long length) throws IOException { this.fs = fs; this.file = file; this.in = fs.open(new File(file), bufferSize); seek(start); init(); this.end = in.getPos() + length; } private void init() throws IOException { in.readFully(version); if ((version[0] != VERSION[0]) || (version[1] != VERSION[1]) || (version[2] != VERSION[2])) throw new IOException(file + " not a SequenceFile"); if (version[3] > VERSION[3]) throw new VersionMismatchException(VERSION[3], version[3]); UTF8 className = new UTF8(); className.readFields(in); // read key class name this.keyClass = WritableName.getClass(className.toString()); className.readFields(in); // read val class name this.valClass = WritableName.getClass(className.toString()); if (version[3] > 2) { // if version > 2 this.inflateValues = in.readBoolean(); // is compressed? } if (version[3] > 1) { // if version > 1 in.readFully(sync); // read sync bytes } } /** Close the file. */ public synchronized void close() throws IOException { in.close(); } /** Returns the class of keys in this file. */ public Class getKeyClass() { return keyClass; } /** Returns the class of values in this file. */ public Class getValueClass() { return valClass; } /** Returns true if values are compressed. */ public boolean isCompressed() { return inflateValues; } /** Read the next key in the file into <code>key</code>, skipping its * value. True if another entry exists, and false at end of file. 
*/
    public synchronized boolean next(Writable key) throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);

      // Pull the whole record (key bytes then value bytes) into outBuf.
      outBuf.reset();
      keyLength = next(outBuf);
      if (keyLength < 0)
        return false;

      inBuf.reset(outBuf.getData(), outBuf.getLength());
      key.readFields(inBuf);

      // The key must consume exactly the bytes the record says it occupies.
      if (inBuf.getPosition() != keyLength)
        throw new IOException(key + " read " + inBuf.getPosition()
                              + " bytes, should read " + keyLength);

      return true;
    }

    /** Read the next key/value pair in the file into <code>key</code> and
     * <code>val</code>.  Returns true if such a pair exists and false when at
     * end of file.
     * @throws IOException if either object is of the wrong class, if a
     * compressed value cannot be inflated, or if the key or value does not
     * consume exactly the bytes stored for it.
     */
    public synchronized boolean next(Writable key, Writable val)
      throws IOException {
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      boolean more = next(key);

      if (more) {
        // Offset within inBuf at which the value bytes begin.  After next(key)
        // the value follows the key; after inflating, inBuf holds only the
        // value.
        int valStart = keyLength;

        if (inflateValues) {
          inflater.reset();
          inflater.setInput(outBuf.getData(), keyLength,
                            outBuf.getLength()-keyLength);
          inflateOut.reset();
          while (!inflater.finished()) {
            int count;
            try {
              count = inflater.inflate(inflateIn);
            } catch (DataFormatException e) {
              IOException wrapped = new IOException(e.toString());
              wrapped.initCause(e);               // keep the original trace
              throw wrapped;
            }
            // No output, not finished, and nothing left to feed it: the
            // stored value is incomplete.  Without this check the loop would
            // never terminate.
            if (count == 0 && !inflater.finished()
                && (inflater.needsInput() || inflater.needsDictionary()))
              throw new IOException(file
                                    + ": compressed value is truncated or corrupt");
            inflateOut.write(inflateIn, 0, count);
          }
          inBuf.reset(inflateOut.getData(), inflateOut.getLength());
          valStart = 0;
        }

        if (val instanceof Configurable) {
          ((Configurable) val).setConf(this.conf);
        }
        val.readFields(inBuf);

        // The value must consume everything that remains in the buffer.
        if (inBuf.getPosition() != inBuf.getLength())
          throw new IOException(val+" read "+(inBuf.getPosition()-valStart)
                                + " bytes, should read " +
                                (inBuf.getLength()-valStart));
      }

      return more;
    }

    /** Read the next key/value pair in the file into <code>buffer</code>.
     * Returns the length of the key read, or -1 if at end of file.  The length
     * of the value may be computed by calling buffer.getLength() before and
     * after calls to this method.
*/ public synchronized int next(DataOutputBuffer buffer) throws IOException { if (in.getPos() >= end) return -1; try { int length = in.readInt(); if (version[3] > 1 && sync != null && length == SYNC_ESCAPE) { // process a sync entry //LOG.info("sync@"+in.getPos()); in.readFully(syncCheck); // read syncCheck if (!Arrays.equals(sync, syncCheck)) // check it throw new IOException("File is corrupt!"); syncSeen = true; length = in.readInt(); // re-read length } else { syncSeen = false; } int keyLength = in.readInt(); buffer.write(in, length); return keyLength; } catch (ChecksumException e) { // checksum failure handleChecksumException(e); return next(buffer); } } private void handleChecksumException(ChecksumException e) throws IOException { if (this.conf.getBoolean("io.skip.checksum.errors", false)) { LOG.warning("Bad checksum at "+getPosition()+". Skipping entries."); sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); } else { throw e; } } /** Set the current byte position in the input file. */ public synchronized void seek(long position) throws IOException { in.seek(position); } /** Seek to the next sync mark past a given position.*/ public synchronized void sync(long position) throws IOException { if (position+SYNC_SIZE >= end) { seek(end); return; } try { seek(position+4); // skip escape in.readFully(syncCheck); int syncLen = sync.length; for (int i = 0; in.getPos() < end; i++) { int j = 0; for (; j < syncLen; j++) { if (sync[j] != syncCheck[(i+j)%syncLen]) break; } if (j == syncLen) { in.seek(in.getPos() - SYNC_SIZE); // position before sync return; } syncCheck[i%syncLen] = in.readByte(); } } catch (ChecksumException e) { // checksum failure handleChecksumException(e); } } /** Returns true iff the previous call to next passed a sync mark.*/ public boolean syncSeen() { return syncSeen; } /** Return the current byte position in the input file. 
*/
    public synchronized long getPosition() throws IOException {
      long current = in.getPos();
      return current;
    }

    /** Returns the name of the file this reader was opened on. */
    public String toString() {
      return this.file;
    }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -