⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fsnamesystem.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.dfs;import org.apache.commons.logging.*;import org.apache.hadoop.io.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.*;import org.apache.hadoop.mapred.StatusHttpServer;import org.apache.hadoop.fs.Path;import java.io.*;import java.net.InetSocketAddress;import java.util.*;/*************************************************** * FSNamesystem does the actual bookkeeping work for the * DataNode. * * It tracks several important tables. * * 1)  valid fsname --> blocklist  (kept on disk, logged) * 2)  Set of all valid blocks (inverted #1) * 3)  block --> machinelist (kept in memory, rebuilt dynamically from reports) * 4)  machine --> blocklist (inverted #2) * 5)  LRU cache of updated-heartbeat machines ***************************************************/class FSNamesystem implements FSConstants {    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");    //    // Stores the correct file name hierarchy    //    FSDirectory dir;    //    // Stores the block-->datanode(s) map.  Updated only in response    // to client-sent information.    // Mapping: Block -> TreeSet<DatanodeDescriptor>    //    Map blocksMap = new HashMap();    /**     * Stores the datanode -> block map.       * <p>     * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by      * storage id. In order to keep the storage map consistent it tracks      * all storages ever registered with the namenode.     * A descriptor corresponding to a specific storage id can be     * <ul>      * <li>added to the map if it is a new storage id;</li>     * <li>updated with a new datanode started as a replacement for the old one      * with the same storage id; and </li>     * <li>removed if and only if an existing datanode is restarted to serve a     * different storage id.</li>     * </ul> <br>     * The list of the {@link DatanodeDescriptor}s in the map is checkpointed     * in the namespace image file. Only the {@link DatanodeInfo} part is      * persistent, the list of blocks is restored from the datanode block     * reports.      * <p>     * Mapping: StorageID -> DatanodeDescriptor     */    TreeMap datanodeMap = new TreeMap();    //    // Keeps a Vector for every named machine.  The Vector contains    // blocks that have recently been invalidated and are thought to live    // on the machine in question.    // Mapping: StorageID -> Vector<Block>    //    TreeMap recentInvalidateSets = new TreeMap();    //    // Keeps a TreeSet for every named node.  Each treeset contains    // a list of the blocks that are "extra" at that location.  We'll    // eventually remove these extras.    // Mapping: Block -> TreeSet<DatanodeDescriptor>    //    TreeMap excessReplicateMap = new TreeMap();    //    // Keeps track of files that are being created, plus the    // blocks that make them up.    // Mapping: fileName -> FileUnderConstruction    //    TreeMap pendingCreates = new TreeMap();    //    // Keeps track of the blocks that are part of those pending creates    // Set of: Block    //    TreeSet pendingCreateBlocks = new TreeSet();    //    // Stats on overall usage    //    long totalCapacity = 0, totalRemaining = 0;        //    // For the HTTP browsing interface    //    StatusHttpServer infoServer;    int infoPort;    String infoBindAddress;    Date startTime;        //    Random r = new Random();    /**     * Stores a set of DatanodeDescriptor objects, sorted by heartbeat.     * This is a subset of {@link #datanodeMap}, containing nodes that are      * considered alive.     * The {@link HeartbeatMonitor} periodically checks for outdated entries,     * and removes them from the set.     */    TreeSet heartbeats = new TreeSet(new Comparator() {        public int compare(Object o1, Object o2) {            DatanodeDescriptor d1 = (DatanodeDescriptor) o1;            DatanodeDescriptor d2 = (DatanodeDescriptor) o2;                        long lu1 = d1.getLastUpdate();            long lu2 = d2.getLastUpdate();            if (lu1 < lu2) {                return -1;            } else if (lu1 > lu2) {                return 1;            } else {                return d1.getStorageID().compareTo(d2.getStorageID());            }        }    });    //    // Store set of Blocks that need to be replicated 1 or more times.    // We also store pending replication-orders.    // Set of: Block    //    private TreeSet neededReplications = new TreeSet();    private TreeSet pendingReplications = new TreeSet();    //    // Used for handling lock-leases    // Mapping: leaseHolder -> Lease    //    private TreeMap leases = new TreeMap();    // Set of: Lease    private TreeSet sortedLeases = new TreeSet();    //    // Threaded object that checks to see if we have been    // getting heartbeats from all clients.     //    Daemon hbthread = null;   // HeartbeatMonitor thread    Daemon lmthread = null;   // LeaseMonitor thread    Daemon smmthread = null;  // SafeModeMonitor thread    boolean fsRunning = true;    long systemStart = 0;    //  The maximum number of replicates we should allow for a single block    private int maxReplication;    //  How many outgoing replication streams a given node should have at one time    private int maxReplicationStreams;    // MIN_REPLICATION is how many copies we need in place or else we disallow the write    private int minReplication;    // HEARTBEAT_RECHECK is how often a datanode sends its hearbeat    private int heartBeatRecheck;    public static FSNamesystem fsNamesystemObject;    private String localMachine;    private int port;    private SafeModeInfo safeMode;  // safe mode information    /**     * dir is where the filesystem directory state      * is stored     */    public FSNamesystem(File dir, Configuration conf) throws IOException {        fsNamesystemObject = this;        InetSocketAddress addr = DataNode.createSocketAddr(conf.get("fs.default.name", "local"));        this.maxReplication = conf.getInt("dfs.replication.max", 512);        this.minReplication = conf.getInt("dfs.replication.min", 1);        if( minReplication <= 0 )          throw new IOException(              "Unexpected configuration parameters: dfs.replication.min = "               + minReplication              + " must be greater than 0" );        if( maxReplication >= (int)Short.MAX_VALUE )          throw new IOException(              "Unexpected configuration parameters: dfs.replication.max = "               + maxReplication + " must be less than " + (Short.MAX_VALUE) );        if( maxReplication < minReplication )          throw new IOException(              "Unexpected configuration parameters: dfs.replication.min = "               + minReplication              + " must be less than dfs.replication.max = "               + maxReplication );        this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);        this.heartBeatRecheck= 1000;        this.localMachine = addr.getHostName();        this.port = addr.getPort();        this.dir = new FSDirectory(dir);        this.dir.loadFSImage( conf );        this.safeMode = new SafeModeInfo( conf );        setBlockTotal();        this.hbthread = new Daemon(new HeartbeatMonitor());        this.lmthread = new Daemon(new LeaseMonitor());        hbthread.start();        lmthread.start();        this.systemStart = now();        this.startTime = new Date(systemStart);         this.infoPort = conf.getInt("dfs.info.port", 50070);        this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");        this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false);        this.infoServer.start();    }    /** Return the FSNamesystem object     *      */    public static FSNamesystem getFSNamesystem() {        return fsNamesystemObject;    }     /** Close down this filesystem manager.     * Causes heartbeat and lease daemons to stop; waits briefly for     * them to finish, but a short timeout returns control back to caller.     */    public void close() {      synchronized (this) {        fsRunning = false;      }        try {            infoServer.stop();            hbthread.join(3000);        } catch (InterruptedException ie) {        } finally {          // using finally to ensure we also wait for lease daemon          try {            lmthread.join(3000);          } catch (InterruptedException ie) {          } finally {              try {                dir.close();              } catch (IOException ex) {                  // do nothing              }          }        }    }    /////////////////////////////////////////////////////////    //    // These methods are called by HadoopFS clients    //    /////////////////////////////////////////////////////////    /**     * The client wants to open the given filename.  Return a     * list of (block,machineArray) pairs.  The sequence of unique blocks     * in the list indicates all the blocks that make up the filename.     *     * The client should choose one of the machines from the machineArray     * at random.     */    public Object[] open(UTF8 src) {        Object results[] = null;        Block blocks[] = dir.getFile(src);        if (blocks != null) {            results = new Object[2];            DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];            for (int i = 0; i < blocks.length; i++) {                TreeSet containingNodes = (TreeSet) blocksMap.get(blocks[i]);                if (containingNodes == null) {                    machineSets[i] = new DatanodeDescriptor[0];                } else {                    machineSets[i] = new DatanodeDescriptor[containingNodes.size()];                    int j = 0;                    for (Iterator it = containingNodes.iterator(); it.hasNext(); j++) {                        machineSets[i][j] = (DatanodeDescriptor) it.next();                    }                }            }            results[0] = blocks;            results[1] = machineSets;        }        return results;    }    /**     * Set replication for an existing file.     *      * The NameNode sets new replication and schedules either replication of      * under-replicated data blocks or removal of the eccessive block copies      * if the blocks are over-replicated.     *      * @see ClientProtocol#setReplication(String, short)     * @param src file name     * @param replication new replication     * @return true if successful;      *         false if file does not exist or is a directory     * @author shv     */    public boolean setReplication(String src,                                   short replication                                ) throws IOException {      if( isInSafeMode() )        throw new SafeModeException( "Cannot set replication for " + src, safeMode );      verifyReplication(src, replication, null );      Vector oldReplication = new Vector();      Block[] fileBlocks;      fileBlocks = dir.setReplication( src, replication, oldReplication );      if( fileBlocks == null )  // file not found or is a directory        return false;      int oldRepl = ((Integer)oldReplication.elementAt(0)).intValue();      if( oldRepl == replication ) // the same replication        return true;      synchronized( neededReplications ) {        if( oldRepl < replication ) {           // old replication < the new one; need to replicate          LOG.info("Increasing replication for file " + src                     + ". New replication is " + replication );          for( int idx = 0; idx < fileBlocks.length; idx++ )            neededReplications.add( fileBlocks[idx] );        } else {  

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -