fsnamesystem.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 1,340 行 · 第 1/4 页
JAVA
1,340 行
******************************************************/ class LeaseMonitor implements Runnable { public void run() { while (fsRunning) { synchronized (FSNamesystem.this) { synchronized (leases) { Lease top; while ((sortedLeases.size() > 0) && ((top = (Lease) sortedLeases.first()) != null)) { if (top.expired()) { top.releaseLocks(); leases.remove(top.holder); LOG.info("Removing lease " + top + ", leases remaining: " + sortedLeases.size()); if (!sortedLeases.remove(top)) { LOG.info("Unknown failure trying to remove " + top + " from lease set."); } } else { break; } } } } try { Thread.sleep(2000); } catch (InterruptedException ie) { } } } } /** * Get a lock (perhaps exclusive) on the given file */ public synchronized int obtainLock(UTF8 src, UTF8 holder, boolean exclusive) { int result = dir.obtainLock(src, holder, exclusive); if (result == COMPLETE_SUCCESS) { synchronized (leases) { Lease lease = (Lease) leases.get(holder); if (lease == null) { lease = new Lease(holder); leases.put(holder, lease); sortedLeases.add(lease); } else { sortedLeases.remove(lease); lease.renew(); sortedLeases.add(lease); } lease.obtained(src); } } return result; } /** * Release the lock on the given file */ public synchronized int releaseLock(UTF8 src, UTF8 holder) { int result = internalReleaseLock(src, holder); if (result == COMPLETE_SUCCESS) { synchronized (leases) { Lease lease = (Lease) leases.get(holder); if (lease != null) { lease.released(src); if (! lease.hasLocks()) { leases.remove(holder); sortedLeases.remove(lease); } } } } return result; } private int internalReleaseLock(UTF8 src, UTF8 holder) { return dir.releaseLock(src, holder); } private void internalReleaseCreate(UTF8 src) { Vector v = (Vector) pendingCreates.remove(src); for (Iterator it2 = v.iterator(); it2.hasNext(); ) { Block b = (Block) it2.next(); pendingCreateBlocks.remove(b); } } /** * Renew the lease(s) held by the given client */ public void renewLease(UTF8 holder) { synchronized (leases) { Lease lease = (Lease) leases.get(holder); if (lease != null) { sortedLeases.remove(lease); lease.renew(); sortedLeases.add(lease); } } } /** * Get a listing of all files at 'src'. The Object[] array * exists so we can return file attributes (soon to be implemented) */ public DFSFileInfo[] getListing(UTF8 src) { return dir.getListing(src); } ///////////////////////////////////////////////////////// // // These methods are called by datanodes // ///////////////////////////////////////////////////////// /** * The given node has reported in. This method should: * 1) Record the heartbeat, so the datanode isn't timed out * 2) Adjust usage stats for future block allocation */ public synchronized void gotHeartbeat(UTF8 name, long capacity, long remaining) { synchronized (heartbeats) { synchronized (datanodeMap) { long capacityDiff = 0; long remainingDiff = 0; DatanodeInfo nodeinfo = (DatanodeInfo) datanodeMap.get(name); if (nodeinfo == null) { LOG.info("Got brand-new heartbeat from " + name); nodeinfo = new DatanodeInfo(name, capacity, remaining); datanodeMap.put(name, nodeinfo); capacityDiff = capacity; remainingDiff = remaining; } else { capacityDiff = capacity - nodeinfo.getCapacity(); remainingDiff = remaining - nodeinfo.getRemaining(); heartbeats.remove(nodeinfo); nodeinfo.updateHeartbeat(capacity, remaining); } heartbeats.add(nodeinfo); totalCapacity += capacityDiff; totalRemaining += remainingDiff; } } } /** * Periodically calls heartbeatCheck(). */ class HeartbeatMonitor implements Runnable { /** */ public void run() { while (fsRunning) { heartbeatCheck(); try { Thread.sleep(heartBeatRecheck); } catch (InterruptedException ie) { } } } } /** * Check if there are any expired heartbeats, and if so, * whether any blocks have to be re-replicated. */ synchronized void heartbeatCheck() { synchronized (heartbeats) { DatanodeInfo nodeInfo = null; while ((heartbeats.size() > 0) && ((nodeInfo = (DatanodeInfo) heartbeats.first()) != null) && (nodeInfo.lastUpdate() < System.currentTimeMillis() - EXPIRE_INTERVAL)) { LOG.info("Lost heartbeat for " + nodeInfo.getName()); heartbeats.remove(nodeInfo); synchronized (datanodeMap) { datanodeMap.remove(nodeInfo.getName()); } totalCapacity -= nodeInfo.getCapacity(); totalRemaining -= nodeInfo.getRemaining(); Block deadblocks[] = nodeInfo.getBlocks(); if (deadblocks != null) { for (int i = 0; i < deadblocks.length; i++) { removeStoredBlock(deadblocks[i], nodeInfo); } } if (heartbeats.size() > 0) { nodeInfo = (DatanodeInfo) heartbeats.first(); } } } } /** * The given node is reporting all its blocks. Use this info to * update the (machine-->blocklist) and (block-->machinelist) tables. */ public synchronized Block[] processReport(Block newReport[], UTF8 name) { DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name); if (node == null) { throw new IllegalArgumentException("Unexpected exception. Received block report from node " + name + ", but there is no info for " + name); } // // Modify the (block-->datanode) map, according to the difference // between the old and new block report. // int oldPos = 0, newPos = 0; Block oldReport[] = node.getBlocks(); while (oldReport != null && newReport != null && oldPos < oldReport.length && newPos < newReport.length) { int cmp = oldReport[oldPos].compareTo(newReport[newPos]); if (cmp == 0) { // Do nothing, blocks are the same oldPos++; newPos++; } else if (cmp < 0) { // The old report has a block the new one does not removeStoredBlock(oldReport[oldPos], node); oldPos++; } else { // The new report has a block the old one does not addStoredBlock(newReport[newPos], node); newPos++; } } while (oldReport != null && oldPos < oldReport.length) { // The old report has a block the new one does not removeStoredBlock(oldReport[oldPos], node); oldPos++; } while (newReport != null && newPos < newReport.length) { // The new report has a block the old one does not addStoredBlock(newReport[newPos], node); newPos++; } // // Modify node so it has the new blockreport // node.updateBlocks(newReport); // // We've now completely updated the node's block report profile. // We now go through all its blocks and find which ones are invalid, // no longer pending, or over-replicated. // // (Note it's not enough to just invalidate blocks at lease expiry // time; datanodes can go down before the client's lease on // the failed file expires and miss the "expire" event.) // // This function considers every block on a datanode, and thus // should only be invoked infrequently. // Vector obsolete = new Vector(); for (Iterator it = node.getBlockIterator(); it.hasNext(); ) { Block b = (Block) it.next(); if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) { LOG.info("Obsoleting block " + b); obsolete.add(b); } } return (Block[]) obsolete.toArray(new Block[obsolete.size()]); } /** * Modify (block-->datanode) map. Remove block from set of * needed replications if this takes care of the problem. */ synchronized void addStoredBlock(Block block, DatanodeInfo node) { TreeSet containingNodes = (TreeSet) blocksMap.get(block); if (containingNodes == null) { containingNodes = new TreeSet(); blocksMap.put(block, containingNodes); } if (! containingNodes.contains(node)) { containingNodes.add(node); } else { LOG.info("Redundant addStoredBlock request received for block " + block + " on node " + node); } synchronized (neededReplications) { if (dir.isValidBlock(block)) { if (containingNodes.size() >= this.desiredReplication) { neededReplications.remove(block); pendingReplications.remove(block); } else if (containingNodes.size() < this.desiredReplication) { if (! neededReplications.contains(block)) { neededReplications.add(block); } } // // Find how many of the containing nodes are "extra", if any. // If there are any extras, call chooseExcessReplicates() to // mark them in the excessReplicateMap. // Vector nonExcess = new Vector(); for (Iterator it = containingNodes.iterator(); it.hasNext(); ) { DatanodeInfo cur = (DatanodeInfo) it.next(); TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName()); if (excessBlocks == null || ! excessBlocks.contains(block)) { nonExcess.add(cur); } } if (nonExcess.size() > this.maxReplication) { chooseExcessReplicates(nonExcess, block, this.maxReplication); } } } } /** * We want a max of "maxReps" replicates for any block, but we now have too many. * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that: * * srcNodes.size() - dstNodes.size() == maxReps * * For now, we choose nodes randomly. In the future, we might enforce some * kind of policy (like making sure replicates are spread across racks). */ void chooseExcessReplicates(Vector nonExcess, Block b, int maxReps) { while (nonExcess.size() - maxReps > 0) { int chosenNode = r.nextInt(nonExcess.size()); DatanodeInfo cur = (DatanodeInfo) nonExcess.elementAt(chosenNode); nonExcess.removeElementAt(chosenNode); TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName()); if (excessBlocks == null) { excessBlocks = new TreeSet(); excessReplicateMap.put(cur.getName(), excessBlocks); } excessBlocks.add(b); //
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?