📄 fsnamesystem.java
字号:
while (oldReport != null && newReport != null && oldPos < oldReport.length && newPos < newReport.length) {
    // Merge-walk the two sorted block arrays and reconcile every difference.
    // (Both reports are assumed sorted so compareTo drives the merge -- TODO confirm
    // the caller sorts them before this loop; the sort is not visible in this chunk.)
    int cmp = oldReport[oldPos].compareTo(newReport[newPos]);
    if (cmp == 0) {
        // Block appears in both reports -- nothing changed, advance both cursors.
        oldPos++;
        newPos++;
    } else if (cmp < 0) {
        // The old report has a block the new one does not: the datanode no
        // longer holds it, so drop it from the (block --> datanode) map.
        removeStoredBlock(oldReport[oldPos], node);
        oldPos++;
    } else {
        // The new report has a block the old one does not: register it.
        addStoredBlock(newReport[newPos], node);
        newPos++;
    }
}
// Drain whatever is left of the old report: all of these blocks are gone.
while (oldReport != null && oldPos < oldReport.length) {
    // The old report has a block the new one does not
    removeStoredBlock(oldReport[oldPos], node);
    oldPos++;
}
// Drain whatever is left of the new report: all of these blocks are new.
while (newReport != null && newPos < newReport.length) {
    // The new report has a block the old one does not
    addStoredBlock(newReport[newPos], node);
    newPos++;
}

//
// Modify node so it has the new blockreport
//
node.updateBlocks(newReport);

//
// We've now completely updated the node's block report profile.
// We now go through all its blocks and find which ones are invalid,
// no longer pending, or over-replicated.
//
// (Note it's not enough to just invalidate blocks at lease expiry
// time; datanodes can go down before the client's lease on
// the failed file expires and miss the "expire" event.)
//
// This function considers every block on a datanode, and thus
// should only be invoked infrequently.
//
Vector obsolete = new Vector();
for (Iterator it = node.getBlockIterator(); it.hasNext(); ) {
    Block b = (Block) it.next();
    // A block is obsolete when the directory no longer knows it as part of
    // any file AND it is not a block still being created by an open file.
    if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
        obsolete.add(b);
        NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
            +"ask "+nodeID.getName()+" to delete "+b.getBlockName() );
    }
}
// Return the set of blocks the reporting datanode should delete.
return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
}

/**
 * Modify (block-->datanode) map. Remove block from set of
 * needed replications if this takes care of the problem.
 */
synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
    // Look up (and lazily create) the set of datanodes holding this block.
    TreeSet containingNodes = (TreeSet) blocksMap.get(block);
    if (containingNodes == null) {
        containingNodes = new TreeSet();
        blocksMap.put(block, containingNodes);
    }
    if (! containingNodes.contains(node)) {
        containingNodes.add(node);
        //
        // Hairong: I would prefer to set the level of next logrecord
        // to be debug.
        // But at startup time, because too many new blocks come in
        // they simply take up all the space in the log file
        // So I set the level to be trace
        //
        NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: "
            +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName() );
    } else {
        // The node already holds this block -- duplicate notification.
        NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
            + "Redundant addStoredBlock request received for "
            + block.getBlockName() + " on " + node.getName());
    }
    synchronized (neededReplications) {
        FSDirectory.INode fileINode = dir.getFileByBlock(block);
        if( fileINode == null )  // block does not belong to any file
            return;
        // check whether safe replication is reached for the block
        // only if it is a part of a file
        incrementSafeBlockCount( containingNodes.size() );
        short fileReplication = fileINode.getReplication();
        if (containingNodes.size() >= fileReplication ) {
            // Target replication reached: no further copies are pending or needed.
            neededReplications.remove(block);
            pendingReplications.remove(block);
            NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: "
                +block.getBlockName()+" has "+containingNodes.size()
                +" replicas so is removed from neededReplications and pendingReplications" );
        } else {// containingNodes.size() < fileReplication
            neededReplications.add(block);
            NameNode.stateChangeLog.debug("BLOCK* NameSystem.addStoredBlock: "
                +block.getBlockName()+" has only "+containingNodes.size()
                +" replicas so is added to neededReplications" );
        }
        // NOTE(review): "proccess" is misspelled, but it matches the declared
        // name of the method elsewhere in this file; renaming requires a
        // coordinated change of declaration and all call sites.
        proccessOverReplicatedBlock( block, fileReplication );
    }
}

