📄 fsnamesystem.java
字号:
b = new Block(FSNamesystem.randBlockId.nextLong(), 0); } while (dir.isValidBlock(b)); FileUnderConstruction v = (FileUnderConstruction) pendingCreates.get(src); v.getBlocks().add(b); pendingCreateBlocks.add(b); NameNode.stateChangeLog.debug("BLOCK* NameSystem.allocateBlock: " +src+ ". "+b.getBlockName()+ " is created and added to pendingCreates and pendingCreateBlocks" ); return b; } /** * Check that the indicated file's blocks are present and * replicated. If not, return false. */ synchronized boolean checkFileProgress(UTF8 src) { FileUnderConstruction v = (FileUnderConstruction) pendingCreates.get(src); for (Iterator it = v.getBlocks().iterator(); it.hasNext(); ) { Block b = (Block) it.next(); TreeSet containingNodes = (TreeSet) blocksMap.get(b); if (containingNodes == null || containingNodes.size() < this.minReplication) { return false; } } return true; } //////////////////////////////////////////////////////////////// // Here's how to handle block-copy failure during client write: // -- As usual, the client's write should result in a streaming // backup write to a k-machine sequence. // -- If one of the backup machines fails, no worries. Fail silently. // -- Before client is allowed to close and finalize file, make sure // that the blocks are backed up. Namenode may have to issue specific backup // commands to make up for earlier datanode failures. Once all copies // are made, edit namespace and return to client. //////////////////////////////////////////////////////////////// /** * Change the indicated filename. */ public boolean renameTo(UTF8 src, UTF8 dst) throws IOException { NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst ); if( isInSafeMode() ) throw new SafeModeException( "Cannot rename " + src, safeMode ); if (!isValidName(dst.toString())) { throw new IOException("Invalid name: " + dst); } return dir.renameTo(src, dst); } /** * Remove the indicated filename from the namespace. This may * invalidate some blocks that make up the file. */ public synchronized boolean delete(UTF8 src) throws IOException { NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src ); if( isInSafeMode() ) throw new SafeModeException( "Cannot delete " + src, safeMode ); Block deletedBlocks[] = (Block[]) dir.delete(src); if (deletedBlocks != null) { for (int i = 0; i < deletedBlocks.length; i++) { Block b = deletedBlocks[i]; TreeSet containingNodes = (TreeSet) blocksMap.get(b); if (containingNodes != null) { for (Iterator it = containingNodes.iterator(); it.hasNext(); ) { DatanodeDescriptor node = (DatanodeDescriptor) it.next(); Vector invalidateSet = (Vector) recentInvalidateSets.get(node.getStorageID()); if (invalidateSet == null) { invalidateSet = new Vector(); recentInvalidateSets.put(node.getStorageID(), invalidateSet); } invalidateSet.add(b); NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: " + b.getBlockName() + " is added to invalidSet of " + node.getName() ); } } } } return (deletedBlocks != null); } /** * Return whether the given filename exists */ public boolean exists(UTF8 src) { if (dir.getFile(src) != null || dir.isDir(src)) { return true; } else { return false; } } /** * Whether the given name is a directory */ public boolean isDir(UTF8 src) { return dir.isDir(src); } /** * Whether the pathname is valid. Currently prohibits relative paths, * and names which contain a ":" or "/" */ private boolean isValidName(String src) { // Path must be absolute. if (!src.startsWith(Path.SEPARATOR)) { return false; } // Check for ".." "." ":" "/" Enumeration tokens = new StringTokenizer(src, Path.SEPARATOR); ArrayList list = Collections.list(tokens); for (int i = 0; i < list.size(); i++) { String element = (String)list.get(i); if (element.equals("..") || element.equals(".") || (element.indexOf(":") >= 0) || (element.indexOf("/") >= 0)) { return false; } } return true; } /** * Create all the necessary directories */ public boolean mkdirs( String src ) throws IOException { NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src ); if( isInSafeMode() ) throw new SafeModeException( "Cannot create directory " + src, safeMode ); if (!isValidName(src)) { throw new IOException("Invalid directory name: " + src); } return dir.mkdirs(src); } /** * Figure out a few hosts that are likely to contain the * block(s) referred to by the given (filename, start, len) tuple. */ public UTF8[][] getDatanodeHints(UTF8 src, long start, long len) { if (start < 0 || len < 0) { return new UTF8[0][]; } int startBlock = -1; int endBlock = -1; Block blocks[] = dir.getFile(src); if (blocks == null) { // no blocks return new UTF8[0][]; } // // First, figure out where the range falls in // the blocklist. // long startpos = start; long endpos = start + len; for (int i = 0; i < blocks.length; i++) { if (startpos >= 0) { startpos -= blocks[i].getNumBytes(); if (startpos <= 0) { startBlock = i; } } if (endpos >= 0) { endpos -= blocks[i].getNumBytes(); if (endpos <= 0) { endBlock = i; break; } } } // // Next, create an array of hosts where each block can // be found // if (startBlock < 0 || endBlock < 0) { return new UTF8[0][]; } else { UTF8 hosts[][] = new UTF8[(endBlock - startBlock) + 1][]; for (int i = startBlock; i <= endBlock; i++) { TreeSet containingNodes = (TreeSet) blocksMap.get(blocks[i]); Vector v = new Vector(); if (containingNodes != null) { for (Iterator it =containingNodes.iterator(); it.hasNext();) { DatanodeDescriptor cur = (DatanodeDescriptor) it.next(); v.add(new UTF8( cur.getHost() )); } } hosts[i-startBlock] = (UTF8[]) v.toArray(new UTF8[v.size()]); } return hosts; } } /************************************************************ * A Lease governs all the locks held by a single client. * For each client there's a corresponding lease, whose * timestamp is updated when the client periodically * checks in. If the client dies and allows its lease to * expire, all the corresponding locks can be released. *************************************************************/ class Lease implements Comparable { public UTF8 holder; public long lastUpdate; private TreeSet locks = new TreeSet(); private TreeSet creates = new TreeSet(); public Lease(UTF8 holder) { this.holder = holder; renew(); } public void renew() { this.lastUpdate = now(); } public boolean expired() { if (now() - lastUpdate > LEASE_PERIOD) { return true; } else { return false; } } public void obtained(UTF8 src) { locks.add(src); } public void released(UTF8 src) { locks.remove(src); } public void startedCreate(UTF8 src) { creates.add(src); } public boolean completedCreate(UTF8 src) { return creates.remove(src); } public boolean hasLocks() { return (locks.size() + creates.size()) > 0; } public void releaseLocks() { for (Iterator it = locks.iterator(); it.hasNext(); ) { UTF8 src = (UTF8) it.next(); internalReleaseLock(src, holder); } locks.clear(); for (Iterator it = creates.iterator(); it.hasNext(); ) { UTF8 src = (UTF8) it.next(); internalReleaseCreate(src, holder); } creates.clear(); } /** */ public String toString() { return "[Lease. Holder: " + holder.toString() + ", heldlocks: " + locks.size() + ", pendingcreates: " + creates.size() + "]"; } /** */ public int compareTo(Object o) { Lease l1 = (Lease) this; Lease l2 = (Lease) o; long lu1 = l1.lastUpdate; long lu2 = l2.lastUpdate; if (lu1 < lu2) { return -1; } else if (lu1 > lu2) { return 1; } else { return l1.holder.compareTo(l2.holder); } } } /****************************************************** * LeaseMonitor checks for leases that have expired, * and disposes of them. ******************************************************/ class LeaseMonitor implements Runnable { public void run() { while (fsRunning) { synchronized (FSNamesystem.this) { synchronized (leases) { Lease top; while ((sortedLeases.size() > 0) && ((top = (Lease) sortedLeases.first()) != null)) { if (top.expired()) { top.releaseLocks(); leases.remove(top.holder); LOG.info("Removing lease " + top + ", leases remaining: " + sortedLeases.size()); if (!sortedLeases.remove(top)) { LOG.info("Unknown failure trying to remove " + top + " from lease set."); } } else { break; } } } } try { Thread.sleep(2000); } catch (InterruptedException ie) { } } } } /** * Get a lock (perhaps exclusive) on the given file */ public synchronized int obtainLock( UTF8 src, UTF8 holder, boolean exclusive) throws IOException { if( isInSafeMode() ) throw new SafeModeException( "Cannot lock file " + src, safeMode ); int result = dir.obtainLock(src, holder, exclusive); if (result == COMPLETE_SUCCESS) { synchronized (leases) { Lease lease = (Lease) leases.get(holder); if (lease == null) { lease = new Lease(holder); leases.put(holder, lease); sortedLeases.add(lease); } else { sortedLeases.remove(lease); lease.renew(); sortedLeases.add(lease); } lease.obtained(src); } } return result; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -