fsnamesystem.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 1,340 行 · 第 1/4 页
JAVA
1,340 行
// The 'excessblocks' tracks blocks until we get confirmation // that the datanode has deleted them; the only way we remove them // is when we get a "removeBlock" message. // // The 'invalidate' list is used to inform the datanode the block // should be deleted. Items are removed from the invalidate list // upon giving instructions to the namenode. // Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getName()); if (invalidateSet == null) { invalidateSet = new Vector(); recentInvalidateSets.put(cur.getName(), invalidateSet); } invalidateSet.add(b); } } /** * Modify (block-->datanode) map. Possibly generate * replication tasks, if the removed block is still valid. */ synchronized void removeStoredBlock(Block block, DatanodeInfo node) { TreeSet containingNodes = (TreeSet) blocksMap.get(block); if (containingNodes == null || ! containingNodes.contains(node)) { throw new IllegalArgumentException("No machine mapping found for block " + block + ", which should be at node " + node); } containingNodes.remove(node); // // It's possible that the block was removed because of a datanode // failure. If the block is still valid, check if replication is // necessary. In that case, put block on a possibly-will- // be-replicated list. // if (dir.isValidBlock(block) && (containingNodes.size() < this.desiredReplication)) { synchronized (neededReplications) { neededReplications.add(block); } } // // We've removed a block from a node, so it's definitely no longer // in "excess" there. // TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(node.getName()); if (excessBlocks != null) { excessBlocks.remove(block); if (excessBlocks.size() == 0) { excessReplicateMap.remove(node.getName()); } } } /** * The given node is reporting that it received a certain block. */ public synchronized void blockReceived(Block block, UTF8 name) { DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name); if (node == null) { throw new IllegalArgumentException("Unexpected exception. Got blockReceived message from node " + name + ", but there is no info for " + name); } // // Modify the blocks->datanode map // addStoredBlock(block, node); // // Supplement node's blockreport // node.addBlock(block); } /** * Total raw bytes */ public long totalCapacity() { return totalCapacity; } /** * Total non-used raw bytes */ public long totalRemaining() { return totalRemaining; } /** */ public DatanodeInfo[] datanodeReport() { DatanodeInfo results[] = null; synchronized (heartbeats) { synchronized (datanodeMap) { results = new DatanodeInfo[datanodeMap.size()]; int i = 0; for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) { DatanodeInfo cur = (DatanodeInfo) it.next(); results[i++] = cur; } } } return results; } ///////////////////////////////////////////////////////// // // These methods are called by the Namenode system, to see // if there is any work for a given datanode. // ///////////////////////////////////////////////////////// /** * Check if there are any recently-deleted blocks a datanode should remove. */ public synchronized Block[] blocksToInvalidate(UTF8 sender) { Vector invalidateSet = (Vector) recentInvalidateSets.remove(sender); if (invalidateSet != null) { return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]); } else { return null; } } /** * Return with a list of Block/DataNodeInfo sets, indicating * where various Blocks should be copied, ASAP. * * The Array that we return consists of two objects: * The 1st elt is an array of Blocks. * The 2nd elt is a 2D array of DatanodeInfo objs, identifying the * target sequence for the Block at the appropriate index. * */ public synchronized Object[] pendingTransfers(DatanodeInfo srcNode, int xmitsInProgress) { synchronized (neededReplications) { Object results[] = null; int scheduledXfers = 0; if (neededReplications.size() > 0) { // // Go through all blocks that need replications. See if any // are present at the current node. If so, ask the node to // replicate them. // Vector replicateBlocks = new Vector(); Vector replicateTargetSets = new Vector(); for (Iterator it = neededReplications.iterator(); it.hasNext(); ) { // // We can only reply with 'maxXfers' or fewer blocks // if (scheduledXfers >= this.maxReplicationStreams - xmitsInProgress) { break; } Block block = (Block) it.next(); if (! dir.isValidBlock(block)) { it.remove(); } else { TreeSet containingNodes = (TreeSet) blocksMap.get(block); if (containingNodes.contains(srcNode)) { DatanodeInfo targets[] = chooseTargets(Math.min(this.desiredReplication - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes, null); if (targets.length > 0) { // Build items to return replicateBlocks.add(block); replicateTargetSets.add(targets); scheduledXfers += targets.length; } } } } // // Move the block-replication into a "pending" state. // The reason we use 'pending' is so we can retry // replications that fail after an appropriate amount of time. // (REMIND - mjc - this timer is not yet implemented.) // if (replicateBlocks.size() > 0) { int i = 0; for (Iterator it = replicateBlocks.iterator(); it.hasNext(); i++) { Block block = (Block) it.next(); DatanodeInfo targets[] = (DatanodeInfo[]) replicateTargetSets.elementAt(i); TreeSet containingNodes = (TreeSet) blocksMap.get(block); if (containingNodes.size() + targets.length >= this.desiredReplication) { neededReplications.remove(block); pendingReplications.add(block); } LOG.info("Pending transfer (block " + block.getBlockName() + ") from " + srcNode.getName() + " to " + targets.length + " destinations"); } // // Build returned objects from above lists // DatanodeInfo targetMatrix[][] = new DatanodeInfo[replicateTargetSets.size()][]; for (i = 0; i < targetMatrix.length; i++) { targetMatrix[i] = (DatanodeInfo[]) replicateTargetSets.elementAt(i); } results = new Object[2]; results[0] = replicateBlocks.toArray(new Block[replicateBlocks.size()]); results[1] = targetMatrix; } } return results; } } /** * Get a certain number of targets, if possible. * If not, return as many as we can. * @param desiredReplicates number of duplicates wanted. * @param forbiddenNodes of DatanodeInfo instances that should not be * considered targets. * @return array of DatanodeInfo instances uses as targets. */ DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes, UTF8 clientMachine) { TreeSet alreadyChosen = new TreeSet(); Vector targets = new Vector(); for (int i = 0; i < desiredReplicates; i++) { DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen, clientMachine); if (target != null) { targets.add(target); alreadyChosen.add(target); } else { break; // calling chooseTarget again won't help } } return (DatanodeInfo[]) targets.toArray(new DatanodeInfo[targets.size()]); } /** * Choose a target from available machines, excepting the * given ones. * * Right now it chooses randomly from available boxes. In future could * choose according to capacity and load-balancing needs (or even * network-topology, to avoid inter-switch traffic). * @param forbidden1 DatanodeInfo targets not allowed, null allowed. * @param forbidden2 DatanodeInfo targets not allowed, null allowed. * @return DatanodeInfo instance to use or null if something went wrong * (a log message is emitted if null is returned). */ DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2, UTF8 clientMachine) { // // Check if there are any available targets at all // int totalMachines = datanodeMap.size(); if (totalMachines == 0) { LOG.warning("While choosing target, totalMachines is " + totalMachines); return null; } // // Build a map of forbidden hostnames from the two forbidden sets. // TreeSet forbiddenMachines = new TreeSet(); if (forbidden1 != null) { for (Iterator it = forbidden1.iterator(); it.hasNext(); ) { DatanodeInfo cur = (DatanodeInfo) it.next(); forbiddenMachines.add(cur.getHost()); } } if (forbidden2 != null) { for (Iterator it = forbidden2.iterator(); it.hasNext(); ) { DatanodeInfo cur = (DatanodeInfo) it.next(); forbiddenMachines.add(cur.getHost()); } } // // Build list of machines we can actually choose from // Vector targetList = new Vector(); for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) { DatanodeInfo node = (DatanodeInfo) it.next(); if (! forbiddenMachines.contains(node.getHost())) { targetList.add(node); } } Collections.shuffle(targetList); // // Now pick one // if (targetList.size() > 0) { // // If the requester's machine is in the targetList, // and it's got the capacity, pick it. // if (clientMachine != null && clientMachine.getLength() > 0) { for (Iterator it = targetList.iterator(); it.hasNext(); ) { DatanodeInfo node = (DatanodeInfo) it.next(); if (clientMachine.equals(node.getHost())) { if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) { return node; } } } } // // Otherwise, choose node according to target capacity // for (Iterator it = targetList.iterator(); it.hasNext(); ) { DatanodeInfo node = (DatanodeInfo) it.next(); if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) { return node; } } // // That should do the trick. But we might not be able // to pick any node if the target was out of bytes. As // a last resort, pick the first valid one we can find. // for (Iterator it = targetList.iterator(); it.hasNext(); ) { DatanodeInfo node = (DatanodeInfo) it.next(); if (node.getRemaining() > BLOCK_SIZE) { return node; } } LOG.warning("Could not find any nodes with sufficient capacity"); return null; } else { LOG.warning("Zero targets found, forbidden1.size=" + ( forbidden1 != null ? forbidden1.size() : 0 ) + " forbidden2.size()=" + ( forbidden2 != null ? forbidden2.size() : 0 )); return null; } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?