/**
 * Find how many of the containing nodes are "extra", if any.
 * If there are any extras, call chooseExcessReplicates() to
 * mark them in the excessReplicateMap.
 */
private void proccessOverReplicatedBlock( Block block, short replication ) {
    TreeSet containingNodes = (TreeSet) blocksMap.get(block);
    if( containingNodes == null )
        return;
    // Collect the replicas that are NOT already scheduled for deletion,
    // i.e. not yet recorded in excessReplicateMap for their datanode.
    Vector nonExcess = new Vector();
    for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
        DatanodeDescriptor cur = (DatanodeDescriptor) it.next();
        TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getStorageID());
        if (excessBlocks == null || ! excessBlocks.contains(block)) {
            nonExcess.add(cur);
        }
    }
    chooseExcessReplicates(nonExcess, block, replication);
}

/**
 * We want "replication" replicates for the block, but we now have too many.
 * Trim 'nonExcess' down until exactly 'replication' candidate replicas
 * remain, marking each removed node's copy as excess in excessReplicateMap
 * and scheduling it for invalidation.
 *
 * We pick the node with least free space.
 * In the future, we might enforce some kind of policy
 * (like making sure replicates are spread across racks).
 */
void chooseExcessReplicates(Vector nonExcess, Block b, short replication) {
    while (nonExcess.size() - replication > 0) {
        // Select the candidate with the least remaining free space.
        DatanodeInfo cur = null;
        long minSpace = Long.MAX_VALUE;
        for (Iterator iter = nonExcess.iterator(); iter.hasNext();) {
            DatanodeInfo node = (DatanodeInfo) iter.next();
            long free = node.getRemaining();
            if(minSpace > free) {
                minSpace = free;
                cur = node;
            }
        }
        nonExcess.remove(cur);

        // Record the block as excess on the chosen node.
        TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getStorageID());
        if (excessBlocks == null) {
            excessBlocks = new TreeSet();
            excessReplicateMap.put(cur.getStorageID(), excessBlocks);
        }
        excessBlocks.add(b);
        NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
            +"("+cur.getName()+", "+b.getBlockName()+") is added to excessReplicateMap" );

        //
        // The 'excessblocks' tracks blocks until we get confirmation
        // that the datanode has deleted them; the only way we remove them
        // is when we get a "removeBlock" message.
        //
        // The 'invalidate' list is used to inform the datanode the block
        // should be deleted. Items are removed from the invalidate list
        // when the deletion instruction is handed out (presumably to the
        // datanode, despite the original comment saying "namenode" -- the
        // hand-off is not visible in this chunk).
        //
        Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getStorageID());
        if (invalidateSet == null) {
            invalidateSet = new Vector();
            recentInvalidateSets.put(cur.getStorageID(), invalidateSet);
        }
        invalidateSet.add(b);
        NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
            +"("+cur.getName()+", "+b.getBlockName()+") is added to recentInvalidateSets" );
    }
}

/**
 * Modify (block-->datanode) map. Possibly generate
 * replication tasks, if the removed block is still valid.
 */
synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
    NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
        +block.getBlockName() + " from "+node.getName() );
    TreeSet containingNodes = (TreeSet) blocksMap.get(block);
    if (containingNodes == null || ! containingNodes.contains(node)) {
        // Nothing to remove -- the mapping is already gone (idempotent).
        NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
            +block.getBlockName()+" has already been removed from node "+node );
        return;
    }
    containingNodes.remove(node);
    decrementSafeBlockCount( containingNodes.size() );
    // Drop the map entry entirely once no replica remains.
    if( containingNodes.size() == 0 )
        blocksMap.remove(block);
    //
    // It's possible that the block was removed because of a datanode
    // failure. If the block is still valid, check if replication is
    // necessary. In that case, put block on a possibly-will-
    // be-replicated list.
    //
    FSDirectory.INode fileINode = dir.getFileByBlock(block);
    if( fileINode != null && (containingNodes.size() < fileINode.getReplication())) {
        synchronized (neededReplications) {
            neededReplications.add(block);
        }
        NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
            +block.getBlockName()+" has only "+containingNodes.size()
            +" replicas so is added to neededReplications" );
    }
    //
    // We've removed a block from a node, so it's definitely no longer
    // in "excess" there.
    //
    TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(node.getStorageID());
    if (excessBlocks != null) {
        excessBlocks.remove(block);
        NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
            +block.getBlockName()+" is removed from excessBlocks" );
        if (excessBlocks.size() == 0) {
            excessReplicateMap.remove(node.getStorageID());
        }
    }
}

