⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sequencefile.java

📁 Hadoop是一个用于在由廉价硬件设备组成的大型集群上运行应用程序的框架。Hadoop为应用程序透明地提供了一组稳定、可靠的接口和数据传输机制。Hadoop中实现了Google的MapReduce算法。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.io;import java.io.*;import java.util.*;import java.util.zip.*;import java.util.logging.*;import java.net.InetAddress;import java.rmi.server.UID;import java.security.MessageDigest;import org.apache.lucene.util.PriorityQueue;import org.apache.hadoop.fs.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.LogFormatter;/** Support for flat files of binary key/value pairs. */public class SequenceFile {  public static final Logger LOG =    LogFormatter.getLogger("org.apache.hadoop.io.SequenceFile");  private SequenceFile() {}                         // no public ctor  private static byte[] VERSION = new byte[] {    (byte)'S', (byte)'E', (byte)'Q', 3  };  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash   private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash  /** The number of bytes between sync points.*/  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;   /** Write key/value pairs to a sequence-format file. 
*/  public static class Writer {    private FSDataOutputStream out;    private DataOutputBuffer buffer = new DataOutputBuffer();    private FileSystem fs = null;    private File target = null;    private Class keyClass;    private Class valClass;    private boolean deflateValues;    private Deflater deflater = new Deflater(Deflater.BEST_SPEED);    private DeflaterOutputStream deflateFilter =      new DeflaterOutputStream(buffer, deflater);    private DataOutputStream deflateOut =      new DataOutputStream(new BufferedOutputStream(deflateFilter));    // Insert a globally unique 16-byte value every few entries, so that one    // can seek into the middle of a file and then synchronize with record    // starts and ends by scanning for this value.    private long lastSyncPos;                     // position of last sync    private byte[] sync;                          // 16 random bytes    {      try {                                       // use hash of uid + host        MessageDigest digester = MessageDigest.getInstance("MD5");        digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());        sync = digester.digest();      } catch (Exception e) {        throw new RuntimeException(e);      }    }    /** Create the named file. */    public Writer(FileSystem fs, String name,                  Class keyClass, Class valClass)      throws IOException {      this(fs, name, keyClass, valClass, false);    }        /** Create the named file.     * @param compress if true, values are compressed.     */    public Writer(FileSystem fs, String name,                  Class keyClass, Class valClass, boolean compress)      throws IOException {      this.fs = fs;      this.target = new File(name);      init(fs.create(target), keyClass, valClass, compress);    }        /** Write to an arbitrary stream using a specified buffer size. 
*/    private Writer(FSDataOutputStream out,                   Class keyClass, Class valClass, boolean compress)      throws IOException {      init(out, keyClass, valClass, compress);    }        /** Write and flush the file header. */    private void init(FSDataOutputStream out,                      Class keyClass, Class valClass,                      boolean compress) throws IOException {      this.out = out;      this.out.write(VERSION);      this.keyClass = keyClass;      this.valClass = valClass;      this.deflateValues = compress;      new UTF8(WritableName.getName(keyClass)).write(this.out);      new UTF8(WritableName.getName(valClass)).write(this.out);      this.out.writeBoolean(deflateValues);      out.write(sync);                            // write the sync bytes      this.out.flush();                           // flush header    }        /** Returns the class of keys in this file. */    public Class getKeyClass() { return keyClass; }    /** Returns the class of values in this file. */    public Class getValueClass() { return valClass; }    /** Close the file. */    public synchronized void close() throws IOException {      if (out != null) {        out.close();        out = null;      }    }    /** Append a key/value pair. 
*/    public synchronized void append(Writable key, Writable val)      throws IOException {      if (key.getClass() != keyClass)        throw new IOException("wrong key class: "+key+" is not "+keyClass);      if (val.getClass() != valClass)        throw new IOException("wrong value class: "+val+" is not "+valClass);      buffer.reset();      key.write(buffer);      int keyLength = buffer.getLength();      if (keyLength == 0)        throw new IOException("zero length keys not allowed: " + key);      if (deflateValues) {        deflater.reset();        val.write(deflateOut);        deflateOut.flush();        deflateFilter.finish();      } else {        val.write(buffer);      }      append(buffer.getData(), 0, buffer.getLength(), keyLength);    }    /** Append a key/value pair. */    public synchronized void append(byte[] data, int start, int length,                                    int keyLength) throws IOException {      if (keyLength == 0)        throw new IOException("zero length keys not allowed");      if (sync != null &&          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync        lastSyncPos = out.getPos();               // update lastSyncPos        //LOG.info("sync@"+lastSyncPos);        out.writeInt(SYNC_ESCAPE);                // escape it        out.write(sync);                          // write sync      }      out.writeInt(length);                       // total record length      out.writeInt(keyLength);                    // key portion length      out.write(data, start, length);             // data    }    /** Returns the current length of the output file. */    public synchronized long getLength() throws IOException {      return out.getPos();    }  }  /** Writes key/value pairs from a sequence-format file. 
*/  public static class Reader {    private String file;    private FSDataInputStream in;    private DataOutputBuffer outBuf = new DataOutputBuffer();    private DataInputBuffer inBuf = new DataInputBuffer();    private FileSystem fs = null;    private byte[] version = new byte[VERSION.length];    private Class keyClass;    private Class valClass;    private byte[] sync = new byte[SYNC_HASH_SIZE];    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];    private boolean syncSeen;    private long end;    private int keyLength;    private boolean inflateValues;    private byte[] inflateIn = new byte[8192];    private DataOutputBuffer inflateOut = new DataOutputBuffer();    private Inflater inflater = new Inflater();    private Configuration conf;    /** Open the named file. */    public Reader(FileSystem fs, String file, Configuration conf) throws IOException {      this(fs, file, conf.getInt("io.file.buffer.size", 4096));      this.conf = conf;    }    private Reader(FileSystem fs, String name, int bufferSize) throws IOException {      this.fs = fs;      this.file = name;      File file = new File(name);      this.in = fs.open(file, bufferSize);      this.end = fs.getLength(file);      init();    }        private Reader(FileSystem fs, String file, int bufferSize, long start, long length)      throws IOException {      this.fs = fs;      this.file = file;      this.in = fs.open(new File(file), bufferSize);      seek(start);      init();      this.end = in.getPos() + length;    }        private void init() throws IOException {      in.readFully(version);      if ((version[0] != VERSION[0]) ||          (version[1] != VERSION[1]) ||          (version[2] != VERSION[2]))        throw new IOException(file + " not a SequenceFile");      if (version[3] > VERSION[3])        throw new VersionMismatchException(VERSION[3], version[3]);      UTF8 className = new UTF8();            className.readFields(in);                   // read key class name      this.keyClass = 
WritableName.getClass(className.toString());            className.readFields(in);                   // read val class name      this.valClass = WritableName.getClass(className.toString());      if (version[3] > 2) {                       // if version > 2        this.inflateValues = in.readBoolean();    // is compressed?      }      if (version[3] > 1) {                       // if version > 1        in.readFully(sync);                       // read sync bytes      }    }        /** Close the file. */    public synchronized void close() throws IOException {      in.close();    }    /** Returns the class of keys in this file. */    public Class getKeyClass() { return keyClass; }    /** Returns the class of values in this file. */    public Class getValueClass() { return valClass; }    /** Returns true if values are compressed. */    public boolean isCompressed() { return inflateValues; }    /** Read the next key in the file into <code>key</code>, skipping its     * value.  True if another entry exists, and false at end of file. */    public synchronized boolean next(Writable key) throws IOException {      if (key.getClass() != keyClass)        throw new IOException("wrong key class: "+key+" is not "+keyClass);      outBuf.reset();      keyLength = next(outBuf);      if (keyLength < 0)        return false;      inBuf.reset(outBuf.getData(), outBuf.getLength());      key.readFields(inBuf);      if (inBuf.getPosition() != keyLength)        throw new IOException(key + " read " + inBuf.getPosition()                              + " bytes, should read " + keyLength);      return true;    }    /** Read the next key/value pair in the file into <code>key</code> and     * <code>val</code>.  
Returns true if such a pair exists and false when at     * end of file */    public synchronized boolean next(Writable key, Writable val)      throws IOException {      if (val.getClass() != valClass)        throw new IOException("wrong value class: "+val+" is not "+valClass);      boolean more = next(key);      if (more) {        if (inflateValues) {          inflater.reset();          inflater.setInput(outBuf.getData(), keyLength,                            outBuf.getLength()-keyLength);          inflateOut.reset();          while (!inflater.finished()) {            try {              int count = inflater.inflate(inflateIn);              inflateOut.write(inflateIn, 0, count);            } catch (DataFormatException e) {              throw new IOException (e.toString());            }          }          inBuf.reset(inflateOut.getData(), inflateOut.getLength());        }        if(val instanceof Configurable) {          ((Configurable) val).setConf(this.conf);        }        val.readFields(inBuf);        if (inBuf.getPosition() != inBuf.getLength())          throw new IOException(val+" read "+(inBuf.getPosition()-keyLength)                                + " bytes, should read " +                                (inBuf.getLength()-keyLength));      }      return more;    }    /** Read the next key/value pair in the file into <code>buffer</code>.     * Returns the length of the key read, or -1 if at end of file.  The length     * of the value may be computed by calling buffer.getLength() before and     * after calls to this method. 
*/    public synchronized int next(DataOutputBuffer buffer) throws IOException {      if (in.getPos() >= end)        return -1;      try {        int length = in.readInt();        if (version[3] > 1 && sync != null &&            length == SYNC_ESCAPE) {              // process a sync entry          //LOG.info("sync@"+in.getPos());          in.readFully(syncCheck);                // read syncCheck          if (!Arrays.equals(sync, syncCheck))    // check it            throw new IOException("File is corrupt!");          syncSeen = true;          length = in.readInt();                  // re-read length        } else {          syncSeen = false;        }                int keyLength = in.readInt();        buffer.write(in, length);        return keyLength;      } catch (ChecksumException e) {             // checksum failure        handleChecksumException(e);        return next(buffer);      }    }    private void handleChecksumException(ChecksumException e)      throws IOException {      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {        LOG.warning("Bad checksum at "+getPosition()+". Skipping entries.");        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));      } else {        throw e;      }    }    /** Set the current byte position in the input file. 
*
     * @param position byte offset passed directly to the underlying stream
     */
    public synchronized void seek(long position) throws IOException {
      in.seek(position);
    }

    /** Seek to the next sync mark past a given position.
     *
     * <p>If a sync entry could not fit between {@code position} and
     * {@code end}, the stream is simply positioned at {@code end}. Otherwise
     * the scan starts at {@code position + 4} (skipping the 4-byte escape)
     * and slides a 16-byte window, kept circularly in {@code syncCheck}, one
     * byte at a time looking for this file's sync marker. On a match the
     * stream is moved back to the start of that sync entry (its escape), so
     * the next call to next() consumes it. If no match is found before
     * {@code end}, the stream is left at {@code end}; note the window that
     * ends exactly at {@code end} is not compared. Checksum failures during
     * the scan are passed to handleChecksumException.
     */
    public synchronized void sync(long position) throws IOException {
      if (position+SYNC_SIZE >= end) {
        seek(end);
        return;
      }

      try {
        seek(position+4);                         // skip escape
        in.readFully(syncCheck);
        int syncLen = sync.length;
        // i is how many bytes the window has slid; the window's first byte
        // lives at syncCheck[i % syncLen].
        for (int i = 0; in.getPos() < end; i++) {
          int j = 0;
          for (; j < syncLen; j++) {
            if (sync[j] != syncCheck[(i+j)%syncLen])
              break;
          }
          if (j == syncLen) {
            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
            return;
          }
          // Overwrite the oldest byte in the window with the next input byte.
          syncCheck[i%syncLen] = in.readByte();
        }
      } catch (ChecksumException e) {             // checksum failure
        handleChecksumException(e);
      }
    }

    /** Returns true iff the previous call to next passed a sync mark.*/
    public boolean syncSeen() { return syncSeen; }

    /** Return the current byte position in the input file. */
    public synchronized long getPosition() throws IOException {
      return in.getPos();
    }

    /** Returns the name of the file (as given to the constructor). */
    public String toString() {
      return file;
    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -