
📄 FSNamesystem.java

📁 hadoop: Nutch cluster platform
💻 JAVA
📖 Page 1 of 5
        while (oldReport != null && newReport != null && oldPos < oldReport.length && newPos < newReport.length) {
            int cmp = oldReport[oldPos].compareTo(newReport[newPos]);

            if (cmp == 0) {
                // Do nothing, blocks are the same
                oldPos++;
                newPos++;
            } else if (cmp < 0) {
                // The old report has a block the new one does not
                removeStoredBlock(oldReport[oldPos], node);
                oldPos++;
            } else {
                // The new report has a block the old one does not
                addStoredBlock(newReport[newPos], node);
                newPos++;
            }
        }
        while (oldReport != null && oldPos < oldReport.length) {
            // The old report has a block the new one does not
            removeStoredBlock(oldReport[oldPos], node);
            oldPos++;
        }
        while (newReport != null && newPos < newReport.length) {
            // The new report has a block the old one does not
            addStoredBlock(newReport[newPos], node);
            newPos++;
        }

        //
        // Modify node so it has the new blockreport
        //
        node.updateBlocks(newReport);

        //
        // We've now completely updated the node's block report profile.
        // We now go through all its blocks and find which ones are invalid,
        // no longer pending, or over-replicated.
        //
        // (Note it's not enough to just invalidate blocks at lease expiry
        // time; datanodes can go down before the client's lease on
        // the failed file expires and miss the "expire" event.)
        //
        // This function considers every block on a datanode, and thus
        // should only be invoked infrequently.
        //
        Vector obsolete = new Vector();
        for (Iterator it = node.getBlockIterator(); it.hasNext(); ) {
            Block b = (Block) it.next();
            if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
                obsolete.add(b);
                NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
                        + "ask " + nodeID.getName() + " to delete " + b.getBlockName());
            }
        }
        return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
    }
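    // ------------------------------------------------------------------
    // Illustration added for this listing (not part of the original
    // FSNamesystem source): the three loops above are a standard
    // two-pointer diff over two sorted arrays -- advance both cursors on
    // a match, and treat a smaller element on either side as "only in
    // that report".  A minimal, self-contained sketch of the same
    // technique on sorted long ids; all names here are illustrative only:
    static void diffSorted(long[] oldIds, long[] newIds,
                           Vector removed, Vector added) {
        int i = 0, j = 0;
        while (i < oldIds.length && j < newIds.length) {
            if (oldIds[i] == newIds[j]) {           // present in both reports
                i++;
                j++;
            } else if (oldIds[i] < newIds[j]) {     // only in the old report
                removed.add(new Long(oldIds[i++]));
            } else {                                // only in the new report
                added.add(new Long(newIds[j++]));
            }
        }
        while (i < oldIds.length) removed.add(new Long(oldIds[i++])); // old-only tail
        while (j < newIds.length) added.add(new Long(newIds[j++]));   // new-only tail
    }
    // Each element is visited once, so reconciling a full block report
    // costs O(|old| + |new|) comparisons.
    // ------------------------------------------------------------------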
    /**
     * Modify (block-->datanode) map.  Remove block from set of
     * needed replications if this takes care of the problem.
     */
    synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
        TreeSet containingNodes = (TreeSet) blocksMap.get(block);
        if (containingNodes == null) {
            containingNodes = new TreeSet();
            blocksMap.put(block, containingNodes);
        }
        if (! containingNodes.contains(node)) {
            containingNodes.add(node);
            //
            // Hairong: I would prefer to set the level of the next log record
            // to debug.  But at startup time too many new blocks come in and
            // they simply take up all the space in the log file, so I set the
            // level to trace.
            //
            NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: "
                    + "blockMap updated: " + node.getName() + " is added to " + block.getBlockName());
        } else {
            NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
                    + "Redundant addStoredBlock request received for "
                    + block.getBlockName() + " on " + node.getName());
        }

        synchronized (neededReplications) {
            FSDirectory.INode fileINode = dir.getFileByBlock(block);
            if (fileINode == null)  // block does not belong to any file
                return;
            // Check whether safe replication is reached for the block,
            // but only if it is part of a file.
            incrementSafeBlockCount(containingNodes.size());
            short fileReplication = fileINode.getReplication();
            if (containingNodes.size() >= fileReplication) {
                neededReplications.remove(block);
                pendingReplications.remove(block);
                NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: "
                        + block.getBlockName() + " has " + containingNodes.size()
                        + " replicas so is removed from neededReplications and pendingReplications");
            } else { // containingNodes.size() < fileReplication
                neededReplications.add(block);
                NameNode.stateChangeLog.debug("BLOCK* NameSystem.addStoredBlock: "
                        + block.getBlockName() + " has only " + containingNodes.size()
                        + " replicas so is added to neededReplications");
            }
            proccessOverReplicatedBlock(block, fileReplication);
        }
    }
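    // ------------------------------------------------------------------
    // Illustration added for this listing (not part of the original
    // source): addStoredBlock maintains a map-of-sets invariant --
    // blocksMap maps each Block to the TreeSet of datanodes holding a
    // replica -- and then compares the set size to the file's replication
    // factor.  A minimal sketch of the same pattern on plain String ids;
    // names and the 'replication' parameter are illustrative only:
    static boolean recordReplica(TreeMap blockToHolders,
                                 String blockId, String nodeId, int replication) {
        TreeSet holders = (TreeSet) blockToHolders.get(blockId);
        if (holders == null) {              // first replica ever reported
            holders = new TreeSet();
            blockToHolders.put(blockId, holders);
        }
        // a TreeSet ignores duplicates, mirroring the "redundant
        // addStoredBlock request" branch above
        holders.add(nodeId);
        // true when the block still needs more replicas
        return holders.size() < replication;
    }
    // ------------------------------------------------------------------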
    /**
     * Find how many of the containing nodes are "extra", if any.
     * If there are any extras, call chooseExcessReplicates() to
     * mark them in the excessReplicateMap.
     */
    private void proccessOverReplicatedBlock(Block block, short replication) {
        TreeSet containingNodes = (TreeSet) blocksMap.get(block);
        if (containingNodes == null)
            return;
        Vector nonExcess = new Vector();
        for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
            DatanodeDescriptor cur = (DatanodeDescriptor) it.next();
            TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getStorageID());
            if (excessBlocks == null || ! excessBlocks.contains(block)) {
                nonExcess.add(cur);
            }
        }
        chooseExcessReplicates(nonExcess, block, replication);
    }

    /**
     * We want "replication" replicates for the block, but we now have too many.
     * In this method, pick nodes out of 'nonExcess' and mark them excess until:
     *
     * nonExcess.size() == replication
     *
     * We pick the node with the least free space.
     * In the future, we might enforce some kind of policy
     * (like making sure replicates are spread across racks).
     */
    void chooseExcessReplicates(Vector nonExcess, Block b, short replication) {
        while (nonExcess.size() - replication > 0) {
            DatanodeInfo cur = null;
            long minSpace = Long.MAX_VALUE;

            for (Iterator iter = nonExcess.iterator(); iter.hasNext(); ) {
                DatanodeInfo node = (DatanodeInfo) iter.next();
                long free = node.getRemaining();

                if (minSpace > free) {
                    minSpace = free;
                    cur = node;
                }
            }

            nonExcess.remove(cur);
            TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getStorageID());
            if (excessBlocks == null) {
                excessBlocks = new TreeSet();
                excessReplicateMap.put(cur.getStorageID(), excessBlocks);
            }
            excessBlocks.add(b);
            NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
                    + "(" + cur.getName() + ", " + b.getBlockName() + ") is added to excessReplicateMap");

            //
            // The 'excessblocks' tracks blocks until we get confirmation
            // that the datanode has deleted them; the only way we remove them
            // is when we get a "removeBlock" message.
            //
            // The 'invalidate' list is used to inform the datanode the block
            // should be deleted.  Items are removed from the invalidate list
            // upon giving instructions to the datanode.
            //
            Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getStorageID());
            if (invalidateSet == null) {
                invalidateSet = new Vector();
                recentInvalidateSets.put(cur.getStorageID(), invalidateSet);
            }
            invalidateSet.add(b);
            NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
                    + "(" + cur.getName() + ", " + b.getBlockName() + ") is added to recentInvalidateSets");
        }
    }
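    // ------------------------------------------------------------------
    // Illustration added for this listing (not part of the original
    // source): each pass of the loop above is a linear arg-min scan --
    // find the replica holder with the least remaining space, evict it,
    // and repeat until only 'replication' holders are left.  The
    // selection step in isolation, with a hypothetical free-space map
    // keyed by node name:
    static String pickLeastFree(Vector nodeNames, Map freeBytesByNode) {
        String cur = null;
        long minSpace = Long.MAX_VALUE;
        for (Iterator it = nodeNames.iterator(); it.hasNext(); ) {
            String node = (String) it.next();
            long free = ((Long) freeBytesByNode.get(node)).longValue();
            if (free < minSpace) {      // keep the tightest node seen so far
                minSpace = free;
                cur = node;
            }
        }
        return cur;                     // the node to drop a replica from
    }
    // ------------------------------------------------------------------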
    /**
     * Modify (block-->datanode) map.  Possibly generate
     * replication tasks, if the removed block is still valid.
     */
    synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
        NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                + block.getBlockName() + " from " + node.getName());
        TreeSet containingNodes = (TreeSet) blocksMap.get(block);
        if (containingNodes == null || ! containingNodes.contains(node)) {
            NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                    + block.getBlockName() + " has already been removed from node " + node);
            return;
        }
        containingNodes.remove(node);
        decrementSafeBlockCount(containingNodes.size());
        if (containingNodes.size() == 0)
            blocksMap.remove(block);

        //
        // It's possible that the block was removed because of a datanode
        // failure.  If the block is still valid, check if replication is
        // necessary.  In that case, put block on a possibly-will-
        // be-replicated list.
        //
        FSDirectory.INode fileINode = dir.getFileByBlock(block);
        if (fileINode != null && (containingNodes.size() < fileINode.getReplication())) {
            synchronized (neededReplications) {
                neededReplications.add(block);
            }
            NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                    + block.getBlockName() + " has only " + containingNodes.size()
                    + " replicas so is added to neededReplications");
        }

        //
        // We've removed a block from a node, so it's definitely no longer
        // in "excess" there.
        //
        TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(node.getStorageID());
        if (excessBlocks != null) {
            excessBlocks.remove(block);
            NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                    + block.getBlockName() + " is removed from excessBlocks");
            if (excessBlocks.size() == 0) {
                excessReplicateMap.remove(node.getStorageID());
            }
        }
    }

    /**
     * The given node is reporting that it received a certain block.
     */
    public synchronized void blockReceived(DatanodeID nodeID,
                                           Block block
                                          ) throws IOException {
        DatanodeDescriptor node = getDatanode(nodeID);
        if (node == null) {
            NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
                    + block.getBlockName() + " is received from an unrecorded node "
                    + nodeID.getName());
            throw new IllegalArgumentException(
                    "Unexpected exception.  Got blockReceived message from node "
                    + nodeID.getName() + ", but there is no info for it");
        }
        NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
                + block.getBlockName() + " is received from " + nodeID.getName());

        //
        // Modify the blocks->datanode map
        //
        addStoredBlock(block, node);

        //
        // Supplement node's blockreport
        //
        node.addBlock(block);
    }
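    // ------------------------------------------------------------------
    // Illustration added for this listing (not part of the original
    // source): removeStoredBlock is the inverse of addStoredBlock -- drop
    // the node from the block's holder set, discard the set once it is
    // empty, and report whether the block fell below its target
    // replication.  The same pattern on plain String ids (names are
    // illustrative only):
    static boolean dropReplica(TreeMap blockToHolders,
                               String blockId, String nodeId, int replication) {
        TreeSet holders = (TreeSet) blockToHolders.get(blockId);
        if (holders == null || ! holders.remove(nodeId))
            return false;                   // nothing recorded; nothing to do
        if (holders.size() == 0)
            blockToHolders.remove(blockId); // no replicas left anywhere
        // true when the block now needs re-replication
        return holders.size() < replication;
    }
    // ------------------------------------------------------------------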
    /**
     * Total raw bytes
     */
    public long totalCapacity() {
        return totalCapacity;
    }

    /**
     * Total non-used raw bytes
     */
    public long totalRemaining() {
        return totalRemaining;
    }

    /**
     */
    public DatanodeInfo[] datanodeReport() {
        DatanodeInfo[] results = null;
        synchronized (heartbeats) {
            synchronized (datanodeMap) {
                results = new DatanodeInfo[datanodeMap.size()];
                int i = 0;
                for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); )
                    results[i++] = new DatanodeInfo((DatanodeDescriptor) it.next());
            }
        }
        return results;
    }

    /**
     */
    public void DFSNodesStatus(Vector live, Vector dead) {
        synchronized (heartbeats) {
            synchronized (datanodeMap) {
                for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
                    DatanodeDescriptor node = (DatanodeDescriptor) it.next();
                    if (node.isDead())
                        dead.add(node);
                    else
                        live.add(node);
                }
            }
        }
    }

    /**
     */
    public DatanodeInfo getDataNodeInfo(String name) {
        return (DatanodeDescriptor) datanodeMap.get(name);
    }

    /**
     */
    public String getDFSNameNodeMachine() {
        return localMachine;
    }

    /**
     */
    public int getDFSNameNodePort() {
        return port;
    }

    /**
     */
    public Date getStartTime() {
        return startTime;
    }

    /////////////////////////////////////////////////////////
    //
    // These methods are called by the Namenode system, to see
    // if there is any work for a given datanode.
    //
    /////////////////////////////////////////////////////////

    /**
     * Check if there are any recently-deleted blocks a datanode should remove.
     */
    public synchronized Block[] blocksToInvalidate(DatanodeID nodeID) {
        // Ask datanodes to perform block deletion
        // only if safe mode is off.
        if (isInSafeMode())
            return null;

        Vector invalidateSet =
