📄 dfsclient.java
字号:
break;
      }
    }
    // No node was picked above: fall back to random picks from nodes,
    // repeating until the pick is not in deadNodes.
    // NOTE(review): Math.abs(Integer.MIN_VALUE) is still negative, so if
    // r.nextInt() can return that value the index below is out of range --
    // confirm the type of r and consider a bounded nextInt(n) instead.
    if (chosenNode == null) {
      do {
        chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length];
      } while (deadNodes.contains(chosenNode));
    }
    return chosenNode;
  }

  /***************************************************************
   * Periodically check in with the namenode and renew all the leases
   * when the lease period is half over.
   *
   * The loop runs for as long as the enclosing client's running flag
   * stays true and wakes up roughly once per second.
   ***************************************************************/
  class LeaseChecker implements Runnable {
    /**
     * Renewal loop. Whenever more than LEASE_PERIOD / 2 has elapsed since
     * the last recorded renewal, calls namenode.renewLease(clientName) if
     * pendingCreates is non-empty, then records the current time. The time
     * is recorded even when no renewal call was needed. If the renewal
     * throws IOException the failure is logged and the time is NOT
     * recorded, so the next pass (about one second later) tries again.
     */
    public void run() {
      long lastRenewed = 0;
      while (running) {
        if (System.currentTimeMillis() - lastRenewed > (LEASE_PERIOD / 2)) {
          try {
            // Only the renewLease call is guarded by this brace-less if;
            // the lastRenewed assignment below always runs.
            if( pendingCreates.size() > 0 )
              namenode.renewLease(clientName);
            lastRenewed = System.currentTimeMillis();
          } catch (IOException ie) {
            String err = StringUtils.stringifyException(ie);
            LOG.warn("Problem renewing lease for " + clientName + ": " + err);
          }
        }
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
          // Interruption is ignored here; the loop simply re-checks running.
        }
      }
    }
  }

  /** Utility class to encapsulate data node info and its ip address. */
  private static class DNAddrPair {
    DatanodeInfo info;
    InetSocketAddress addr;

    /** Stores the given datanode descriptor and socket address as-is. */
    DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
      this.info = info;
      this.addr = addr;
    }
  }

  /****************************************************************
   * DFSInputStream provides bytes from a named file.  It handles
   * negotiation of the namenode and various datanodes as necessary.
****************************************************************/ class DFSInputStream extends FSInputStream { private Socket s = null; boolean closed = false; private String src; private DataInputStream blockStream; private Block blocks[] = null; private DatanodeInfo nodes[][] = null; private long pos = 0; private long filelen = 0; private long blockEnd = -1; /** */ public DFSInputStream(String src) throws IOException { this.src = src; openInfo(); this.blockStream = null; for (int i = 0; i < blocks.length; i++) { this.filelen += blocks[i].getNumBytes(); } } /** * Grab the open-file info from namenode */ synchronized void openInfo() throws IOException { Block oldBlocks[] = this.blocks; LocatedBlock results[] = namenode.open(src); Vector blockV = new Vector(); Vector nodeV = new Vector(); for (int i = 0; i < results.length; i++) { blockV.add(results[i].getBlock()); nodeV.add(results[i].getLocations()); } Block newBlocks[] = (Block[]) blockV.toArray(new Block[blockV.size()]); if (oldBlocks != null) { for (int i = 0; i < oldBlocks.length; i++) { if (! oldBlocks[i].equals(newBlocks[i])) { throw new IOException("Blocklist for " + src + " has changed!"); } } if (oldBlocks.length != newBlocks.length) { throw new IOException("Blocklist for " + src + " now has different length"); } } this.blocks = newBlocks; this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]); } /** * Open a DataInputStream to a DataNode so that it can be read from. * We get block ID and the IDs of the destinations at startup, from the namenode. 
*/ private synchronized void blockSeekTo(long target) throws IOException { if (target >= filelen) { throw new IOException("Attempted to read past end of file"); } if (s != null) { s.close(); s = null; } // // Compute desired block // int targetBlock = -1; long targetBlockStart = 0; long targetBlockEnd = 0; for (int i = 0; i < blocks.length; i++) { long blocklen = blocks[i].getNumBytes(); targetBlockEnd = targetBlockStart + blocklen - 1; if (target >= targetBlockStart && target <= targetBlockEnd) { targetBlock = i; break; } else { targetBlockStart = targetBlockEnd + 1; } } if (targetBlock < 0) { throw new IOException("Impossible situation: could not find target position " + target); } long offsetIntoBlock = target - targetBlockStart; // // Connect to best DataNode for desired Block, with potential offset // int failures = 0; TreeSet deadNodes = new TreeSet(); while (s == null) { DNAddrPair retval = chooseDataNode(targetBlock, deadNodes); DatanodeInfo chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; try { s = new Socket(); s.connect(targetAddr, READ_TIMEOUT); s.setSoTimeout(READ_TIMEOUT); // // Xmit header info to datanode // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); out.write(OP_READSKIP_BLOCK); blocks[targetBlock].write(out); out.writeLong(offsetIntoBlock); out.flush(); // // Get bytes in block, set streams // DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream())); long curBlockSize = in.readLong(); long amtSkipped = in.readLong(); if (curBlockSize != blocks[targetBlock].len) { throw new IOException("Recorded block size is " + blocks[targetBlock].len + ", but datanode reports size of " + curBlockSize); } if (amtSkipped != offsetIntoBlock) { throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped); } this.pos = target; this.blockEnd = targetBlockEnd; this.blockStream = in; } catch (IOException ex) { // Put 
chosen node into dead list, continue LOG.info("Failed to connect to " + targetAddr + ":" + ex); deadNodes.add(chosenNode); if (s != null) { try { s.close(); } catch (IOException iex) { } } s = null; } } } /** * Close it down! */ public synchronized void close() throws IOException { checkOpen(); if (closed) { throw new IOException("Stream closed"); } if (s != null) { blockStream.close(); s.close(); s = null; } super.close(); closed = true; } /** * Basic read() */ public synchronized int read() throws IOException { checkOpen(); if (closed) { throw new IOException("Stream closed"); } int result = -1; if (pos < filelen) { if (pos > blockEnd) { blockSeekTo(pos); } result = blockStream.read(); if (result >= 0) { pos++; } } return result; } /** * Read the entire buffer. */ public synchronized int read(byte buf[], int off, int len) throws IOException { checkOpen(); if (closed) { throw new IOException("Stream closed"); } if (pos < filelen) { int retries = 2; while (retries > 0) { try { if (pos > blockEnd) { blockSeekTo(pos); } int realLen = Math.min(len, (int) (blockEnd - pos + 1)); int result = blockStream.read(buf, off, realLen); if (result >= 0) { pos += result; } return result; } catch (IOException e) { LOG.warn("DFS Read: " + StringUtils.stringifyException(e)); blockEnd = -1; if (--retries == 0) { throw e; } } } } return -1; } private DNAddrPair chooseDataNode(int blockId, TreeSet deadNodes) throws IOException { int failures = 0; while (true) { try { DatanodeInfo chosenNode = bestNode(nodes[blockId], deadNodes); InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getName()); return new DNAddrPair(chosenNode, targetAddr); } catch (IOException ie) { String blockInfo = blocks[blockId]+" file="+src; if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) { throw new IOException("Could not obtain block: " + blockInfo); } if (nodes[blockId] == null || nodes[blockId].length == 0) { LOG.info("No node available for block: " + blockInfo); } LOG.info("Could not obtain block 
from any node: " + ie); try { Thread.sleep(3000); } catch (InterruptedException iex) { } deadNodes.clear(); openInfo(); failures++; continue; } } } private void fetchBlockByteRange(int blockId, long start, long end, byte[] buf, int offset) throws IOException { // // Connect to best DataNode for desired Block, with potential offset // TreeSet deadNodes = new TreeSet(); Socket dn = null; while (dn == null) { DNAddrPair retval = chooseDataNode(blockId, deadNodes); DatanodeInfo chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; try { dn = new Socket(); dn.connect(targetAddr, READ_TIMEOUT); dn.setSoTimeout(READ_TIMEOUT); // // Xmit header info to datanode // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(dn.getOutputStream())); out.write(OP_READ_RANGE_BLOCK); blocks[blockId].write(out); out.writeLong(start); out.writeLong(end); out.flush(); // // Get bytes in block, set streams // DataInputStream in = new DataInputStream(new BufferedInputStream(dn.getInputStream())); long curBlockSize = in.readLong(); long actualStart = in.readLong(); long actualEnd = in.readLong(); if (curBlockSize != blocks[blockId].len) { throw new IOException("Recorded block size is " + blocks[blockId].len + ", but datanode reports size of " + curBlockSize); } if ((actualStart != start) || (actualEnd != end)) { throw new IOException("Asked for byte range " + start + "-" + end + ", but only received range " + actualStart + "-" + actualEnd); } int nread = in.read(buf, offset, (int)(end - start + 1)); } catch (IOException ex) { // Put chosen node into dead list, continue LOG.info("Failed to connect to " + targetAddr + ":" + ex); deadNodes.add(chosenNode); if (dn != null) { try { dn.close(); } catch (IOException iex) { } } dn = null; } } } public int read(long position, byte[] buf, int off, int len) throws IOException { // sanity checks checkOpen(); if (closed) { throw new IOException("Stream closed"); } if ((position < 0) || (position > filelen)) { return -1; } 
int realLen = len; if ((position + len) > filelen) { realLen = (int)(filelen - position); } // determine the block and byte range within the block // corresponding to position and realLen int targetBlock = -1; long targetStart = 0; long targetEnd = 0; for (int idx = 0; idx < blocks.length; idx++) { long blocklen = blocks[idx].getNumBytes(); targetEnd = targetStart + blocklen - 1; if (position >= targetStart && position <= targetEnd) { targetBlock = idx; targetStart = position - targetStart; targetEnd = Math.min(blocklen, targetStart + realLen) - 1; realLen = (int)(targetEnd - targetStart + 1); break; } targetStart += blocklen; } if (targetBlock < 0) { throw new IOException( "Impossible situation: could not find target position "+ position); } fetchBlockByteRange(targetBlock, targetStart, targetEnd, buf, off); return realLen; } /** * Seek to a new arbitrary location */ public synchronized void seek(long targetPos) throws IOException { if (targetPos > filelen) { throw new IOException("Cannot seek after EOF"); } pos = targetPos; blockEnd = -1; } /** */ public synchronized long getPos() throws IOException { return pos; } /** */ public synchronized int available() throws IOException { if (closed) { throw new IOException("Stream closed"); } return (int) (filelen - pos); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -