namenode.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 398 行

JAVA
398
字号
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.dfs;import org.apache.hadoop.io.*;import org.apache.hadoop.ipc.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.LogFormatter;import java.io.*;import java.util.logging.*;/********************************************************** * NameNode serves as both directory namespace manager and * "inode table" for the Hadoop DFS.  There is a single NameNode * running in any DFS deployment.  (Well, except when there * is a second backup/failover NameNode.) * * The NameNode controls two critical tables: *   1)  filename->blocksequence (namespace) *   2)  block->machinelist ("inodes") * * The first table is stored on disk and is very precious. * The second table is rebuilt every time the NameNode comes * up. * * 'NameNode' refers to both this class as well as the 'NameNode server'. * The 'FSNamesystem' class actually performs most of the filesystem * management.  The majority of the 'NameNode' class itself is concerned * with exposing the IPC interface to the outside world, plus some * configuration management. * * NameNode implements the ClientProtocol interface, which allows * clients to ask for DFS services.  ClientProtocol is not * designed for direct use by authors of DFS client code.  End-users * should instead use the org.apache.nutch.hadoop.fs.FileSystem class. * * NameNode also implements the DatanodeProtocol interface, used by * DataNode programs that actually store DFS data blocks.  These * methods are invoked repeatedly and automatically by all the * DataNodes in a DFS deployment. * * @author Mike Cafarella **********************************************************/public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {    public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.dfs.NameNode");    private FSNamesystem namesystem;    private Server server;    private int handlerCount = 2;    /** only used for testing purposes  */    private boolean stopRequested = false;    /** Format a new filesystem.  Destroys any filesystem that may already     * exist at this location.  **/    public static void format(Configuration conf) throws IOException {      FSDirectory.format(getDir(conf), conf);    }    /**     * Create a NameNode at the default location     */    public NameNode(Configuration conf) throws IOException {        this(getDir(conf),             DataNode.createSocketAddr             (conf.get("fs.default.name", "local")).getPort(), conf);    }    /**     * Create a NameNode at the specified location and start it.     */    public NameNode(File dir, int port, Configuration conf) throws IOException {        this.namesystem = new FSNamesystem(dir, conf);        this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);        this.server = RPC.getServer(this, port, handlerCount, false, conf);        this.server.start();    }    /** Return the configured directory where name data is stored. */    private static File getDir(Configuration conf) {      return new File(conf.get("dfs.name.dir", "/tmp/hadoop/dfs/name"));    }    /**     * Wait for service to finish.     * (Normally, it runs forever.)     */    public void join() {        try {            this.server.join();        } catch (InterruptedException ie) {        }    }    /**     * Stop all NameNode threads and wait for all to finish.     * Package-only access since this is intended for JUnit testing.    */    void stop() {      if (! stopRequested) {        stopRequested = true;        namesystem.close();        server.stop();        //this.join();      }    }    /////////////////////////////////////////////////////    // ClientProtocol    /////////////////////////////////////////////////////    /**     */    public LocatedBlock[] open(String src) throws IOException {        Object openResults[] = namesystem.open(new UTF8(src));        if (openResults == null) {            throw new IOException("Cannot open filename " + src);        } else {            Block blocks[] = (Block[]) openResults[0];            DatanodeInfo sets[][] = (DatanodeInfo[][]) openResults[1];            LocatedBlock results[] = new LocatedBlock[blocks.length];            for (int i = 0; i < blocks.length; i++) {                results[i] = new LocatedBlock(blocks[i], sets[i]);            }            return results;        }    }    /**     */    public LocatedBlock create(String src, String clientName, String clientMachine, boolean overwrite) throws IOException {        Object results[] = namesystem.startFile(new UTF8(src), new UTF8(clientName), new UTF8(clientMachine), overwrite);        if (results == null) {            throw new IOException("Cannot create file " + src + " on client " + clientName);        } else {            Block b = (Block) results[0];            DatanodeInfo targets[] = (DatanodeInfo[]) results[1];            return new LocatedBlock(b, targets);        }    }    /**     */    public LocatedBlock addBlock(String src, String clientMachine) throws IOException {        int retries = 5;        Object results[] = namesystem.getAdditionalBlock(new UTF8(src), new UTF8(clientMachine));        while (results != null && results[0] == null && retries > 0) {            try {                Thread.sleep(100);            } catch (InterruptedException ie) {            }            results = namesystem.getAdditionalBlock(new UTF8(src), new UTF8(clientMachine));            retries--;        }        if (results == null) {            throw new IOException("Cannot obtain additional block for file " + src);        } else if (results[0] == null) {            return null;        } else {            Block b = (Block) results[0];            DatanodeInfo targets[] = (DatanodeInfo[]) results[1];            return new LocatedBlock(b, targets);        }    }    /**     * The client can report in a set written blocks that it wrote.     * These blocks are reported via the client instead of the datanode     * to prevent weird heartbeat race conditions.     */    public void reportWrittenBlock(LocatedBlock lb) throws IOException {        Block b = lb.getBlock();        DatanodeInfo targets[] = lb.getLocations();        for (int i = 0; i < targets.length; i++) {            namesystem.blockReceived(b, targets[i].getName());        }    }    /**     * The client needs to give up on the block.     */    public void abandonBlock(Block b, String src) throws IOException {        if (! namesystem.abandonBlock(b, new UTF8(src))) {            throw new IOException("Cannot abandon block during write to " + src);        }    }    /**     */    public void abandonFileInProgress(String src) throws IOException {        namesystem.abandonFileInProgress(new UTF8(src));    }    /**     */    public boolean complete(String src, String clientName) throws IOException {        int returnCode = namesystem.completeFile(new UTF8(src), new UTF8(clientName));        if (returnCode == STILL_WAITING) {            return false;        } else if (returnCode == COMPLETE_SUCCESS) {            return true;        } else {            throw new IOException("Could not complete write to file " + src + " by " + clientName);        }    }    /**     */    public String[][] getHints(String src, long start, long len) throws IOException {        UTF8 hosts[][] = namesystem.getDatanodeHints(new UTF8(src), start, len);        if (hosts == null) {            return new String[0][];        } else {            String results[][] = new String[hosts.length][];            for (int i = 0; i < hosts.length; i++) {                results[i] = new String[hosts[i].length];                for (int j = 0; j < results[i].length; j++) {                    results[i][j] = hosts[i][j].toString();                }            }            return results;        }    }    /**     */    public boolean rename(String src, String dst) throws IOException {        return namesystem.renameTo(new UTF8(src), new UTF8(dst));    }    /**     */    public boolean delete(String src) throws IOException {        return namesystem.delete(new UTF8(src));    }    /**     */    public boolean exists(String src) throws IOException {        return namesystem.exists(new UTF8(src));    }    /**     */    public boolean isDir(String src) throws IOException {        return namesystem.isDir(new UTF8(src));    }    /**     */    public boolean mkdirs(String src) throws IOException {        return namesystem.mkdirs(new UTF8(src));    }    /**     */    public boolean obtainLock(String src, String clientName, boolean exclusive) throws IOException {        int returnCode = namesystem.obtainLock(new UTF8(src), new UTF8(clientName), exclusive);        if (returnCode == COMPLETE_SUCCESS) {            return true;        } else if (returnCode == STILL_WAITING) {            return false;        } else {            throw new IOException("Failure when trying to obtain lock on " + src);        }    }    /**     */    public boolean releaseLock(String src, String clientName) throws IOException {        int returnCode = namesystem.releaseLock(new UTF8(src), new UTF8(clientName));        if (returnCode == COMPLETE_SUCCESS) {            return true;        } else if (returnCode == STILL_WAITING) {            return false;        } else {            throw new IOException("Failure when trying to release lock on " + src);        }    }    /**     */    public void renewLease(String clientName) throws IOException {        namesystem.renewLease(new UTF8(clientName));            }    /**     */    public DFSFileInfo[] getListing(String src) throws IOException {        return namesystem.getListing(new UTF8(src));    }    /**     */    public long[] getStats() throws IOException {        long results[] = new long[2];        results[0] = namesystem.totalCapacity();        results[1] = namesystem.totalCapacity() - namesystem.totalRemaining();        return results;    }    /**     */    public DatanodeInfo[] getDatanodeReport() throws IOException {        DatanodeInfo results[] = namesystem.datanodeReport();        if (results == null || results.length == 0) {            throw new IOException("Cannot find datanode report");        }        return results;    }    ////////////////////////////////////////////////////////////////    // DatanodeProtocol    ////////////////////////////////////////////////////////////////    /**     */    public void sendHeartbeat(String sender, long capacity, long remaining) {        namesystem.gotHeartbeat(new UTF8(sender), capacity, remaining);    }    public Block[] blockReport(String sender, Block blocks[]) {        LOG.info("Block report from "+sender+": "+blocks.length+" blocks.");        return namesystem.processReport(blocks, new UTF8(sender));    }    public void blockReceived(String sender, Block blocks[]) {        for (int i = 0; i < blocks.length; i++) {            namesystem.blockReceived(blocks[i], new UTF8(sender));        }    }    /**     */    public void errorReport(String sender, String msg) {        // Log error message from datanode        //LOG.info("Report from " + sender + ": " + msg);    }    /**     * Return a block-oriented command for the datanode to execute.     * This will be either a transfer or a delete operation.     */    public BlockCommand getBlockwork(String sender, int xmitsInProgress) {        //        // Ask to perform pending transfers, if any        //        Object xferResults[] = namesystem.pendingTransfers(new DatanodeInfo(new UTF8(sender)), xmitsInProgress);        if (xferResults != null) {            return new BlockCommand((Block[]) xferResults[0], (DatanodeInfo[][]) xferResults[1]);        }        //        // If there are no transfers, check for recently-deleted blocks that        // should be removed.  This is not a full-datanode sweep, as is done during        // a block report.  This is just a small fast removal of blocks that have        // just been removed.        //        Block blocks[] = namesystem.blocksToInvalidate(new UTF8(sender));        if (blocks != null) {            return new BlockCommand(blocks);        }        return null;    }    /**     */    public static void main(String argv[]) throws Exception {        Configuration conf = new Configuration();        if (argv.length == 1 && argv[0].equals("-format")) {          File dir = getDir(conf);          if (dir.exists()) {            System.err.print("Re-format filesystem in " + dir +" ? (Y or N) ");            if (!(System.in.read() == 'Y')) {              System.err.println("Format aborted.");              System.exit(1);            }          }          format(conf);          System.err.println("Formatted "+dir);          System.exit(0);        }        NameNode namenode = new NameNode(conf);        namenode.join();    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?