
📄 dfsclient.java

📁 hadoop: Nutch cluster platform
💻 JAVA
📖 Page 1 of 3
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.commons.logging.*;

import java.io.*;
import java.net.*;
import java.util.*;

/********************************************************
 * DFSClient can connect to a Hadoop Filesystem and
 * perform basic file tasks.  It uses the ClientProtocol
 * to communicate with a NameNode daemon, and connects
 * directly to DataNodes to read/write block data.
 *
 * Hadoop DFS users should obtain an instance of
 * DistributedFileSystem, which uses DFSClient to handle
 * filesystem tasks.
 *
 * @author Mike Cafarella, Tessa MacDuff
 ********************************************************/
class DFSClient implements FSConstants {
    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.DFSClient");
    static int MAX_BLOCK_ACQUIRE_FAILURES = 3;
    private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
    ClientProtocol namenode;
    String localName;
    boolean running = true;
    Random r = new Random();
    String clientName;
    Daemon leaseChecker;
    private Configuration conf;
    private long defaultBlockSize;
    private short defaultReplication;

    /**
     * A map from name -> DFSOutputStream of files that are currently being
     * written by this client.
     */
    private TreeMap pendingCreates = new TreeMap();

    /**
     * A class to track the list of DFS clients, so that they can be closed
     * on exit.
     * @author Owen O'Malley
     */
    private static class ClientFinalizer extends Thread {
      private List clients = new ArrayList();

      public synchronized void addClient(DFSClient client) {
        clients.add(client);
      }

      public synchronized void run() {
        Iterator itr = clients.iterator();
        while (itr.hasNext()) {
          DFSClient client = (DFSClient) itr.next();
          if (client.running) {
            try {
              client.close();
            } catch (IOException ie) {
              System.err.println("Error closing client");
              ie.printStackTrace();
            }
          }
        }
      }
    }

    // add a cleanup thread
    private static ClientFinalizer clientFinalizer = new ClientFinalizer();
    static {
      Runtime.getRuntime().addShutdownHook(clientFinalizer);
    }

    /**
     * Create a new DFSClient connected to the given namenode server.
     */
    public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
    throws IOException {
        this.conf = conf;
        this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
            ClientProtocol.versionID, nameNodeAddr, conf);
        try {
            this.localName = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException uhe) {
            this.localName = "";
        }
        String taskId = conf.get("mapred.task.id");
        if (taskId != null) {
            this.clientName = "DFSClient_" + taskId;
        } else {
            this.clientName = "DFSClient_" + r.nextInt();
        }
        defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
        defaultReplication = (short) conf.getInt("dfs.replication", 3);
        this.leaseChecker = new Daemon(new LeaseChecker());
        this.leaseChecker.start();
    }

    private void checkOpen() throws IOException {
      if (!running) {
        IOException result = new IOException("Filesystem closed");
        throw result;
      }
    }

    /**
     * Close the file system, abandoning all of the leases and files being
     * created.
     */
    public void close() throws IOException {
      // synchronize in here so that we don't need to change the API
      synchronized (this) {
        checkOpen();
        synchronized (pendingCreates) {
          Iterator file_itr = pendingCreates.keySet().iterator();
          while (file_itr.hasNext()) {
            String name = (String) file_itr.next();
            try {
              namenode.abandonFileInProgress(name, clientName);
            } catch (IOException ie) {
              System.err.println("Exception abandoning create lock on " + name);
              ie.printStackTrace();
            }
          }
          pendingCreates.clear();
        }
        this.running = false;
        try {
            leaseChecker.join();
        } catch (InterruptedException ie) {
        }
      }
    }

    /**
     * Get the default block size for this cluster
     * @return the default block size in bytes
     */
    public long getDefaultBlockSize() {
      return defaultBlockSize;
    }

    public long getBlockSize(Path f) throws IOException {
      // if we already know the answer, use it.
      if (f instanceof DfsPath) {
        return ((DfsPath) f).getBlockSize();
      }
      int retries = 4;
      while (true) {
        try {
          return namenode.getBlockSize(f.toString());
        } catch (IOException ie) {
          LOG.info("Problem getting block size: " +
                    StringUtils.stringifyException(ie));
          if (--retries == 0) {
            throw ie;
          }
        }
      }
    }

    public short getDefaultReplication() {
      return defaultReplication;
    }

    /**
     * Get hints about the location of the indicated block(s).  The
     * array returned is as long as there are blocks in the indicated
     * range.  Each block may have one or more locations.
     */
    public String[][] getHints(UTF8 src, long start, long len) throws IOException {
        return namenode.getHints(src.toString(), start, len);
    }

    /**
     * Create an input stream that obtains a nodelist from the
     * namenode, and then reads from all the right places.  Creates
     * inner subclass of InputStream that does the right out-of-band
     * work.
     */
    public FSInputStream open(UTF8 src) throws IOException {
        checkOpen();
        //    Get block info from namenode
        return new DFSInputStream(src.toString());
    }

    /**
     * Create a new dfs file and return an output stream for writing into it.
     *
     * @param src stream name
     * @param overwrite do not check for file existence if true
     * @return output stream
     * @throws IOException
     */
    public FSOutputStream create( UTF8 src,
                                  boolean overwrite
                                ) throws IOException {
      return create( src, overwrite, defaultReplication, defaultBlockSize, null);
    }

    /**
     * Create a new dfs file and return an output stream for writing into it
     * with write-progress reporting.
     *
     * @param src stream name
     * @param overwrite do not check for file existence if true
     * @return output stream
     * @throws IOException
     */
    public FSOutputStream create( UTF8 src,
                                  boolean overwrite,
                                  Progressable progress
                                ) throws IOException {
      return create( src, overwrite, defaultReplication, defaultBlockSize, progress);
    }

    /**
     * Create a new dfs file with the specified block replication
     * and return an output stream for writing into the file.
     *
     * @param src stream name
     * @param overwrite do not check for file existence if true
     * @param replication block replication
     * @return output stream
     * @throws IOException
     */
    public FSOutputStream create( UTF8 src,
                                  boolean overwrite,
                                  short replication,
                                  long blockSize
                                ) throws IOException {
      return create(src, overwrite, replication, blockSize, null);
    }

    /**
     * Create a new dfs file with the specified block replication
     * with write-progress reporting and return an output stream for writing
     * into the file.
     *
     * @param src stream name
     * @param overwrite do not check for file existence if true
     * @param replication block replication
     * @return output stream
     * @throws IOException
     */
    public FSOutputStream create( UTF8 src,
                                  boolean overwrite,
                                  short replication,
                                  long blockSize,
                                  Progressable progress
                                ) throws IOException {
      checkOpen();
      FSOutputStream result = new DFSOutputStream(src, overwrite,
                                                  replication, blockSize, progress);
      synchronized (pendingCreates) {
        pendingCreates.put(src.toString(), result);
      }
      return result;
    }

    /**
     * Set replication for an existing file.
     *
     * @see ClientProtocol#setReplication(String, short)
     * @param replication
     * @throws IOException
     * @return true if successful or false if file does not exist
     * @author shv
     */
    public boolean setReplication(UTF8 src,
                                  short replication
                                ) throws IOException {
      return namenode.setReplication(src.toString(), replication);
    }

    /**
     * Make a direct connection to namenode and manipulate structures
     * there.
     */
    public boolean rename(UTF8 src, UTF8 dst) throws IOException {
        checkOpen();
        return namenode.rename(src.toString(), dst.toString());
    }

    /**
     * Make a direct connection to namenode and manipulate structures
     * there.
     */
    public boolean delete(UTF8 src) throws IOException {
        checkOpen();
        return namenode.delete(src.toString());
    }

    /**
     */
    public boolean exists(UTF8 src) throws IOException {
        checkOpen();
        return namenode.exists(src.toString());
    }

    /**
     */
    public boolean isDirectory(UTF8 src) throws IOException {
        checkOpen();
        return namenode.isDir(src.toString());
    }

    /**
     */
    public DFSFileInfo[] listPaths(UTF8 src) throws IOException {
        checkOpen();
        return namenode.getListing(src.toString());
    }

    /**
     */
    public long totalRawCapacity() throws IOException {
        long rawNums[] = namenode.getStats();
        return rawNums[0];
    }

    /**
     */
    public long totalRawUsed() throws IOException {
        long rawNums[] = namenode.getStats();
        return rawNums[1];
    }

    public DatanodeInfo[] datanodeReport() throws IOException {
        return namenode.getDatanodeReport();
    }

    /**
     * Enter, leave or get safe mode.
     * See {@link ClientProtocol#setSafeMode(FSConstants.SafeModeAction)}
     * for more details.
     *
     * @see ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
     */
    public boolean setSafeMode( SafeModeAction action ) throws IOException {
      return namenode.setSafeMode( action );
    }

    /**
     */
    public boolean mkdirs(UTF8 src) throws IOException {
        checkOpen();
        return namenode.mkdirs(src.toString());
    }

    /**
     */
    public void lock(UTF8 src, boolean exclusive) throws IOException {
        long start = System.currentTimeMillis();
        boolean hasLock = false;
        while (! hasLock) {
            hasLock = namenode.obtainLock(src.toString(), clientName, exclusive);
            if (! hasLock) {
                try {
                    Thread.sleep(400);
                    if (System.currentTimeMillis() - start > 5000) {
                        LOG.info("Waiting to retry lock for " + (System.currentTimeMillis() - start) + " ms.");
                        Thread.sleep(2000);
                    }
                } catch (InterruptedException ie) {
                }
            }
        }
    }

    /**
     *
     */
    public void release(UTF8 src) throws IOException {
        boolean hasReleased = false;
        while (! hasReleased) {
            hasReleased = namenode.releaseLock(src.toString(), clientName);
            if (! hasReleased) {
                LOG.info("Could not release.  Retrying...");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException ie) {
                }
            }
        }
    }

    /**
     * Pick the best node from which to stream the data.
     * That's the local one, if available.
     */
    private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
        if ((nodes == null) ||
            (nodes.length - deadNodes.size() < 1)) {
            throw new IOException("No live nodes contain current block");
        }
        DatanodeInfo chosenNode = null;
        for (int i = 0; i < nodes.length; i++) {
            if (deadNodes.contains(nodes[i])) {
                continue;
            }
            String nodename = nodes[i].getHost();
            if (localName.equals(nodename)) {
                chosenNode = nodes[i];
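
Usage note: the listing above is cut off at the end of page 1. As the class Javadoc says, applications normally reach this functionality through DistributedFileSystem rather than DFSClient, which is package-private. The snippet below is a minimal sketch, not part of the original file, showing how the API on this page fits together; it assumes a helper class placed in the org.apache.hadoop.dfs package (so it can see the package-private DFSClient) and a hypothetical NameNode address.

package org.apache.hadoop.dfs;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FSOutputStream;
import org.apache.hadoop.io.UTF8;

public class DFSClientSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Hypothetical NameNode endpoint; replace with the cluster's real host:port.
        InetSocketAddress nameNode = new InetSocketAddress("namenode.example.com", 9000);

        DFSClient client = new DFSClient(nameNode, conf);
        try {
            UTF8 path = new UTF8("/user/demo/hello.txt");

            // create(...) registers the stream in pendingCreates and writes through DFSOutputStream.
            FSOutputStream out = client.create(path, true);
            out.write("hello dfs".getBytes());
            out.close();

            // open(...) returns a DFSInputStream that fetches block locations from the NameNode.
            FSInputStream in = client.open(path);
            byte[] buf = new byte[32];
            int n = in.read(buf);
            in.close();
            System.out.println(new String(buf, 0, n));
        } finally {
            // close() abandons any unfinished creates and stops the lease-checker daemon.
            client.close();
        }
    }
}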
