dfsclient.java
来自「Hadoop 是一个在由廉价硬件组成的大型集群上运行应用程序的框架。Hadoop……」· Java 代码 · 共 847 行 · 第 1/2 页
JAVA
847 行
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.dfs;import org.apache.hadoop.io.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.ipc.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.*;import java.io.*;import java.net.*;import java.util.*;import java.util.logging.*;/******************************************************** * DFSClient can connect to a Hadoop Filesystem and * perform basic file tasks. It uses the ClientProtocol * to communicate with a NameNode daemon, and connects * directly to DataNodes to read/write block data. * * Hadoop DFS users should obtain an instance of * DistributedFileSystem, which uses DFSClient to handle * filesystem tasks. * * @author Mike Cafarella, Tessa MacDuff ********************************************************/class DFSClient implements FSConstants { public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.DFSClient"); static int MAX_BLOCK_ACQUIRE_FAILURES = 3; ClientProtocol namenode; String localName; boolean running = true; Random r = new Random(); String clientName; Daemon leaseChecker; private Configuration conf; /** * Create a new DFSClient connected to the given namenode server. 
*/ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf) { this.conf = conf; this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, nameNodeAddr, conf); try { this.localName = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException uhe) { this.localName = ""; } this.clientName = "DFSClient_" + r.nextInt(); this.leaseChecker = new Daemon(new LeaseChecker()); this.leaseChecker.start(); } /** */ public void close() throws IOException { this.running = false; try { leaseChecker.join(); } catch (InterruptedException ie) { } } /** * Get hints about the location of the indicated block(s). The * array returned is as long as there are blocks in the indicated * range. Each block may have one or more locations. */ public String[][] getHints(UTF8 src, long start, long len) throws IOException { return namenode.getHints(src.toString(), start, len); } /** * Create an input stream that obtains a nodelist from the * namenode, and then reads from all the right places. Creates * inner subclass of InputStream that does the right out-of-band * work. */ public FSInputStream open(UTF8 src) throws IOException { // Get block info from namenode return new DFSInputStream(src.toString()); } public FSOutputStream create(UTF8 src, boolean overwrite) throws IOException { return new DFSOutputStream(src, overwrite); } /** * Make a direct connection to namenode and manipulate structures * there. */ public boolean rename(UTF8 src, UTF8 dst) throws IOException { return namenode.rename(src.toString(), dst.toString()); } /** * Make a direct connection to namenode and manipulate structures * there. 
*/ public boolean delete(UTF8 src) throws IOException { return namenode.delete(src.toString()); } /** */ public boolean exists(UTF8 src) throws IOException { return namenode.exists(src.toString()); } /** */ public boolean isDirectory(UTF8 src) throws IOException { return namenode.isDir(src.toString()); } /** */ public DFSFileInfo[] listFiles(UTF8 src) throws IOException { return namenode.getListing(src.toString()); } /** */ public long totalRawCapacity() throws IOException { long rawNums[] = namenode.getStats(); return rawNums[0]; } /** */ public long totalRawUsed() throws IOException { long rawNums[] = namenode.getStats(); return rawNums[1]; } public DatanodeInfo[] datanodeReport() throws IOException { return namenode.getDatanodeReport(); } /** */ public boolean mkdirs(UTF8 src) throws IOException { return namenode.mkdirs(src.toString()); } /** */ public void lock(UTF8 src, boolean exclusive) throws IOException { long start = System.currentTimeMillis(); boolean hasLock = false; while (! hasLock) { hasLock = namenode.obtainLock(src.toString(), clientName, exclusive); if (! hasLock) { try { Thread.sleep(400); if (System.currentTimeMillis() - start > 5000) { LOG.info("Waiting to retry lock for " + (System.currentTimeMillis() - start) + " ms."); Thread.sleep(2000); } } catch (InterruptedException ie) { } } } } /** * */ public void release(UTF8 src) throws IOException { boolean hasReleased = false; while (! hasReleased) { hasReleased = namenode.releaseLock(src.toString(), clientName); if (! hasReleased) { LOG.info("Could not release. Retrying..."); try { Thread.sleep(2000); } catch (InterruptedException ie) { } } } } /** * Pick the best node from which to stream the data. * That's the local one, if available. 
*/ private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException { if ((nodes == null) || (nodes.length - deadNodes.size() < 1)) { throw new IOException("No live nodes contain current block"); } DatanodeInfo chosenNode = null; for (int i = 0; i < nodes.length; i++) { if (deadNodes.contains(nodes[i])) { continue; } String nodename = nodes[i].getName().toString(); int colon = nodename.indexOf(':'); if (colon >= 0) { nodename = nodename.substring(0, colon); } if (localName.equals(nodename)) { chosenNode = nodes[i]; break; } } if (chosenNode == null) { do { chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length]; } while (deadNodes.contains(chosenNode)); } return chosenNode; } /*************************************************************** * Periodically check in with the namenode and renew all the leases * when the lease period is half over. ***************************************************************/ class LeaseChecker implements Runnable { /** */ public void run() { long lastRenewed = 0; while (running) { if (System.currentTimeMillis() - lastRenewed > (LEASE_PERIOD / 2)) { try { namenode.renewLease(clientName); lastRenewed = System.currentTimeMillis(); } catch (IOException ie) { } } try { Thread.sleep(1000); } catch (InterruptedException ie) { } } } } /**************************************************************** * DFSInputStream provides bytes from a named file. It handles * negotiation of the namenode and various datanodes as necessary. 
****************************************************************/
    class DFSInputStream extends FSInputStream {
        private Socket s = null;
        boolean closed = false;

        private String src;
        private DataInputStream blockStream;
        private Block blocks[] = null;
        private DatanodeInfo nodes[][] = null;
        private long pos = 0;
        private long filelen = 0;
        private long blockEnd = -1;

        /**
         * Open {@code src}: fetch its block list from the namenode and set the
         * file length to the sum of the block sizes. No DataNode connection
         * is made here; that happens in blockSeekTo.
         *
         * @param src file path
         * @throws IOException if the namenode lookup fails
         */
        public DFSInputStream(String src) throws IOException {
            this.src = src;
            openInfo();
            this.blockStream = null;
            for (int i = 0; i < blocks.length; i++) {
                this.filelen += blocks[i].getNumBytes();
            }
        }

        /**
         * Grab the open-file info from the namenode: the file's blocks and,
         * for each block, the DataNodes that hold it.
         *
         * <p>On a repeat call (blockSeekTo uses this to refresh locations
         * after failures), the block list must match the one already held.
         * Only the locations are replaced.
         *
         * @throws IOException if the RPC fails, or if the block list now
         *     differs in length or content from the previous one
         */
        void openInfo() throws IOException {
            LocatedBlock results[] = namenode.open(src);
            Block newBlocks[] = new Block[results.length];
            DatanodeInfo newNodes[][] = new DatanodeInfo[results.length][];
            for (int i = 0; i < results.length; i++) {
                newBlocks[i] = results[i].getBlock();
                newNodes[i] = results[i].getLocations();
            }

            Block oldBlocks[] = this.blocks;
            if (oldBlocks != null) {
                // Check the length first. Comparing element-wise against a
                // shorter new list would run off its end with an unchecked
                // ArrayIndexOutOfBoundsException.
                if (oldBlocks.length != newBlocks.length) {
                    throw new IOException("Blocklist for " + src + " now has different length");
                }
                for (int i = 0; i < oldBlocks.length; i++) {
                    if (!oldBlocks[i].equals(newBlocks[i])) {
                        throw new IOException("Blocklist for " + src + " has changed!");
                    }
                }
            }
            this.blocks = newBlocks;
            this.nodes = newNodes;
        }

        /**
         * Open a DataInputStream to a DataNode so that it can be read from.
         * We get block ID and the IDs of the destinations at startup, from the namenode.
*/ private synchronized void blockSeekTo(long target) throws IOException { if (target >= filelen) { throw new IOException("Attempted to read past end of file"); } if (s != null) { s.close(); s = null; } // // Compute desired block // int targetBlock = -1; long targetBlockStart = 0; long targetBlockEnd = 0; for (int i = 0; i < blocks.length; i++) { long blocklen = blocks[i].getNumBytes(); targetBlockEnd = targetBlockStart + blocklen - 1; if (target >= targetBlockStart && target <= targetBlockEnd) { targetBlock = i; break; } else { targetBlockStart = targetBlockEnd + 1; } } if (targetBlock < 0) { throw new IOException("Impossible situation: could not find target position " + target); } long offsetIntoBlock = target - targetBlockStart; // // Connect to best DataNode for desired Block, with potential offset // int failures = 0; InetSocketAddress targetAddr = null; TreeSet deadNodes = new TreeSet(); while (s == null) { DatanodeInfo chosenNode; try { chosenNode = bestNode(nodes[targetBlock], deadNodes); targetAddr = DataNode.createSocketAddr(chosenNode.getName().toString()); } catch (IOException ie) { String blockInfo = blocks[targetBlock]+" file="+src+" offset="+target; if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) { throw new IOException("Could not obtain block: " + blockInfo); } if (nodes[targetBlock] == null || nodes[targetBlock].length == 0) { LOG.info("No node available for block: " + blockInfo); } LOG.info("Could not obtain block from any node: " + ie); try { Thread.sleep(3000); } catch (InterruptedException iex) { } deadNodes.clear(); openInfo(); failures++; continue; } try { s = new Socket(); s.connect(targetAddr, READ_TIMEOUT); s.setSoTimeout(READ_TIMEOUT); // // Xmit header info to datanode // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); out.write(OP_READSKIP_BLOCK); blocks[targetBlock].write(out); out.writeLong(offsetIntoBlock); out.flush(); // // Get bytes in block, set streams // DataInputStream in = new 
DataInputStream(new BufferedInputStream(s.getInputStream())); long curBlockSize = in.readLong(); long amtSkipped = in.readLong(); if (curBlockSize != blocks[targetBlock].len) { throw new IOException("Recorded block size is " + blocks[targetBlock].len + ", but datanode reports size of " + curBlockSize); } if (amtSkipped != offsetIntoBlock) { throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped); } this.pos = target; this.blockEnd = targetBlockEnd; this.blockStream = in; } catch (IOException ex) { // Put chosen node into dead list, continue LOG.info("Failed to connect to " + targetAddr + ":" + ex); deadNodes.add(chosenNode); if (s != null) { try { s.close(); } catch (IOException iex) { } } s = null; } } } /** * Close it down! */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?