/**
 * The given node is reporting that it received a certain block.
 */
public synchronized void blockReceived( DatanodeID nodeID, Block block ) throws IOException {
    DatanodeDescriptor node = getDatanode( nodeID );
    if (node == null) {
        NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
            + block.getBlockName()
            + " is received from an unrecorded node " + nodeID.getName() );
        // NOTE(review): this exception message interpolates the block name
        // where a node name seems intended ("from node <blockName>") --
        // looks like a copy-paste slip; confirm before changing the string.
        throw new IllegalArgumentException(
            "Unexpected exception. Got blockReceived message from node "
            + block.getBlockName() + ", but there is no info for it");
    }
    NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
        +block.getBlockName()+" is received from " + nodeID.getName() );
    //
    // Modify the blocks->datanode map
    //
    addStoredBlock(block, node);
    //
    // Supplement node's blockreport
    //
    node.addBlock(block);
}

/**
 * Total raw bytes
 */
public long totalCapacity() {
    return totalCapacity;
}

/**
 * Total non-used raw bytes
 */
public long totalRemaining() {
    return totalRemaining;
}

/**
 * Snapshot of all registered datanodes; each entry is a defensive copy.
 * Takes heartbeats then datanodeMap locks, in that order.
 */
public DatanodeInfo[] datanodeReport() {
    DatanodeInfo results[] = null;
    synchronized (heartbeats) {
        synchronized (datanodeMap) {
            results = new DatanodeInfo[datanodeMap.size()];
            int i = 0;
            for(Iterator it = datanodeMap.values().iterator(); it.hasNext(); )
                results[i++] = new DatanodeInfo( (DatanodeDescriptor)it.next() );
        }
    }
    return results;
}

/**
 * Partition all registered datanodes into the caller-supplied
 * 'live' and 'dead' vectors. Takes heartbeats then datanodeMap
 * locks, in that order (same order as datanodeReport).
 */
public void DFSNodesStatus(Vector live, Vector dead) {
    synchronized (heartbeats) {
        synchronized (datanodeMap) {
            for(Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
                DatanodeDescriptor node = (DatanodeDescriptor)it.next();
                if( node.isDead() )
                    dead.add( node );
                else
                    live.add( node );
            }
        }
    }
}

/** */
public DatanodeInfo getDataNodeInfo(String name) {
    // Unsynchronized lookup by key; presumably 'name' is the datanode's
    // storage/registration key used by datanodeMap -- TODO confirm against
    // the map's put sites (not visible in this chunk).
    return (DatanodeDescriptor)datanodeMap.get(name);
}

/** Host name of the machine this namenode runs on. */
public String getDFSNameNodeMachine() {
    return localMachine;
}

/** Port this namenode listens on. */
public int getDFSNameNodePort() {
    return port;
}

/** Time at which this namenode was started. */
public Date getStartTime() {
    return startTime;
}

/////////////////////////////////////////////////////////
//
// These methods are called by the Namenode system, to see
// if there is any work for a given datanode.
//
/////////////////////////////////////////////////////////

/**
 * Check if there are any recently-deleted blocks a datanode should remove.
 */
public synchronized Block[] blocksToInvalidate( DatanodeID nodeID ) {
    // Ask datanodes to perform block delete
    // only if safe mode is off.
    if( isInSafeMode() )
        return null;
    // NOTE(review): source is truncated here -- the remainder of
    // blocksToInvalidate is missing from this chunk.
    Vector invalidateSet =
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -