fsnamesystem.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 1,340 行 · 第 1/4 页

JAVA
1,340
字号
     ******************************************************/    class LeaseMonitor implements Runnable {        public void run() {            while (fsRunning) {                synchronized (FSNamesystem.this) {                    synchronized (leases) {                        Lease top;                        while ((sortedLeases.size() > 0) &&                               ((top = (Lease) sortedLeases.first()) != null)) {                            if (top.expired()) {                                top.releaseLocks();                                leases.remove(top.holder);                                LOG.info("Removing lease " + top + ", leases remaining: " + sortedLeases.size());                                if (!sortedLeases.remove(top)) {                                    LOG.info("Unknown failure trying to remove " + top + " from lease set.");                                }                            } else {                                break;                            }                        }                    }                }                try {                    Thread.sleep(2000);                } catch (InterruptedException ie) {                }            }        }    }    /**     * Get a lock (perhaps exclusive) on the given file     */    public synchronized int obtainLock(UTF8 src, UTF8 holder, boolean exclusive) {        int result = dir.obtainLock(src, holder, exclusive);        if (result == COMPLETE_SUCCESS) {            synchronized (leases) {                Lease lease = (Lease) leases.get(holder);                if (lease == null) {                    lease = new Lease(holder);                    leases.put(holder, lease);                    sortedLeases.add(lease);                } else {                    sortedLeases.remove(lease);                    lease.renew();                    sortedLeases.add(lease);                }                lease.obtained(src);            }        }        return result;    }    /**     * Release the lock on the given file     */    public synchronized int releaseLock(UTF8 src, UTF8 holder) {        int result = internalReleaseLock(src, holder);        if (result == COMPLETE_SUCCESS) {            synchronized (leases) {                Lease lease = (Lease) leases.get(holder);                if (lease != null) {                    lease.released(src);                    if (! lease.hasLocks()) {                        leases.remove(holder);                        sortedLeases.remove(lease);                    }                }            }        }        return result;    }    private int internalReleaseLock(UTF8 src, UTF8 holder) {        return dir.releaseLock(src, holder);    }    private void internalReleaseCreate(UTF8 src) {        Vector v = (Vector) pendingCreates.remove(src);        for (Iterator it2 = v.iterator(); it2.hasNext(); ) {            Block b = (Block) it2.next();            pendingCreateBlocks.remove(b);        }    }    /**     * Renew the lease(s) held by the given client     */    public void renewLease(UTF8 holder) {        synchronized (leases) {            Lease lease = (Lease) leases.get(holder);            if (lease != null) {                sortedLeases.remove(lease);                lease.renew();                sortedLeases.add(lease);            }        }    }    /**     * Get a listing of all files at 'src'.  The Object[] array     * exists so we can return file attributes (soon to be implemented)     */    public DFSFileInfo[] getListing(UTF8 src) {        return dir.getListing(src);    }    /////////////////////////////////////////////////////////    //    // These methods are called by datanodes    //    /////////////////////////////////////////////////////////    /**     * The given node has reported in.  This method should:     * 1) Record the heartbeat, so the datanode isn't timed out     * 2) Adjust usage stats for future block allocation     */    public synchronized void gotHeartbeat(UTF8 name, long capacity, long remaining) {        synchronized (heartbeats) {            synchronized (datanodeMap) {                long capacityDiff = 0;                long remainingDiff = 0;                DatanodeInfo nodeinfo = (DatanodeInfo) datanodeMap.get(name);                if (nodeinfo == null) {                    LOG.info("Got brand-new heartbeat from " + name);                    nodeinfo = new DatanodeInfo(name, capacity, remaining);                    datanodeMap.put(name, nodeinfo);                    capacityDiff = capacity;                    remainingDiff = remaining;                } else {                    capacityDiff = capacity - nodeinfo.getCapacity();                    remainingDiff = remaining - nodeinfo.getRemaining();                    heartbeats.remove(nodeinfo);                    nodeinfo.updateHeartbeat(capacity, remaining);                }                heartbeats.add(nodeinfo);                totalCapacity += capacityDiff;                totalRemaining += remainingDiff;            }        }    }    /**     * Periodically calls heartbeatCheck().     */    class HeartbeatMonitor implements Runnable {        /**         */        public void run() {            while (fsRunning) {                heartbeatCheck();                try {                    Thread.sleep(heartBeatRecheck);                } catch (InterruptedException ie) {                }            }        }    }    /**     * Check if there are any expired heartbeats, and if so,     * whether any blocks have to be re-replicated.     */    synchronized void heartbeatCheck() {        synchronized (heartbeats) {            DatanodeInfo nodeInfo = null;            while ((heartbeats.size() > 0) &&                   ((nodeInfo = (DatanodeInfo) heartbeats.first()) != null) &&                   (nodeInfo.lastUpdate() < System.currentTimeMillis() - EXPIRE_INTERVAL)) {                LOG.info("Lost heartbeat for " + nodeInfo.getName());                heartbeats.remove(nodeInfo);                synchronized (datanodeMap) {                    datanodeMap.remove(nodeInfo.getName());                }                totalCapacity -= nodeInfo.getCapacity();                totalRemaining -= nodeInfo.getRemaining();                Block deadblocks[] = nodeInfo.getBlocks();                if (deadblocks != null) {                    for (int i = 0; i < deadblocks.length; i++) {                        removeStoredBlock(deadblocks[i], nodeInfo);                    }                }                if (heartbeats.size() > 0) {                    nodeInfo = (DatanodeInfo) heartbeats.first();                }            }        }    }        /**     * The given node is reporting all its blocks.  Use this info to      * update the (machine-->blocklist) and (block-->machinelist) tables.     */    public synchronized Block[] processReport(Block newReport[], UTF8 name) {        DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);        if (node == null) {            throw new IllegalArgumentException("Unexpected exception.  Received block report from node " + name + ", but there is no info for " + name);        }        //        // Modify the (block-->datanode) map, according to the difference        // between the old and new block report.        //        int oldPos = 0, newPos = 0;        Block oldReport[] = node.getBlocks();        while (oldReport != null && newReport != null && oldPos < oldReport.length && newPos < newReport.length) {            int cmp = oldReport[oldPos].compareTo(newReport[newPos]);                        if (cmp == 0) {                // Do nothing, blocks are the same                oldPos++;                newPos++;            } else if (cmp < 0) {                // The old report has a block the new one does not                removeStoredBlock(oldReport[oldPos], node);                oldPos++;            } else {                // The new report has a block the old one does not                addStoredBlock(newReport[newPos], node);                newPos++;            }        }        while (oldReport != null && oldPos < oldReport.length) {            // The old report has a block the new one does not            removeStoredBlock(oldReport[oldPos], node);            oldPos++;        }        while (newReport != null && newPos < newReport.length) {            // The new report has a block the old one does not            addStoredBlock(newReport[newPos], node);            newPos++;        }        //        // Modify node so it has the new blockreport        //        node.updateBlocks(newReport);        //        // We've now completely updated the node's block report profile.        // We now go through all its blocks and find which ones are invalid,        // no longer pending, or over-replicated.        //        // (Note it's not enough to just invalidate blocks at lease expiry         // time; datanodes can go down before the client's lease on         // the failed file expires and miss the "expire" event.)        //        // This function considers every block on a datanode, and thus        // should only be invoked infrequently.        //        Vector obsolete = new Vector();        for (Iterator it = node.getBlockIterator(); it.hasNext(); ) {            Block b = (Block) it.next();            if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {                LOG.info("Obsoleting block " + b);                obsolete.add(b);            }        }        return (Block[]) obsolete.toArray(new Block[obsolete.size()]);    }    /**     * Modify (block-->datanode) map.  Remove block from set of      * needed replications if this takes care of the problem.     */    synchronized void addStoredBlock(Block block, DatanodeInfo node) {        TreeSet containingNodes = (TreeSet) blocksMap.get(block);        if (containingNodes == null) {            containingNodes = new TreeSet();            blocksMap.put(block, containingNodes);        }        if (! containingNodes.contains(node)) {            containingNodes.add(node);        } else {            LOG.info("Redundant addStoredBlock request received for block " + block + " on node " + node);        }        synchronized (neededReplications) {            if (dir.isValidBlock(block)) {                if (containingNodes.size() >= this.desiredReplication) {                    neededReplications.remove(block);                    pendingReplications.remove(block);                } else if (containingNodes.size() < this.desiredReplication) {                    if (! neededReplications.contains(block)) {                        neededReplications.add(block);                    }                }                //                // Find how many of the containing nodes are "extra", if any.                // If there are any extras, call chooseExcessReplicates() to                // mark them in the excessReplicateMap.                //                Vector nonExcess = new Vector();                for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {                    DatanodeInfo cur = (DatanodeInfo) it.next();                    TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());                    if (excessBlocks == null || ! excessBlocks.contains(block)) {                        nonExcess.add(cur);                    }                }                if (nonExcess.size() > this.maxReplication) {                    chooseExcessReplicates(nonExcess, block, this.maxReplication);                    }            }        }    }    /**     * We want a max of "maxReps" replicates for any block, but we now have too many.       * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:     *     * srcNodes.size() - dstNodes.size() == maxReps     *     * For now, we choose nodes randomly.  In the future, we might enforce some     * kind of policy (like making sure replicates are spread across racks).     */    void chooseExcessReplicates(Vector nonExcess, Block b, int maxReps) {        while (nonExcess.size() - maxReps > 0) {            int chosenNode = r.nextInt(nonExcess.size());            DatanodeInfo cur = (DatanodeInfo) nonExcess.elementAt(chosenNode);            nonExcess.removeElementAt(chosenNode);            TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());            if (excessBlocks == null) {                excessBlocks = new TreeSet();                excessReplicateMap.put(cur.getName(), excessBlocks);            }            excessBlocks.add(b);            //

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?