NameNode.java
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import org.apache.commons.logging.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.conf.*;

import java.io.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;

import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.Metrics;

/**********************************************************
 * NameNode serves as both directory namespace manager and
 * "inode table" for the Hadoop DFS. There is a single NameNode
 * running in any DFS deployment. (Well, except when there
 * is a second backup/failover NameNode.)
 *
 * The NameNode controls two critical tables:
 *   1) filename->blocksequence (namespace)
 *   2) block->machinelist ("inodes")
 *
 * The first table is stored on disk and is very precious.
 * The second table is rebuilt every time the NameNode comes
 * up.
 *
 * 'NameNode' refers to both this class as well as the 'NameNode server'.
 * The 'FSNamesystem' class actually performs most of the filesystem
 * management. The majority of the 'NameNode' class itself is concerned
 * with exposing the IPC interface to the outside world, plus some
 * configuration management.
 *
 * NameNode implements the ClientProtocol interface, which allows
 * clients to ask for DFS services. ClientProtocol is not
 * designed for direct use by authors of DFS client code. End-users
 * should instead use the org.apache.nutch.hadoop.fs.FileSystem class.
 *
 * NameNode also implements the DatanodeProtocol interface, used by
 * DataNode programs that actually store DFS data blocks. These
 * methods are invoked repeatedly and automatically by all the
 * DataNodes in a DFS deployment.
 *
 * @author Mike Cafarella
 **********************************************************/
public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {

    public long getProtocolVersion(String protocol, long clientVersion) {
        if (protocol.equals(ClientProtocol.class.getName())) {
            return ClientProtocol.versionID;
        } else {
            return DatanodeProtocol.versionID;
        }
    }

    public static final Log LOG =
        LogFactory.getLog("org.apache.hadoop.dfs.NameNode");
    public static final Log stateChangeLog =
        LogFactory.getLog("org.apache.hadoop.dfs.StateChange");

    private FSNamesystem namesystem;
    private Server server;
    private int handlerCount = 2;

    /** only used for testing purposes */
    private boolean stopRequested = false;

    /** Format a new filesystem. Destroys any filesystem that may already
     * exist at this location.
     **/
    public static void format(Configuration conf) throws IOException {
        FSDirectory.format(getDir(conf), conf);
    }

    private class NameNodeMetrics {
        private MetricsRecord metricsRecord = null;

        private long numFilesCreated = 0L;
        private long numFilesOpened = 0L;
        private long numFilesRenamed = 0L;
        private long numFilesListed = 0L;

        NameNodeMetrics() {
            metricsRecord = Metrics.createRecord("dfs", "namenode");
        }

        synchronized void createFile() {
            Metrics.report(metricsRecord, "files-created", ++numFilesCreated);
        }

        synchronized void openFile() {
            Metrics.report(metricsRecord, "files-opened", ++numFilesOpened);
        }

        synchronized void renameFile() {
            Metrics.report(metricsRecord, "files-renamed", ++numFilesRenamed);
        }

        synchronized void listFile(int nfiles) {
            numFilesListed += nfiles;
            Metrics.report(metricsRecord, "files-listed", numFilesListed);
        }
    }

    private NameNodeMetrics myMetrics = null;

    /**
     * Create a NameNode at the default location
     */
    public NameNode(Configuration conf) throws IOException {
        this(getDir(conf),
             DataNode.createSocketAddr(conf.get("fs.default.name", "local")).getHostName(),
             DataNode.createSocketAddr(conf.get("fs.default.name", "local")).getPort(),
             conf);
    }

    /**
     * Create a NameNode at the specified location and start it.
     */
    public NameNode(File dir, String bindAddress, int port, Configuration conf) throws IOException {
        this.namesystem = new FSNamesystem(dir, conf);
        this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
        this.server = RPC.getServer(this, bindAddress, port, handlerCount, false, conf);
        this.server.start();
        myMetrics = new NameNodeMetrics();
    }

    /** Return the configured directory where name data is stored. */
    private static File getDir(Configuration conf) {
        return new File(conf.get("dfs.name.dir", "/tmp/hadoop/dfs/name"));
    }

    /**
     * Wait for service to finish.
     * (Normally, it runs forever.)
     */
    public void join() {
        try {
            this.server.join();
        } catch (InterruptedException ie) {
        }
    }

    /**
     * Stop all NameNode threads and wait for all to finish.
     */
    public void stop() {
        if (! stopRequested) {
            stopRequested = true;
            namesystem.close();
            server.stop();
            //this.join();
        }
    }

    /////////////////////////////////////////////////////
    // ClientProtocol
    /////////////////////////////////////////////////////

    /**
     */
    public LocatedBlock[] open(String src) throws IOException {
        Object openResults[] = namesystem.open(new UTF8(src));
        if (openResults == null) {
            throw new IOException("Cannot open filename " + src);
        } else {
            myMetrics.openFile();
            Block blocks[] = (Block[]) openResults[0];
            DatanodeInfo sets[][] = (DatanodeInfo[][]) openResults[1];
            LocatedBlock results[] = new LocatedBlock[blocks.length];
            for (int i = 0; i < blocks.length; i++) {
                results[i] = new LocatedBlock(blocks[i], sets[i]);
            }
            return results;
        }
    }

    /**
     */
    public LocatedBlock create(String src,
                               String clientName,
                               String clientMachine,
                               boolean overwrite,
                               short replication,
                               long blockSize) throws IOException {
        stateChangeLog.debug("*DIR* NameNode.create: file "
                + src + " for " + clientName + " at " + clientMachine);
        if (!checkPathLength(src)) {
            throw new IOException("create: Pathname too long. Limit "
                    + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
        }
        Object results[] = namesystem.startFile(new UTF8(src),
                                                new UTF8(clientName),
                                                new UTF8(clientMachine),
                                                overwrite,
                                                replication,
                                                blockSize);
        myMetrics.createFile();
        Block b = (Block) results[0];
        DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
        return new LocatedBlock(b, targets);
    }

    public boolean setReplication(String src,
                                  short replication) throws IOException {
        return namesystem.setReplication(src, replication);
    }

    /**
     */
    public LocatedBlock addBlock(String src,
                                 String clientName) throws IOException {
        stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
                + src + " for " + clientName);
        UTF8 src8 = new UTF8(src);
        UTF8 client8 = new UTF8(clientName);
        Object[] results = namesystem.getAdditionalBlock(src8, client8);
        Block b = (Block) results[0];
        DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
        return new LocatedBlock(b, targets);
    }

    /**
     * The client can report in a set written blocks that it wrote.
     * These blocks are reported via the client instead of the datanode
     * to prevent weird heartbeat race conditions.
     */
    public void reportWrittenBlock(LocatedBlock lb) throws IOException {
        Block b = lb.getBlock();
        DatanodeInfo targets[] = lb.getLocations();
        stateChangeLog.debug("*BLOCK* NameNode.reportWrittenBlock"
                + ": " + b.getBlockName() + " is written to "
                + targets.length + " locations");

        for (int i = 0; i < targets.length; i++) {
            namesystem.blockReceived(targets[i], b);
        }
    }

    /**
     * The client needs to give up on the block.
     */
    public void abandonBlock(Block b, String src) throws IOException {
        stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
                + b.getBlockName() + " of file " + src);
        if (! namesystem.abandonBlock(b, new UTF8(src))) {
            throw new IOException("Cannot abandon block during write to " + src);
        }
    }

    /**
     */
    public void abandonFileInProgress(String src,
                                      String holder) throws IOException {
        stateChangeLog.debug("*DIR* NameNode.abandonFileInProgress:" + src);
        namesystem.abandonFileInProgress(new UTF8(src), new UTF8(holder));
    }

    /**
     */
    public boolean complete(String src, String clientName) throws IOException {
        stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName);
        int returnCode = namesystem.completeFile(new UTF8(src), new UTF8(clientName));
        if (returnCode == STILL_WAITING) {
            return false;
        } else if (returnCode == COMPLETE_SUCCESS) {
            return true;
        } else {
            throw new IOException("Could not complete write to file " + src + " by " + clientName);
        }
    }

    /**
     */
    public String[][] getHints(String src, long start, long len) throws IOException {
        UTF8 hosts[][] = namesystem.getDatanodeHints(new UTF8(src), start, len);
        if (hosts == null) {
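The listing breaks off partway through getHints(). As a usage note, the excerpt above already shows everything needed to format and bring up a NameNode programmatically: format(Configuration), the NameNode(Configuration) constructor (which reads fs.default.name and dfs.name.dir), and join(). The sketch below is illustrative only and is not part of NameNode.java; the class name, host, port, and directory are assumed values.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.NameNode;

public class NameNodeStartupSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Address the RPC server binds to; parsed by DataNode.createSocketAddr()
        // inside the NameNode(Configuration) constructor shown above. (Assumed value.)
        conf.set("fs.default.name", "localhost:9000");
        // Directory holding the namespace image; the same key getDir() reads. (Assumed value.)
        conf.set("dfs.name.dir", "/tmp/hadoop/dfs/name");

        NameNode.format(conf);                   // destroys any existing filesystem at dfs.name.dir
        NameNode namenode = new NameNode(conf);  // the constructor starts the RPC server immediately
        namenode.join();                         // block until the server is stopped
    }
}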