dfsclient.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 847 行 · 第 1/2 页

JAVA
847
字号
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.dfs;import org.apache.hadoop.io.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.ipc.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.*;import java.io.*;import java.net.*;import java.util.*;import java.util.logging.*;/******************************************************** * DFSClient can connect to a Hadoop Filesystem and  * perform basic file tasks.  It uses the ClientProtocol * to communicate with a NameNode daemon, and connects  * directly to DataNodes to read/write block data. * * Hadoop DFS users should obtain an instance of  * DistributedFileSystem, which uses DFSClient to handle * filesystem tasks. * * @author Mike Cafarella, Tessa MacDuff ********************************************************/class DFSClient implements FSConstants {    public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.DFSClient");    static int MAX_BLOCK_ACQUIRE_FAILURES = 3;    ClientProtocol namenode;    String localName;    boolean running = true;    Random r = new Random();    String clientName;    Daemon leaseChecker;    private Configuration conf;    /**      * Create a new DFSClient connected to the given namenode server.     */    public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf) {        this.conf = conf;        this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, nameNodeAddr, conf);        try {            this.localName = InetAddress.getLocalHost().getHostName();        } catch (UnknownHostException uhe) {            this.localName = "";        }        this.clientName = "DFSClient_" + r.nextInt();        this.leaseChecker = new Daemon(new LeaseChecker());        this.leaseChecker.start();    }    /**     */    public void close() throws IOException {        this.running = false;        try {            leaseChecker.join();        } catch (InterruptedException ie) {        }    }    /**     * Get hints about the location of the indicated block(s).  The     * array returned is as long as there are blocks in the indicated     * range.  Each block may have one or more locations.     */    public String[][] getHints(UTF8 src, long start, long len) throws IOException {        return namenode.getHints(src.toString(), start, len);    }    /**     * Create an input stream that obtains a nodelist from the     * namenode, and then reads from all the right places.  Creates     * inner subclass of InputStream that does the right out-of-band     * work.     */    public FSInputStream open(UTF8 src) throws IOException {        // Get block info from namenode        return new DFSInputStream(src.toString());    }    public FSOutputStream create(UTF8 src, boolean overwrite) throws IOException {        return new DFSOutputStream(src, overwrite);    }    /**     * Make a direct connection to namenode and manipulate structures     * there.     */    public boolean rename(UTF8 src, UTF8 dst) throws IOException {        return namenode.rename(src.toString(), dst.toString());    }    /**     * Make a direct connection to namenode and manipulate structures     * there.     */    public boolean delete(UTF8 src) throws IOException {        return namenode.delete(src.toString());    }    /**     */    public boolean exists(UTF8 src) throws IOException {        return namenode.exists(src.toString());    }    /**     */    public boolean isDirectory(UTF8 src) throws IOException {        return namenode.isDir(src.toString());    }    /**     */    public DFSFileInfo[] listFiles(UTF8 src) throws IOException {        return namenode.getListing(src.toString());    }    /**     */    public long totalRawCapacity() throws IOException {        long rawNums[] = namenode.getStats();        return rawNums[0];    }    /**     */    public long totalRawUsed() throws IOException {        long rawNums[] = namenode.getStats();        return rawNums[1];    }    public DatanodeInfo[] datanodeReport() throws IOException {        return namenode.getDatanodeReport();    }    /**     */    public boolean mkdirs(UTF8 src) throws IOException {        return namenode.mkdirs(src.toString());    }    /**     */    public void lock(UTF8 src, boolean exclusive) throws IOException {        long start = System.currentTimeMillis();        boolean hasLock = false;        while (! hasLock) {            hasLock = namenode.obtainLock(src.toString(), clientName, exclusive);            if (! hasLock) {                try {                    Thread.sleep(400);                    if (System.currentTimeMillis() - start > 5000) {                        LOG.info("Waiting to retry lock for " + (System.currentTimeMillis() - start) + " ms.");                        Thread.sleep(2000);                    }                } catch (InterruptedException ie) {                }            }        }    }    /**     *     */    public void release(UTF8 src) throws IOException {        boolean hasReleased = false;        while (! hasReleased) {            hasReleased = namenode.releaseLock(src.toString(), clientName);            if (! hasReleased) {                LOG.info("Could not release.  Retrying...");                try {                    Thread.sleep(2000);                } catch (InterruptedException ie) {                }            }        }    }    /**     * Pick the best node from which to stream the data.     * That's the local one, if available.     */    private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {        if ((nodes == null) ||             (nodes.length - deadNodes.size() < 1)) {            throw new IOException("No live nodes contain current block");        }        DatanodeInfo chosenNode = null;        for (int i = 0; i < nodes.length; i++) {            if (deadNodes.contains(nodes[i])) {                continue;            }            String nodename = nodes[i].getName().toString();            int colon = nodename.indexOf(':');            if (colon >= 0) {                nodename = nodename.substring(0, colon);            }            if (localName.equals(nodename)) {                chosenNode = nodes[i];                break;            }        }        if (chosenNode == null) {            do {                chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length];            } while (deadNodes.contains(chosenNode));        }        return chosenNode;    }    /***************************************************************     * Periodically check in with the namenode and renew all the leases     * when the lease period is half over.     ***************************************************************/    class LeaseChecker implements Runnable {        /**         */        public void run() {            long lastRenewed = 0;            while (running) {                if (System.currentTimeMillis() - lastRenewed > (LEASE_PERIOD / 2)) {                    try {                        namenode.renewLease(clientName);                        lastRenewed = System.currentTimeMillis();                    } catch (IOException ie) {                    }                }                try {                    Thread.sleep(1000);                } catch (InterruptedException ie) {                }            }        }    }    /****************************************************************     * DFSInputStream provides bytes from a named file.  It handles      * negotiation of the namenode and various datanodes as necessary.     ****************************************************************/    class DFSInputStream extends FSInputStream {        private Socket s = null;        boolean closed = false;        private String src;        private DataInputStream blockStream;        private Block blocks[] = null;        private DatanodeInfo nodes[][] = null;        private long pos = 0;        private long filelen = 0;        private long blockEnd = -1;        /**         */        public DFSInputStream(String src) throws IOException {            this.src = src;            openInfo();            this.blockStream = null;            for (int i = 0; i < blocks.length; i++) {                this.filelen += blocks[i].getNumBytes();            }        }        /**         * Grab the open-file info from namenode         */        void openInfo() throws IOException {            Block oldBlocks[] = this.blocks;            LocatedBlock results[] = namenode.open(src);                        Vector blockV = new Vector();            Vector nodeV = new Vector();            for (int i = 0; i < results.length; i++) {                blockV.add(results[i].getBlock());                nodeV.add(results[i].getLocations());            }            Block newBlocks[] = (Block[]) blockV.toArray(new Block[blockV.size()]);            if (oldBlocks != null) {                for (int i = 0; i < oldBlocks.length; i++) {                    if (! oldBlocks[i].equals(newBlocks[i])) {                        throw new IOException("Blocklist for " + src + " has changed!");                    }                }                if (oldBlocks.length != newBlocks.length) {                    throw new IOException("Blocklist for " + src + " now has different length");                }            }            this.blocks = newBlocks;            this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);        }        /**         * Open a DataInputStream to a DataNode so that it can be read from.         * We get block ID and the IDs of the destinations at startup, from the namenode.         */        private synchronized void blockSeekTo(long target) throws IOException {            if (target >= filelen) {                throw new IOException("Attempted to read past end of file");            }            if (s != null) {                s.close();                s = null;            }            //            // Compute desired block            //            int targetBlock = -1;            long targetBlockStart = 0;            long targetBlockEnd = 0;            for (int i = 0; i < blocks.length; i++) {                long blocklen = blocks[i].getNumBytes();                targetBlockEnd = targetBlockStart + blocklen - 1;                if (target >= targetBlockStart && target <= targetBlockEnd) {                    targetBlock = i;                    break;                } else {                    targetBlockStart = targetBlockEnd + 1;                                    }            }            if (targetBlock < 0) {                throw new IOException("Impossible situation: could not find target position " + target);            }            long offsetIntoBlock = target - targetBlockStart;            //            // Connect to best DataNode for desired Block, with potential offset            //            int failures = 0;            InetSocketAddress targetAddr = null;            TreeSet deadNodes = new TreeSet();            while (s == null) {                DatanodeInfo chosenNode;                try {                    chosenNode = bestNode(nodes[targetBlock], deadNodes);                    targetAddr = DataNode.createSocketAddr(chosenNode.getName().toString());                } catch (IOException ie) {                    String blockInfo =                      blocks[targetBlock]+" file="+src+" offset="+target;                    if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {                        throw new IOException("Could not obtain block: " + blockInfo);                    }                    if (nodes[targetBlock] == null || nodes[targetBlock].length == 0) {                        LOG.info("No node available for block: " + blockInfo);                    }                    LOG.info("Could not obtain block from any node:  " + ie);                    try {                        Thread.sleep(3000);                    } catch (InterruptedException iex) {                    }                    deadNodes.clear();                    openInfo();                    failures++;                    continue;                }                try {                    s = new Socket();                    s.connect(targetAddr, READ_TIMEOUT);                    s.setSoTimeout(READ_TIMEOUT);                    //                    // Xmit header info to datanode                    //                    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));                    out.write(OP_READSKIP_BLOCK);                    blocks[targetBlock].write(out);                    out.writeLong(offsetIntoBlock);                    out.flush();                    //                    // Get bytes in block, set streams                    //                    DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));                    long curBlockSize = in.readLong();                    long amtSkipped = in.readLong();                    if (curBlockSize != blocks[targetBlock].len) {                        throw new IOException("Recorded block size is " + blocks[targetBlock].len + ", but datanode reports size of " + curBlockSize);                    }                    if (amtSkipped != offsetIntoBlock) {                        throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);                    }                    this.pos = target;                    this.blockEnd = targetBlockEnd;                    this.blockStream = in;                } catch (IOException ex) {                    // Put chosen node into dead list, continue                    LOG.info("Failed to connect to " + targetAddr + ":" + ex);                    deadNodes.add(chosenNode);                    if (s != null) {                        try {                            s.close();                        } catch (IOException iex) {                        }                                            }                    s = null;                }            }        }        /**         * Close it down!         */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?