
📄 FSEditLog.java

📁 hadoop: Nutch cluster platform
💻 JAVA
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;

/**
 * FSEditLog maintains a log of the namespace modifications.
 *
 * @author Konstantin Shvachko
 */
class FSEditLog {
  private static final byte OP_ADD = 0;
  private static final byte OP_RENAME = 1;
  private static final byte OP_DELETE = 2;
  private static final byte OP_MKDIR = 3;
  private static final byte OP_SET_REPLICATION = 4;
  private static final byte OP_DATANODE_ADD = 5;
  private static final byte OP_DATANODE_REMOVE = 6;

  private File editsFile;
  DataOutputStream editsStream = null;

  FSEditLog( File edits ) {
    this.editsFile = edits;
  }

  File getEditsFile() {
    return this.editsFile;
  }

  /**
   * Initialize the output stream for logging.
   *
   * @throws IOException
   */
  void create() throws IOException {
    editsStream = new DataOutputStream(new FileOutputStream(editsFile));
    editsStream.writeInt( FSConstants.DFS_CURRENT_VERSION );
  }

  /**
   * Shutdown the filestore
   */
  void close() throws IOException {
    editsStream.close();
  }

  /**
   * Load an edit log, and apply the changes to the in-memory structure
   *
   * This is where we apply edits that we've been writing to disk all
   * along.
   */
  int loadFSEdits( Configuration conf ) throws IOException {
    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
    FSDirectory fsDir = fsNamesys.dir;
    int numEdits = 0;
    int logVersion = 0;

    if (editsFile.exists()) {
      DataInputStream in = new DataInputStream(
          new BufferedInputStream(
              new FileInputStream(editsFile)));
      // Read log file version. Could be missing.
      in.mark( 4 );
      if( in.available() > 0 ) {
        logVersion = in.readByte();
        in.reset();
        if( logVersion >= 0 )
          logVersion = 0;
        else
          logVersion = in.readInt();
        if( logVersion < FSConstants.DFS_CURRENT_VERSION ) // future version
          throw new IOException(
              "Unexpected version of the file system log file: "
              + logVersion
              + ". Current version = "
              + FSConstants.DFS_CURRENT_VERSION + "." );
      }

      short replication = (short)conf.getInt("dfs.replication", 3);
      try {
        while (in.available() > 0) {
          byte opcode = in.readByte();
          numEdits++;
          switch (opcode) {
          case OP_ADD: {
            UTF8 name = new UTF8();
            ArrayWritable aw = null;
            Writable writables[];
            // version 0 does not support per file replication
            if( logVersion >= 0 )
              name.readFields(in);  // read name only
            else {  // other versions do
              // get name and replication
              aw = new ArrayWritable(UTF8.class);
              aw.readFields(in);
              writables = aw.get();
              if( writables.length != 2 )
                throw new IOException("Incorrect data format. "
                    + "Name & replication pair expected");
              name = (UTF8) writables[0];
              replication = Short.parseShort(
                  ((UTF8)writables[1]).toString());
              replication = adjustReplication( replication, conf );
            }
            // get blocks
            aw = new ArrayWritable(Block.class);
            aw.readFields(in);
            writables = aw.get();
            Block blocks[] = new Block[writables.length];
            System.arraycopy(writables, 0, blocks, 0, blocks.length);
            // add to the file tree
            fsDir.unprotectedAddFile(name, blocks, replication );
            break;
          }
          case OP_SET_REPLICATION: {
            UTF8 src = new UTF8();
            UTF8 repl = new UTF8();
            src.readFields(in);
            repl.readFields(in);
            replication = adjustReplication(
                            fromLogReplication(repl),
                            conf);
            fsDir.unprotectedSetReplication(src.toString(),
                replication,
                null);
            break;
          }
          case OP_RENAME: {
            UTF8 src = new UTF8();
            UTF8 dst = new UTF8();
            src.readFields(in);
            dst.readFields(in);
            fsDir.unprotectedRenameTo(src, dst);
            break;
          }
          case OP_DELETE: {
            UTF8 src = new UTF8();
            src.readFields(in);
            fsDir.unprotectedDelete(src);
            break;
          }
          case OP_MKDIR: {
            UTF8 src = new UTF8();
            src.readFields(in);
            fsDir.unprotectedMkdir(src.toString());
            break;
          }
          case OP_DATANODE_ADD: {
            if( logVersion > -3 )
              throw new IOException("Unexpected opcode " + opcode
                  + " for version " + logVersion );
            DatanodeDescriptor node = new DatanodeDescriptor();
            node.readFields(in);
            fsNamesys.unprotectedAddDatanode( node );
            break;
          }
          case OP_DATANODE_REMOVE: {
            if( logVersion > -3 )
              throw new IOException("Unexpected opcode " + opcode
                  + " for version " + logVersion );
            DatanodeID nodeID = new DatanodeID();
            nodeID.readFields(in);
            DatanodeDescriptor node = fsNamesys.getDatanode( nodeID );
            if( node != null ) {
              fsNamesys.unprotectedRemoveDatanode( node );
              // physically remove node from datanodeMap
              fsNamesys.wipeDatanode( nodeID );
            }
            break;
          }
          default: {
            throw new IOException("Never seen opcode " + opcode);
          }
          }
        }
      } finally {
        in.close();
      }
    }

    if( logVersion != FSConstants.DFS_CURRENT_VERSION ) // other version
      numEdits++; // save this image asap
    return numEdits;
  }

  static short adjustReplication( short replication, Configuration conf) {
    short minReplication = (short)conf.getInt("dfs.replication.min", 1);
    if( replication<minReplication ) {
      replication = minReplication;
    }
    short maxReplication = (short)conf.getInt("dfs.replication.max", 512);
    if( replication>maxReplication ) {
      replication = maxReplication;
    }
    return replication;
  }

  /**
   * Write an operation to the edit log
   */
  void logEdit(byte op, Writable w1, Writable w2) {
    synchronized (editsStream) {
      try {
        editsStream.write(op);
        if (w1 != null) {
          w1.write(editsStream);
        }
        if (w2 != null) {
          w2.write(editsStream);
        }
      } catch (IOException ie) {
        // TODO: Must report an error here
      }
    }
    // TODO: initialize checkpointing if the log is large enough
  }

  /**
   * Add create file record to edit log
   */
  void logCreateFile( FSDirectory.INode newNode ) {
    UTF8 nameReplicationPair[] = new UTF8[] {
                        new UTF8( newNode.computeName() ),
                        FSEditLog.toLogReplication( newNode.getReplication() )};
    logEdit(OP_ADD,
            new ArrayWritable( UTF8.class, nameReplicationPair ),
            new ArrayWritable( Block.class, newNode.getBlocks() ));
  }

  /**
   * Add create directory record to edit log
   */
  void logMkDir( FSDirectory.INode newNode ) {
    logEdit(OP_MKDIR, new UTF8( newNode.computeName() ), null );
  }

  /**
   * Add rename record to edit log
   * TODO: use String parameters until just before writing to disk
   */
  void logRename( UTF8 src, UTF8 dst ) {
    logEdit(OP_RENAME, src, dst);
  }

  /**
   * Add set replication record to edit log
   */
  void logSetReplication( String src, short replication ) {
    logEdit(OP_SET_REPLICATION,
            new UTF8(src),
            FSEditLog.toLogReplication( replication ));
  }

  /**
   * Add delete file record to edit log
   */
  void logDelete( UTF8 src ) {
    logEdit(OP_DELETE, src, null);
  }

  /**
   * Creates a record in edit log corresponding to a new data node
   * registration event.
   */
  void logAddDatanode( DatanodeDescriptor node ) {
    logEdit( OP_DATANODE_ADD, node, null );
  }

  /**
   * Creates a record in edit log corresponding to a data node
   * removal event.
   */
  void logRemoveDatanode( DatanodeID nodeID ) {
    logEdit( OP_DATANODE_REMOVE, new DatanodeID( nodeID ), null );
  }

  static UTF8 toLogReplication( short replication ) {
    return new UTF8( Short.toString(replication));
  }

  static short fromLogReplication( UTF8 replication ) {
    return Short.parseShort(replication.toString());
  }
}
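For orientation, the sketch below mimics the on-disk layout this class produces: create() writes a single version int (FSConstants.DFS_CURRENT_VERSION), and each logEdit() call appends a one-byte opcode followed by its Writable payload(s). This is a minimal, hypothetical demo, not part of Hadoop: the class name EditLogFormatDemo and the LOG_VERSION value are illustrative placeholders, and real replay goes through loadFSEdits() against a live FSNamesystem. It assumes the same Hadoop classes the file above imports (org.apache.hadoop.io.UTF8) are on the classpath.

// EditLogFormatDemo.java -- hypothetical, illustrative only.
// Writes one OP_RENAME record the way FSEditLog.create() + logRename()
// would lay it out on disk, then reads it back.
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.UTF8;

public class EditLogFormatDemo {
  static final byte OP_RENAME = 1;    // same opcode value as FSEditLog.OP_RENAME
  static final int LOG_VERSION = -1;  // placeholder for FSConstants.DFS_CURRENT_VERSION

  public static void main(String[] args) throws IOException {
    File edits = new File("edits.demo");

    // Write: version header, then one record (opcode + src + dst),
    // mirroring create() followed by logRename().
    DataOutputStream out = new DataOutputStream(new FileOutputStream(edits));
    try {
      out.writeInt(LOG_VERSION);
      out.write(OP_RENAME);
      new UTF8("/user/old").write(out);
      new UTF8("/user/new").write(out);
    } finally {
      out.close();
    }

    // Read it back: version, then opcode-dispatched Writable fields,
    // as the OP_RENAME case of loadFSEdits() does.
    DataInputStream in = new DataInputStream(
        new BufferedInputStream(new FileInputStream(edits)));
    try {
      int version = in.readInt();
      byte opcode = in.readByte();
      UTF8 src = new UTF8();
      UTF8 dst = new UTF8();
      src.readFields(in);
      dst.readFields(in);
      System.out.println("version=" + version + " opcode=" + opcode
          + " rename " + src + " -> " + dst);
    } finally {
      in.close();
    }
  }
}

Note that loadFSEdits() is more careful than this sketch: it uses mark()/reset() on the stream to detect old, version-less logs (treated as version 0), whose first record begins immediately with an opcode instead of a version int.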
