📄 FSNamesystem.java
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.StatusHttpServer;
import org.apache.hadoop.fs.Path;

import java.io.*;
import java.net.InetSocketAddress;
import java.util.*;

/***************************************************
 * FSNamesystem does the actual bookkeeping work for the
 * DataNode.
 *
 * It tracks several important tables.
 *
 * 1) valid fsname --> blocklist (kept on disk, logged)
 * 2) Set of all valid blocks (inverted #1)
 * 3) block --> machinelist (kept in memory, rebuilt dynamically from reports)
 * 4) machine --> blocklist (inverted #2)
 * 5) LRU cache of updated-heartbeat machines
 ***************************************************/
class FSNamesystem implements FSConstants {
    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");

    //
    // Stores the correct file name hierarchy
    //
    FSDirectory dir;

    //
    // Stores the block-->datanode(s) map. Updated only in response
    // to client-sent information.
    // Mapping: Block -> TreeSet<DatanodeDescriptor>
    //
    Map blocksMap = new HashMap();

    /**
     * Stores the datanode -> block map.
     * <p>
     * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
     * storage id. In order to keep the storage map consistent it tracks
     * all storages ever registered with the namenode.
     * A descriptor corresponding to a specific storage id can be
     * <ul>
     * <li>added to the map if it is a new storage id;</li>
     * <li>updated with a new datanode started as a replacement for the old one
     * with the same storage id; and </li>
     * <li>removed if and only if an existing datanode is restarted to serve a
     * different storage id.</li>
     * </ul> <br>
     * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
     * in the namespace image file. Only the {@link DatanodeInfo} part is
     * persistent, the list of blocks is restored from the datanode block
     * reports.
     * <p>
     * Mapping: StorageID -> DatanodeDescriptor
     */
    TreeMap datanodeMap = new TreeMap();
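    // ------------------------------------------------------------------
    // Illustrative sketch, not part of the original file: one way the
    // storage-id rules in the javadoc above could translate into code.
    // The helper name and the previousStorageID parameter are hypothetical;
    // only TreeMap operations and getStorageID() (already used elsewhere in
    // this file) are relied upon.
    // ------------------------------------------------------------------
    private void registerDatanodeSketch(DatanodeDescriptor node,
                                        String previousStorageID) {
        synchronized (datanodeMap) {
            if (previousStorageID != null
                && !previousStorageID.equals(node.getStorageID())) {
                // the datanode restarted to serve a *different* storage id:
                // the only case in which an entry is removed from the map
                datanodeMap.remove(previousStorageID);
            }
            // covers the two remaining cases: a brand-new storage id is
            // added, or the descriptor for an existing id is replaced by a
            // new datanode standing in for the old one
            datanodeMap.put(node.getStorageID(), node);
        }
    }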
    //
    // Keeps a Vector for every named machine. The Vector contains
    // blocks that have recently been invalidated and are thought to live
    // on the machine in question.
    // Mapping: StorageID -> Vector<Block>
    //
    TreeMap recentInvalidateSets = new TreeMap();

    //
    // Keeps a TreeSet for every named node. Each treeset contains
    // a list of the blocks that are "extra" at that location. We'll
    // eventually remove these extras.
    // Mapping: Block -> TreeSet<DatanodeDescriptor>
    //
    TreeMap excessReplicateMap = new TreeMap();

    //
    // Keeps track of files that are being created, plus the
    // blocks that make them up.
    // Mapping: fileName -> FileUnderConstruction
    //
    TreeMap pendingCreates = new TreeMap();

    //
    // Keeps track of the blocks that are part of those pending creates
    // Set of: Block
    //
    TreeSet pendingCreateBlocks = new TreeSet();

    //
    // Stats on overall usage
    //
    long totalCapacity = 0, totalRemaining = 0;

    //
    // For the HTTP browsing interface
    //
    StatusHttpServer infoServer;
    int infoPort;
    String infoBindAddress;
    Date startTime;

    //
    Random r = new Random();

    /**
     * Stores a set of DatanodeDescriptor objects, sorted by heartbeat.
     * This is a subset of {@link #datanodeMap}, containing nodes that are
     * considered alive.
     * The {@link HeartbeatMonitor} periodically checks for outdated entries,
     * and removes them from the set.
     */
    TreeSet heartbeats = new TreeSet(new Comparator() {
        public int compare(Object o1, Object o2) {
            DatanodeDescriptor d1 = (DatanodeDescriptor) o1;
            DatanodeDescriptor d2 = (DatanodeDescriptor) o2;
            long lu1 = d1.getLastUpdate();
            long lu2 = d2.getLastUpdate();
            if (lu1 < lu2) {
                return -1;
            } else if (lu1 > lu2) {
                return 1;
            } else {
                return d1.getStorageID().compareTo(d2.getStorageID());
            }
        }
    });

    //
    // Store set of Blocks that need to be replicated 1 or more times.
    // We also store pending replication-orders.
    // Set of: Block
    //
    private TreeSet neededReplications = new TreeSet();
    private TreeSet pendingReplications = new TreeSet();

    //
    // Used for handling lock-leases
    // Mapping: leaseHolder -> Lease
    //
    private TreeMap leases = new TreeMap();
    // Set of: Lease
    private TreeSet sortedLeases = new TreeSet();

    //
    // Threaded object that checks to see if we have been
    // getting heartbeats from all clients.
    //
    Daemon hbthread = null;   // HeartbeatMonitor thread
    Daemon lmthread = null;   // LeaseMonitor thread
    Daemon smmthread = null;  // SafeModeMonitor thread
    boolean fsRunning = true;
    long systemStart = 0;

    // The maximum number of replicas we should allow for a single block
    private int maxReplication;
    // How many outgoing replication streams a given node should have at one time
    private int maxReplicationStreams;
    // MIN_REPLICATION is how many copies we need in place or else we disallow the write
    private int minReplication;
    // HEARTBEAT_RECHECK is how often a datanode sends its heartbeat
    private int heartBeatRecheck;

    public static FSNamesystem fsNamesystemObject;
    private String localMachine;
    private int port;
    private SafeModeInfo safeMode;  // safe mode information

    /**
     * dir is where the filesystem directory state
     * is stored
     */
    public FSNamesystem(File dir, Configuration conf) throws IOException {
        fsNamesystemObject = this;
        InetSocketAddress addr = DataNode.createSocketAddr(conf.get("fs.default.name", "local"));
        this.maxReplication = conf.getInt("dfs.replication.max", 512);
        this.minReplication = conf.getInt("dfs.replication.min", 1);
        if (minReplication <= 0)
            throw new IOException(
                "Unexpected configuration parameters: dfs.replication.min = "
                + minReplication + " must be greater than 0");
        if (maxReplication >= (int) Short.MAX_VALUE)
            throw new IOException(
                "Unexpected configuration parameters: dfs.replication.max = "
                + maxReplication + " must be less than " + (Short.MAX_VALUE));
        if (maxReplication < minReplication)
            throw new IOException(
                "Unexpected configuration parameters: dfs.replication.min = "
                + minReplication + " must be less than dfs.replication.max = "
                + maxReplication);
        this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
        this.heartBeatRecheck = 1000;
        this.localMachine = addr.getHostName();
        this.port = addr.getPort();
        this.dir = new FSDirectory(dir);
        this.dir.loadFSImage(conf);
        this.safeMode = new SafeModeInfo(conf);
        setBlockTotal();
        this.hbthread = new Daemon(new HeartbeatMonitor());
        this.lmthread = new Daemon(new LeaseMonitor());
        hbthread.start();
        lmthread.start();
        this.systemStart = now();
        this.startTime = new Date(systemStart);
        this.infoPort = conf.getInt("dfs.info.port", 50070);
        this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
        this.infoServer = new StatusHttpServer("dfs", infoBindAddress, infoPort, false);
        this.infoServer.start();
    }
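    // ------------------------------------------------------------------
    // Illustrative sketch, not part of the original file: how a caller
    // might bring the namesystem up. The configuration keys and bounds
    // mirror the constructor above; the address, the image-directory path,
    // and the availability of Configuration.set/setInt in this era's API
    // are assumptions.
    // ------------------------------------------------------------------
    static FSNamesystem startupSketch() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "localhost:9000");  // illustrative address
        conf.setInt("dfs.replication.min", 1);          // must be > 0
        conf.setInt("dfs.replication.max", 512);        // must be < Short.MAX_VALUE
        // the constructor validates the bounds above, loads the image, and
        // starts the heartbeat, lease, and HTTP info-server threads
        return new FSNamesystem(new File("/tmp/hadoop/dfs/name"), conf);
    }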
    /** Return the FSNamesystem object
     */
    public static FSNamesystem getFSNamesystem() {
        return fsNamesystemObject;
    }

    /** Close down this filesystem manager.
     * Causes heartbeat and lease daemons to stop; waits briefly for
     * them to finish, but a short timeout returns control back to caller.
     */
    public void close() {
        synchronized (this) {
            fsRunning = false;
        }
        try {
            infoServer.stop();
            hbthread.join(3000);
        } catch (InterruptedException ie) {
        } finally {
            // using finally to ensure we also wait for lease daemon
            try {
                lmthread.join(3000);
            } catch (InterruptedException ie) {
            } finally {
                try {
                    dir.close();
                } catch (IOException ex) {
                    // do nothing
                }
            }
        }
    }

    /////////////////////////////////////////////////////////
    //
    // These methods are called by HadoopFS clients
    //
    /////////////////////////////////////////////////////////

    /**
     * The client wants to open the given filename. Return a
     * list of (block,machineArray) pairs. The sequence of unique blocks
     * in the list indicates all the blocks that make up the filename.
     *
     * The client should choose one of the machines from the machineArray
     * at random.
     */
    public Object[] open(UTF8 src) {
        Object results[] = null;
        Block blocks[] = dir.getFile(src);
        if (blocks != null) {
            results = new Object[2];
            DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
            for (int i = 0; i < blocks.length; i++) {
                TreeSet containingNodes = (TreeSet) blocksMap.get(blocks[i]);
                if (containingNodes == null) {
                    machineSets[i] = new DatanodeDescriptor[0];
                } else {
                    machineSets[i] = new DatanodeDescriptor[containingNodes.size()];
                    int j = 0;
                    for (Iterator it = containingNodes.iterator(); it.hasNext(); j++) {
                        machineSets[i][j] = (DatanodeDescriptor) it.next();
                    }
                }
            }
            results[0] = blocks;
            results[1] = machineSets;
        }
        return results;
    }
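    // ------------------------------------------------------------------
    // Illustrative sketch, not part of the original file: unpacking the
    // (blocks, machineSets) pair returned by open() above and choosing a
    // random replica per block, as its javadoc recommends. The helper
    // name is hypothetical; everything it touches (open, r, LOG) is
    // declared earlier in this file.
    // ------------------------------------------------------------------
    void openUsageSketch(UTF8 src) {
        Object[] results = open(src);
        if (results == null) {
            return;                              // file does not exist
        }
        Block[] blocks = (Block[]) results[0];
        DatanodeDescriptor[][] machineSets = (DatanodeDescriptor[][]) results[1];
        for (int i = 0; i < blocks.length; i++) {
            if (machineSets[i].length == 0) {
                continue;                        // no known replica for this block
            }
            DatanodeDescriptor pick = machineSets[i][r.nextInt(machineSets[i].length)];
            LOG.info("Block " + blocks[i] + " can be read from " + pick);
        }
    }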
    /**
     * Set replication for an existing file.
     *
     * The NameNode sets new replication and schedules either replication of
     * under-replicated data blocks or removal of the excessive block copies
     * if the blocks are over-replicated.
     *
     * @see ClientProtocol#setReplication(String, short)
     * @param src file name
     * @param replication new replication
     * @return true if successful;
     *         false if file does not exist or is a directory
     * @author shv
     */
    public boolean setReplication(String src, short replication) throws IOException {
        if (isInSafeMode())
            throw new SafeModeException("Cannot set replication for " + src, safeMode);
        verifyReplication(src, replication, null);

        Vector oldReplication = new Vector();
        Block[] fileBlocks;
        fileBlocks = dir.setReplication(src, replication, oldReplication);
        if (fileBlocks == null)  // file not found or is a directory
            return false;
        int oldRepl = ((Integer) oldReplication.elementAt(0)).intValue();
        if (oldRepl == replication)  // the same replication
            return true;

        synchronized (neededReplications) {
            if (oldRepl < replication) {
                // old replication < the new one; need to replicate
                LOG.info("Increasing replication for file " + src
                         + ". New replication is " + replication);
                for (int idx = 0; idx < fileBlocks.length; idx++)
                    neededReplications.add(fileBlocks[idx]);
            } else {