fsnamesystem.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 1,340 行 · 第 1/4 页
JAVA
1,340 行
} /** * Abandon the entire file in progress */ public synchronized void abandonFileInProgress(UTF8 src) throws IOException { internalReleaseCreate(src); } /** * Finalize the created file and make it world-accessible. The * FSNamesystem will already know the blocks that make up the file. * Before we return, we make sure that all the file's blocks have * been reported by datanodes and are replicated correctly. */ public synchronized int completeFile(UTF8 src, UTF8 holder) { if (dir.getFile(src) != null || pendingCreates.get(src) == null) { LOG.info("Failed to complete " + src + " because dir.getFile()==" + dir.getFile(src) + " and " + pendingCreates.get(src)); return OPERATION_FAILED; } else if (! checkFileProgress(src)) { return STILL_WAITING; } else { Vector pendingVector = (Vector) pendingCreates.get(src); Block pendingBlocks[] = (Block[]) pendingVector.toArray(new Block[pendingVector.size()]); // // We have the pending blocks, but they won't have // length info in them (as they were allocated before // data-write took place). So we need to add the correct // length info to each // // REMIND - mjc - this is very inefficient! We should // improve this! // for (int i = 0; i < pendingBlocks.length; i++) { Block b = pendingBlocks[i]; TreeSet containingNodes = (TreeSet) blocksMap.get(b); DatanodeInfo node = (DatanodeInfo) containingNodes.first(); for (Iterator it = node.getBlockIterator(); it.hasNext(); ) { Block cur = (Block) it.next(); if (b.getBlockId() == cur.getBlockId()) { b.setNumBytes(cur.getNumBytes()); break; } } } // // Now we can add the (name,blocks) tuple to the filesystem // if (dir.addFile(src, pendingBlocks)) { // The file is no longer pending pendingCreates.remove(src); for (int i = 0; i < pendingBlocks.length; i++) { pendingCreateBlocks.remove(pendingBlocks[i]); } synchronized (leases) { Lease lease = (Lease) leases.get(holder); if (lease != null) { lease.completedCreate(src); if (! lease.hasLocks()) { leases.remove(holder); sortedLeases.remove(lease); } } } // // REMIND - mjc - this should be done only after we wait a few secs. // The namenode isn't giving datanodes enough time to report the // replicated blocks that are automatically done as part of a client // write. // // Now that the file is real, we need to be sure to replicate // the blocks. for (int i = 0; i < pendingBlocks.length; i++) { TreeSet containingNodes = (TreeSet) blocksMap.get(pendingBlocks[i]); if (containingNodes.size() < this.desiredReplication) { synchronized (neededReplications) { LOG.info("Completed file " + src + ", at holder " + holder + ". There is/are only " + containingNodes.size() + " copies of block " + pendingBlocks[i] + ", so replicating up to " + this.desiredReplication); neededReplications.add(pendingBlocks[i]); } } } return COMPLETE_SUCCESS; } else { System.out.println("AddFile() for " + src + " failed"); } LOG.info("Dropped through on file add...."); } return OPERATION_FAILED; } /** * Allocate a block at the given pending filename */ synchronized Block allocateBlock(UTF8 src) { Block b = new Block(); Vector v = (Vector) pendingCreates.get(src); v.add(b); pendingCreateBlocks.add(b); return b; } /** * Check that the indicated file's blocks are present and * replicated. If not, return false. */ synchronized boolean checkFileProgress(UTF8 src) { Vector v = (Vector) pendingCreates.get(src); for (Iterator it = v.iterator(); it.hasNext(); ) { Block b = (Block) it.next(); TreeSet containingNodes = (TreeSet) blocksMap.get(b); if (containingNodes == null || containingNodes.size() < this.minReplication) { return false; } } return true; } //////////////////////////////////////////////////////////////// // Here's how to handle block-copy failure during client write: // -- As usual, the client's write should result in a streaming // backup write to a k-machine sequence. // -- If one of the backup machines fails, no worries. Fail silently. // -- Before client is allowed to close and finalize file, make sure // that the blocks are backed up. Namenode may have to issue specific backup // commands to make up for earlier datanode failures. Once all copies // are made, edit namespace and return to client. //////////////////////////////////////////////////////////////// /** * Change the indicated filename. */ public boolean renameTo(UTF8 src, UTF8 dst) { return dir.renameTo(src, dst); } /** * Remove the indicated filename from the namespace. This may * invalidate some blocks that make up the file. */ public synchronized boolean delete(UTF8 src) { Block deletedBlocks[] = (Block[]) dir.delete(src); if (deletedBlocks != null) { for (int i = 0; i < deletedBlocks.length; i++) { Block b = deletedBlocks[i]; TreeSet containingNodes = (TreeSet) blocksMap.get(b); if (containingNodes != null) { for (Iterator it = containingNodes.iterator(); it.hasNext(); ) { DatanodeInfo node = (DatanodeInfo) it.next(); Vector invalidateSet = (Vector) recentInvalidateSets.get(node.getName()); if (invalidateSet == null) { invalidateSet = new Vector(); recentInvalidateSets.put(node.getName(), invalidateSet); } invalidateSet.add(b); } } } } return (deletedBlocks != null); } /** * Return whether the given filename exists */ public boolean exists(UTF8 src) { if (dir.getFile(src) != null || dir.isDir(src)) { return true; } else { return false; } } /** * Whether the given name is a directory */ public boolean isDir(UTF8 src) { return dir.isDir(src); } /** * Create all the necessary directories */ public boolean mkdirs(UTF8 src) { return dir.mkdirs(src); } /** * Figure out a few hosts that are likely to contain the * block(s) referred to by the given (filename, start, len) tuple. */ public UTF8[][] getDatanodeHints(UTF8 src, long start, long len) { if (start < 0 || len < 0) { return new UTF8[0][]; } int startBlock = -1; int endBlock = -1; Block blocks[] = dir.getFile(src); if (blocks == null) { // no blocks return new UTF8[0][]; } // // First, figure out where the range falls in // the blocklist. // long startpos = start; long endpos = start + len; for (int i = 0; i < blocks.length; i++) { if (startpos >= 0) { startpos -= blocks[i].getNumBytes(); if (startpos <= 0) { startBlock = i; } } if (endpos >= 0) { endpos -= blocks[i].getNumBytes(); if (endpos <= 0) { endBlock = i; break; } } } // // Next, create an array of hosts where each block can // be found // if (startBlock < 0 || endBlock < 0) { return new UTF8[0][]; } else { UTF8 hosts[][] = new UTF8[(endBlock - startBlock) + 1][]; for (int i = startBlock; i <= endBlock; i++) { TreeSet containingNodes = (TreeSet) blocksMap.get(blocks[i]); Vector v = new Vector(); for (Iterator it = containingNodes.iterator(); it.hasNext(); ) { DatanodeInfo cur = (DatanodeInfo) it.next(); v.add(cur.getHost()); } hosts[i-startBlock] = (UTF8[]) v.toArray(new UTF8[v.size()]); } return hosts; } } /************************************************************ * A Lease governs all the locks held by a single client. * For each client there's a corresponding lease, whose * timestamp is updated when the client periodically * checks in. If the client dies and allows its lease to * expire, all the corresponding locks can be released. *************************************************************/ class Lease implements Comparable { public UTF8 holder; public long lastUpdate; TreeSet locks = new TreeSet(); TreeSet creates = new TreeSet(); public Lease(UTF8 holder) { this.holder = holder; renew(); } public void renew() { this.lastUpdate = System.currentTimeMillis(); } public boolean expired() { if (System.currentTimeMillis() - lastUpdate > LEASE_PERIOD) { return true; } else { return false; } } public void obtained(UTF8 src) { locks.add(src); } public void released(UTF8 src) { locks.remove(src); } public void startedCreate(UTF8 src) { creates.add(src); } public void completedCreate(UTF8 src) { creates.remove(src); } public boolean hasLocks() { return (locks.size() + creates.size()) > 0; } public void releaseLocks() { for (Iterator it = locks.iterator(); it.hasNext(); ) { UTF8 src = (UTF8) it.next(); internalReleaseLock(src, holder); } locks.clear(); for (Iterator it = creates.iterator(); it.hasNext(); ) { UTF8 src = (UTF8) it.next(); internalReleaseCreate(src); } creates.clear(); } /** */ public String toString() { return "[Lease. Holder: " + holder.toString() + ", heldlocks: " + locks.size() + ", pendingcreates: " + creates.size() + "]"; } /** */ public int compareTo(Object o) { Lease l1 = (Lease) this; Lease l2 = (Lease) o; long lu1 = l1.lastUpdate; long lu2 = l2.lastUpdate; if (lu1 < lu2) { return -1; } else if (lu1 > lu2) { return 1; } else { return l1.holder.compareTo(l2.holder); } } } /****************************************************** * LeaseMonitor checks for leases that have expired, * and disposes of them.
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?