fsnamesystem.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 1,340 行 · 第 1/4 页

JAVA
1,340
字号
            // The 'excessblocks' tracks blocks until we get confirmation            // that the datanode has deleted them; the only way we remove them            // is when we get a "removeBlock" message.              //            // The 'invalidate' list is used to inform the datanode the block             // should be deleted.  Items are removed from the invalidate list            // upon giving instructions to the namenode.            //            Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getName());            if (invalidateSet == null) {                invalidateSet = new Vector();                recentInvalidateSets.put(cur.getName(), invalidateSet);            }            invalidateSet.add(b);        }    }    /**     * Modify (block-->datanode) map.  Possibly generate      * replication tasks, if the removed block is still valid.     */    synchronized void removeStoredBlock(Block block, DatanodeInfo node) {        TreeSet containingNodes = (TreeSet) blocksMap.get(block);        if (containingNodes == null || ! containingNodes.contains(node)) {            throw new IllegalArgumentException("No machine mapping found for block " + block + ", which should be at node " + node);        }        containingNodes.remove(node);        //        // It's possible that the block was removed because of a datanode        // failure.  If the block is still valid, check if replication is        // necessary.  In that case, put block on a possibly-will-        // be-replicated list.        //        if (dir.isValidBlock(block) && (containingNodes.size() < this.desiredReplication)) {            synchronized (neededReplications) {                neededReplications.add(block);            }        }        //        // We've removed a block from a node, so it's definitely no longer        // in "excess" there.        
//        TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(node.getName());        if (excessBlocks != null) {            excessBlocks.remove(block);            if (excessBlocks.size() == 0) {                excessReplicateMap.remove(node.getName());            }        }    }    /**     * The given node is reporting that it received a certain block.     */    public synchronized void blockReceived(Block block, UTF8 name) {        DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);        if (node == null) {            throw new IllegalArgumentException("Unexpected exception.  Got blockReceived message from node " + name + ", but there is no info for " + name);        }        //        // Modify the blocks->datanode map        //         addStoredBlock(block, node);        //        // Supplement node's blockreport        //        node.addBlock(block);    }    /**     * Total raw bytes     */    public long totalCapacity() {        return totalCapacity;    }    /**     * Total non-used raw bytes     */    public long totalRemaining() {        return totalRemaining;    }    /**     */    public DatanodeInfo[] datanodeReport() {        DatanodeInfo results[] = null;        synchronized (heartbeats) {            synchronized (datanodeMap) {                results = new DatanodeInfo[datanodeMap.size()];                int i = 0;                for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {                    DatanodeInfo cur = (DatanodeInfo) it.next();                    results[i++] = cur;                }            }        }        return results;    }    /////////////////////////////////////////////////////////    //    // These methods are called by the Namenode system, to see    // if there is any work for a given datanode.    //    /////////////////////////////////////////////////////////    /**     * Check if there are any recently-deleted blocks a datanode should remove.     
*/    public synchronized Block[] blocksToInvalidate(UTF8 sender) {        Vector invalidateSet = (Vector) recentInvalidateSets.remove(sender);        if (invalidateSet != null) {            return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]);        } else {            return null;        }    }    /**     * Return with a list of Block/DataNodeInfo sets, indicating     * where various Blocks should be copied, ASAP.     *     * The Array that we return consists of two objects:     * The 1st elt is an array of Blocks.     * The 2nd elt is a 2D array of DatanodeInfo objs, identifying the     *     target sequence for the Block at the appropriate index.     *     */    public synchronized Object[] pendingTransfers(DatanodeInfo srcNode, int xmitsInProgress) {        synchronized (neededReplications) {            Object results[] = null;	    int scheduledXfers = 0;            if (neededReplications.size() > 0) {                //                // Go through all blocks that need replications.  See if any                // are present at the current node.  If so, ask the node to                // replicate them.                //                Vector replicateBlocks = new Vector();                Vector replicateTargetSets = new Vector();                for (Iterator it = neededReplications.iterator(); it.hasNext(); ) {                    //                    // We can only reply with 'maxXfers' or fewer blocks                    //                    if (scheduledXfers >= this.maxReplicationStreams - xmitsInProgress) {                        break;                    }                    Block block = (Block) it.next();                    if (! 
dir.isValidBlock(block)) {                        it.remove();                    } else {                        TreeSet containingNodes = (TreeSet) blocksMap.get(block);                        if (containingNodes.contains(srcNode)) {                            DatanodeInfo targets[] = chooseTargets(Math.min(this.desiredReplication - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes, null);                            if (targets.length > 0) {                                // Build items to return                                replicateBlocks.add(block);                                replicateTargetSets.add(targets);				scheduledXfers += targets.length;                            }                        }                    }                }                //                // Move the block-replication into a "pending" state.                // The reason we use 'pending' is so we can retry                // replications that fail after an appropriate amount of time.                  // (REMIND - mjc - this timer is not yet implemented.)                
//                if (replicateBlocks.size() > 0) {                    int i = 0;                    for (Iterator it = replicateBlocks.iterator(); it.hasNext(); i++) {                        Block block = (Block) it.next();                        DatanodeInfo targets[] = (DatanodeInfo[]) replicateTargetSets.elementAt(i);                        TreeSet containingNodes = (TreeSet) blocksMap.get(block);                        if (containingNodes.size() + targets.length >= this.desiredReplication) {                            neededReplications.remove(block);                            pendingReplications.add(block);                        }			LOG.info("Pending transfer (block " + block.getBlockName() + ") from " + srcNode.getName() + " to " + targets.length + " destinations");                    }                    //                    // Build returned objects from above lists                    //                    DatanodeInfo targetMatrix[][] = new DatanodeInfo[replicateTargetSets.size()][];                    for (i = 0; i < targetMatrix.length; i++) {                        targetMatrix[i] = (DatanodeInfo[]) replicateTargetSets.elementAt(i);                    }                    results = new Object[2];                    results[0] = replicateBlocks.toArray(new Block[replicateBlocks.size()]);                    results[1]  = targetMatrix;                }            }            return results;        }    }    /**     * Get a certain number of targets, if possible.     * If not, return as many as we can.     * @param desiredReplicates number of duplicates wanted.     * @param forbiddenNodes of DatanodeInfo instances that should not be     * considered targets.     * @return array of DatanodeInfo instances uses as targets.     
*/
    DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes, UTF8 clientMachine) {
        Vector picked = new Vector();
        TreeSet pickedSoFar = new TreeSet();

        int round = 0;
        while (round < desiredReplicates) {
            DatanodeInfo candidate = chooseTarget(forbiddenNodes, pickedSoFar, clientMachine);
            if (candidate == null) {
                // No node qualified this time; asking again cannot succeed.
                break;
            }
            picked.add(candidate);
            pickedSoFar.add(candidate);
            round++;
        }

        DatanodeInfo[] result = new DatanodeInfo[picked.size()];
        picked.toArray(result);
        return result;
    }

    /**
     * Picks a single datanode to receive a block, skipping every machine
     * whose host appears in either forbidden set.
     *
     * Eligible nodes are shuffled, then examined in three passes:
     * first the node on the requester's own host (when clientMachine is
     * given and non-empty) if its remaining space exceeds
     * BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE, then any node exceeding that
     * same amount, and finally any node whose remaining space exceeds
     * BLOCK_SIZE.
     *
     * @param forbidden1 DatanodeInfo targets not allowed, null allowed.
     * @param forbidden2 DatanodeInfo targets not allowed, null allowed.
     * @param clientMachine host of the requester, null allowed.
     * @return the chosen DatanodeInfo, or null if none qualified
     * (a warning is logged whenever null is returned).
     */
    DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2, UTF8 clientMachine) {
        // With no registered datanodes there is nothing to choose from.
        int totalMachines = datanodeMap.size();
        if (totalMachines == 0) {
            LOG.warning("While choosing target, totalMachines is " + totalMachines);
            return null;
        }

        // Collect the hosts of every forbidden node into one set.
        TreeSet excludedHosts = new TreeSet();
        TreeSet[] exclusions = new TreeSet[] { forbidden1, forbidden2 };
        for (int s = 0; s < exclusions.length; s++) {
            if (exclusions[s] == null) {
                continue;
            }
            Iterator members = exclusions[s].iterator();
            while (members.hasNext()) {
                DatanodeInfo banned = (DatanodeInfo) members.next();
                excludedHosts.add(banned.getHost());
            }
        }

        // Every known node on a non-excluded host is a candidate.
        Vector candidates = new Vector();
        Iterator known = datanodeMap.values().iterator();
        while (known.hasNext()) {
            DatanodeInfo node = (DatanodeInfo) known.next();
            if (! excludedHosts.contains(node.getHost())) {
                candidates.add(node);
            }
        }
        // Random order, so the "first match" picks below are random picks.
        Collections.shuffle(candidates);

        if (candidates.size() == 0) {
            LOG.warning("Zero targets found, forbidden1.size=" +
                ( forbidden1 != null ? forbidden1.size() : 0 ) +
                " forbidden2.size()=" +
                ( forbidden2 != null ? forbidden2.size() : 0 ));
            return null;
        }

        // Pass 1: prefer the requester's own machine when it has room.
        if (clientMachine != null && clientMachine.getLength() > 0) {
            Iterator local = candidates.iterator();
            while (local.hasNext()) {
                DatanodeInfo node = (DatanodeInfo) local.next();
                if (clientMachine.equals(node.getHost())
                    && node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) {
                    return node;
                }
            }
        }

        // Pass 2: any candidate with comfortable free space.
        Iterator roomy = candidates.iterator();
        while (roomy.hasNext()) {
            DatanodeInfo node = (DatanodeInfo) roomy.next();
            if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) {
                return node;
            }
        }

        // Pass 3: last resort, any candidate with more than one block
        // of space remaining.
        Iterator lastResort = candidates.iterator();
        while (lastResort.hasNext()) {
            DatanodeInfo node = (DatanodeInfo) lastResort.next();
            if (node.getRemaining() > BLOCK_SIZE) {
                return node;
            }
        }

        LOG.warning("Could not find any nodes with sufficient capacity");
        return null;
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?