📄 dfsclient.java
字号:
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;

import org.apache.commons.logging.*;

import java.io.*;
import java.net.*;
import java.util.*;

/********************************************************
 * DFSClient can connect to a Hadoop Filesystem and
 * perform basic file tasks.  It uses the ClientProtocol
 * to communicate with a NameNode daemon, and connects
 * directly to DataNodes to read/write block data.
 *
 * Hadoop DFS users should obtain an instance of
 * DistributedFileSystem, which uses DFSClient to handle
 * filesystem tasks.
 *
 * @author Mike Cafarella, Tessa MacDuff
 ********************************************************/
class DFSClient implements FSConstants {
  // Note: the log category is under org.apache.hadoop.fs even though this
  // class lives in org.apache.hadoop.dfs.
  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.DFSClient");
  // Not referenced in the visible part of this file; presumably a retry limit
  // for fetching a block from datanodes -- TODO confirm.
  static int MAX_BLOCK_ACQUIRE_FAILURES = 3;
  // 64 MB; fallback when "dfs.block.size" is not configured (see constructor).
  private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
  // RPC proxy to the namenode, created in the constructor.
  ClientProtocol namenode;
  // This host's name ("" if it could not be resolved); bestNode() compares it
  // with datanode host names to prefer a local replica.
  String localName;
  // Cleared by close(); checkOpen() rejects calls once it is false, and the
  // shutdown-hook finalizer skips clients whose flag is already cleared.
  // NOTE(review): not volatile, yet read from the shutdown-hook thread.
  boolean running = true;
  // Supplies the random client-name suffix when no task id is configured.
  Random r = new Random();
  // Identifies this client to the namenode (file abandonment, locks).
  String clientName;
  // Daemon running a LeaseChecker; started in the constructor, joined in close().
  Daemon leaseChecker;
  private Configuration conf;
  // From "dfs.block.size" (default DEFAULT_BLOCK_SIZE).
  private long defaultBlockSize;
  // From "dfs.replication" (default 3).
  private short defaultReplication;

  /**
   * A map from name -> DFSOutputStream of files that are currently being
   * written by this client. Accesses in create() and close() synchronize
   * on the map itself.
*/ private TreeMap pendingCreates = new TreeMap(); /** * A class to track the list of DFS clients, so that they can be closed * on exit. * @author Owen O'Malley */ private static class ClientFinalizer extends Thread { private List clients = new ArrayList(); public synchronized void addClient(DFSClient client) { clients.add(client); } public synchronized void run() { Iterator itr = clients.iterator(); while (itr.hasNext()) { DFSClient client = (DFSClient) itr.next(); if (client.running) { try { client.close(); } catch (IOException ie) { System.err.println("Error closing client"); ie.printStackTrace(); } } } } } // add a cleanup thread private static ClientFinalizer clientFinalizer = new ClientFinalizer(); static { Runtime.getRuntime().addShutdownHook(clientFinalizer); } /** * Create a new DFSClient connected to the given namenode server. */ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf) throws IOException { this.conf = conf; this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID, nameNodeAddr, conf); try { this.localName = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException uhe) { this.localName = ""; } String taskId = conf.get("mapred.task.id"); if (taskId != null) { this.clientName = "DFSClient_" + taskId; } else { this.clientName = "DFSClient_" + r.nextInt(); } defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE); defaultReplication = (short) conf.getInt("dfs.replication", 3); this.leaseChecker = new Daemon(new LeaseChecker()); this.leaseChecker.start(); } private void checkOpen() throws IOException { if (!running) { IOException result = new IOException("Filesystem closed"); throw result; } } /** * Close the file system, abadoning all of the leases and files being * created. 
*/ public void close() throws IOException { // synchronize in here so that we don't need to change the API synchronized (this) { checkOpen(); synchronized (pendingCreates) { Iterator file_itr = pendingCreates.keySet().iterator(); while (file_itr.hasNext()) { String name = (String) file_itr.next(); try { namenode.abandonFileInProgress(name, clientName); } catch (IOException ie) { System.err.println("Exception abandoning create lock on " + name); ie.printStackTrace(); } } pendingCreates.clear(); } this.running = false; try { leaseChecker.join(); } catch (InterruptedException ie) { } } } /** * Get the default block size for this cluster * @return the default block size in bytes */ public long getDefaultBlockSize() { return defaultBlockSize; } public long getBlockSize(Path f) throws IOException { // if we already know the answer, use it. if (f instanceof DfsPath) { return ((DfsPath) f).getBlockSize(); } int retries = 4; while (true) { try { return namenode.getBlockSize(f.toString()); } catch (IOException ie) { LOG.info("Problem getting block size: " + StringUtils.stringifyException(ie)); if (--retries == 0) { throw ie; } } } } public short getDefaultReplication() { return defaultReplication; } /** * Get hints about the location of the indicated block(s). The * array returned is as long as there are blocks in the indicated * range. Each block may have one or more locations. */ public String[][] getHints(UTF8 src, long start, long len) throws IOException { return namenode.getHints(src.toString(), start, len); } /** * Create an input stream that obtains a nodelist from the * namenode, and then reads from all the right places. Creates * inner subclass of InputStream that does the right out-of-band * work. */ public FSInputStream open(UTF8 src) throws IOException { checkOpen(); // Get block info from namenode return new DFSInputStream(src.toString()); } /** * Create a new dfs file and return an output stream for writing into it. 
* * @param src stream name * @param overwrite do not check for file existence if true * @return output stream * @throws IOException */ public FSOutputStream create( UTF8 src, boolean overwrite ) throws IOException { return create( src, overwrite, defaultReplication, defaultBlockSize, null); } /** * Create a new dfs file and return an output stream for writing into it * with write-progress reporting. * * @param src stream name * @param overwrite do not check for file existence if true * @return output stream * @throws IOException */ public FSOutputStream create( UTF8 src, boolean overwrite, Progressable progress ) throws IOException { return create( src, overwrite, defaultReplication, defaultBlockSize, null); } /** * Create a new dfs file with the specified block replication * and return an output stream for writing into the file. * * @param src stream name * @param overwrite do not check for file existence if true * @param replication block replication * @return output stream * @throws IOException */ public FSOutputStream create( UTF8 src, boolean overwrite, short replication, long blockSize ) throws IOException { return create(src, overwrite, replication, blockSize, null); } /** * Create a new dfs file with the specified block replication * with write-progress reporting and return an output stream for writing * into the file. * * @param src stream name * @param overwrite do not check for file existence if true * @param replication block replication * @return output stream * @throws IOException */ public FSOutputStream create( UTF8 src, boolean overwrite, short replication, long blockSize, Progressable progress ) throws IOException { checkOpen(); FSOutputStream result = new DFSOutputStream(src, overwrite, replication, blockSize, progress); synchronized (pendingCreates) { pendingCreates.put(src.toString(), result); } return result; } /** * Set replication for an existing file. 
* * @see ClientProtocol#setReplication(String, short) * @param replication * @throws IOException * @return true is successful or false if file does not exist * @author shv */ public boolean setReplication(UTF8 src, short replication ) throws IOException { return namenode.setReplication(src.toString(), replication); } /** * Make a direct connection to namenode and manipulate structures * there. */ public boolean rename(UTF8 src, UTF8 dst) throws IOException { checkOpen(); return namenode.rename(src.toString(), dst.toString()); } /** * Make a direct connection to namenode and manipulate structures * there. */ public boolean delete(UTF8 src) throws IOException { checkOpen(); return namenode.delete(src.toString()); } /** */ public boolean exists(UTF8 src) throws IOException { checkOpen(); return namenode.exists(src.toString()); } /** */ public boolean isDirectory(UTF8 src) throws IOException { checkOpen(); return namenode.isDir(src.toString()); } /** */ public DFSFileInfo[] listPaths(UTF8 src) throws IOException { checkOpen(); return namenode.getListing(src.toString()); } /** */ public long totalRawCapacity() throws IOException { long rawNums[] = namenode.getStats(); return rawNums[0]; } /** */ public long totalRawUsed() throws IOException { long rawNums[] = namenode.getStats(); return rawNums[1]; } public DatanodeInfo[] datanodeReport() throws IOException { return namenode.getDatanodeReport(); } /** * Enter, leave or get safe mode. * See {@link ClientProtocol#setSafeMode(FSConstants.SafeModeAction)} * for more details. * * @see ClientProtocol#setSafeMode(FSConstants.SafeModeAction) */ public boolean setSafeMode( SafeModeAction action ) throws IOException { return namenode.setSafeMode( action ); } /** */ public boolean mkdirs(UTF8 src) throws IOException { checkOpen(); return namenode.mkdirs(src.toString()); } /** */ public void lock(UTF8 src, boolean exclusive) throws IOException { long start = System.currentTimeMillis(); boolean hasLock = false; while (! 
hasLock) {
      hasLock = namenode.obtainLock(src.toString(), clientName, exclusive);
      if (! hasLock) {
        try {
          // Back off before retrying; after 5 s of waiting, log and back off longer.
          Thread.sleep(400);
          if (System.currentTimeMillis() - start > 5000) {
            LOG.info("Waiting to retry lock for " + (System.currentTimeMillis() - start) + " ms.");
            Thread.sleep(2000);
          }
        } catch (InterruptedException ie) {
          // NOTE(review): the interrupt is swallowed (status not restored)
          // and the loop keeps retrying.
        }
      }
    }
  }

  /**
   * Release this client's lock on src, retrying every 2 seconds until the
   * namenode's releaseLock call returns true.
   *
   * NOTE(review): there is no retry limit, and interrupts during the sleep
   * are swallowed, so this blocks for as long as the namenode keeps
   * returning false.
   */
  public void release(UTF8 src) throws IOException {
    boolean hasReleased = false;
    while (! hasReleased) {
      hasReleased = namenode.releaseLock(src.toString(), clientName);
      if (! hasReleased) {
        LOG.info("Could not release. Retrying...");
        try {
          Thread.sleep(2000);
        } catch (InterruptedException ie) {
          // interrupt ignored; the retry loop continues
        }
      }
    }
  }

  /**
   * Pick the best node from which to stream the data.
   * That's the local one, if available.
   *
   * NOTE(review): the up-front check compares nodes.length with
   * deadNodes.size(); if deadNodes holds nodes that are not in this array,
   * it can report "No live nodes" even though some entries are live.
   */
  private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
    if ((nodes == null) || (nodes.length - deadNodes.size() < 1)) {
      throw new IOException("No live nodes contain current block");
    }
    DatanodeInfo chosenNode = null;
    for (int i = 0; i < nodes.length; i++) {
      if (deadNodes.contains(nodes[i])) {
        continue;
      }
      String nodename = nodes[i].getHost();
      // Prefer a replica on this host.
      if (localName.equals(nodename)) {
        chosenNode = nodes[i];
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -