📄 FSNamesystem.java
  /**
   * Release the lock on the given file.
   */
  public synchronized int releaseLock(UTF8 src, UTF8 holder) {
    int result = internalReleaseLock(src, holder);
    if (result == COMPLETE_SUCCESS) {
      synchronized (leases) {
        Lease lease = (Lease) leases.get(holder);
        if (lease != null) {
          lease.released(src);
          if (! lease.hasLocks()) {
            leases.remove(holder);
            sortedLeases.remove(lease);
          }
        }
      }
    }
    return result;
  }

  private int internalReleaseLock(UTF8 src, UTF8 holder) {
    return dir.releaseLock(src, holder);
  }

  /**
   * Release a pending file creation lock.
   * @param src The filename
   * @param holder The datanode that was creating the file
   */
  private void internalReleaseCreate(UTF8 src, UTF8 holder) {
    FileUnderConstruction v =
        (FileUnderConstruction) pendingCreates.remove(src);
    if (v != null) {
      NameNode.stateChangeLog.debug(
          "DIR* NameSystem.internalReleaseCreate: " + src
          + " is removed from pendingCreates for " + holder + " (failure)");
      for (Iterator it2 = v.getBlocks().iterator(); it2.hasNext(); ) {
        Block b = (Block) it2.next();
        pendingCreateBlocks.remove(b);
      }
    } else {
      NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
          + "attempt to release a create lock on " + src.toString()
          + " that was not in pendingCreates");
    }
  }

  /**
   * Renew the lease(s) held by the given client.
   */
  public void renewLease(UTF8 holder) throws IOException {
    synchronized (leases) {
      if( isInSafeMode() )
        throw new SafeModeException(
            "Cannot renew lease for " + holder, safeMode );
      Lease lease = (Lease) leases.get(holder);
      if (lease != null) {
        sortedLeases.remove(lease);
        lease.renew();
        sortedLeases.add(lease);
      }
    }
  }
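  /**
   * A minimal sketch of how renewLease() is meant to be driven: the lease
   * holder must call it periodically, well inside the lease expiry period,
   * or its locks will eventually be released.  The sketch assumes direct
   * access to this FSNamesystem instance (in the running system the call
   * arrives over the namenode's client RPC interface); the class name and
   * the renewal period below are illustrative assumptions, not part of DFS.
   */
  class LeaseRenewalSketch implements Runnable {
    private final UTF8 clientName;                 // the lease holder
    private final long renewalPeriod = 30 * 1000L; // assumed period, in ms

    LeaseRenewalSketch(UTF8 clientName) {
      this.clientName = clientName;
    }

    public void run() {
      while (fsRunning) {
        try {
          // Keeps the holder's lease current so it is not expired and released.
          renewLease(clientName);
        } catch (IOException e) {
          // e.g. SafeModeException: retry on the next iteration
        }
        try {
          Thread.sleep(renewalPeriod);
        } catch (InterruptedException ie) {
        }
      }
    }
  }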
  /**
   * Get a listing of all files at 'src'.  The Object[] array
   * exists so we can return file attributes (soon to be implemented).
   */
  public DFSFileInfo[] getListing(UTF8 src) {
    return dir.getListing(src);
  }

  /////////////////////////////////////////////////////////
  //
  // These methods are called by datanodes
  //
  /////////////////////////////////////////////////////////

  /**
   * Register Datanode.
   * <p>
   * The purpose of registration is to identify whether the new datanode
   * serves a new data storage, and will therefore report data block copies
   * the namenode was not yet aware of, or whether it is a replacement node
   * for a data storage that was previously served by a different
   * or the same (in terms of host:port) datanode.
   * The data storages are distinguished by their storageIDs. When a new
   * data storage is reported the namenode issues a new unique storageID.
   * <p>
   * Finally, the namenode returns its namespaceID as the registrationID
   * for the datanodes.
   * namespaceID is a persistent attribute of the name space.
   * The registrationID is checked every time the datanode communicates
   * with the namenode.
   * Datanodes with an inappropriate registrationID are rejected.
   * If the namenode stops and then restarts, it can restore its
   * namespaceID and will continue serving the datanodes that have
   * previously registered with it, without restarting the whole cluster.
   *
   * @see DataNode#register()
   * @author Konstantin Shvachko
   */
  public synchronized void registerDatanode( DatanodeRegistration nodeReg
                                           ) throws IOException {
    NameNode.stateChangeLog.debug(
        "BLOCK* NameSystem.registerDatanode: "
        + "node registration from " + nodeReg.getName()
        + " storage " + nodeReg.getStorageID() );
    nodeReg.registrationID = getRegistrationID();
    DatanodeDescriptor nodeS =
        (DatanodeDescriptor) datanodeMap.get( nodeReg.getStorageID() );
    DatanodeDescriptor nodeN = getDatanodeByName( nodeReg.getName() );

    if( nodeN != null && nodeS != null && nodeN == nodeS ) {
      // The same datanode has been just restarted to serve the same data
      // storage. We do not need to remove old data blocks, the delta will
      // be calculated on the next block report from the datanode.
      NameNode.stateChangeLog.debug(
          "BLOCK* NameSystem.registerDatanode: node restarted." );
      return;
    }

    if( nodeN != null ) {
      // nodeN previously served a different data storage,
      // which is not served by anybody anymore.
      removeDatanode( nodeN );
      // physically remove node from datanodeMap
      wipeDatanode( nodeN );
      // and log removal
      getEditLog().logRemoveDatanode( nodeN );
      nodeN = null;
    }

    // nodeN is not found
    if( nodeS != null ) {
      // nodeS is found.
      // The registering datanode is a replacement node for the existing
      // data storage, which from now on will be served by a new node.
      NameNode.stateChangeLog.debug(
          "BLOCK* NameSystem.registerDatanode: "
          + "node " + nodeS.name
          + " is replaced by " + nodeReg.getName() + "." );
      getEditLog().logRemoveDatanode( nodeS );
      nodeS.name = nodeReg.getName();
      getEditLog().logAddDatanode( nodeS );
      return;
    }

    // This is a new datanode serving a new data storage.
    if( nodeReg.getStorageID().equals("") ) {
      // This data storage has never been registered.
      // It is either empty or was created by a pre-storageID version of DFS.
      nodeReg.storageID = newStorageID();
      NameNode.stateChangeLog.debug(
          "BLOCK* NameSystem.registerDatanode: "
          + "new storageID " + nodeReg.getStorageID() + " assigned." );
    }
    // register new datanode
    DatanodeDescriptor nodeDescr = new DatanodeDescriptor( nodeReg );
    unprotectedAddDatanode( nodeDescr );
    getEditLog().logAddDatanode( nodeDescr );
    return;
  }

  /**
   * Get registrationID for datanodes based on the namespaceID.
   *
   * @see #registerDatanode(DatanodeRegistration)
   * @see FSImage#newNamespaceID()
   * @return registration ID
   */
  public String getRegistrationID() {
    return "NS" + Integer.toString( dir.namespaceID );
  }

  /**
   * Generate a new storage ID.
   *
   * @return unique storage ID
   *
   * Note that collisions are still possible if somebody tries to bring in
   * a data storage from a different cluster.
   */
  private String newStorageID() {
    String newID = null;
    while( newID == null ) {
      newID = "DS" + Integer.toString( r.nextInt() );
      if( datanodeMap.get( newID ) != null )
        newID = null;
    }
    return newID;
  }

  /**
   * The given node has reported in.  This method should:
   * 1) Record the heartbeat, so the datanode isn't timed out
   * 2) Adjust usage stats for future block allocation
   */
  public synchronized void gotHeartbeat(DatanodeID nodeID,
                                        long capacity,
                                        long remaining,
                                        int xceiverCount) throws IOException {
    synchronized (heartbeats) {
      synchronized (datanodeMap) {
        DatanodeDescriptor nodeinfo = getDatanode( nodeID );
        if (nodeinfo == null)
          // We do not accept unregistered guests
          throw new UnregisteredDatanodeException( nodeID );
        removeHeartbeat(nodeinfo);
        nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);
        addHeartbeat(nodeinfo);
      }
    }
  }
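  /**
   * A minimal sketch of the expected datanode-side sequence against the two
   * methods above: register once (which checks the registrationID and, for a
   * brand new storage, assigns a storageID), then heartbeat on a fixed
   * interval so heartbeatCheck() never declares the node dead.  The sketch
   * assumes the caller already holds a populated DatanodeRegistration, that a
   * DatanodeRegistration is acceptable wherever a DatanodeID is expected, and
   * that it talks to this FSNamesystem directly (in the running system this
   * happens over the datanode RPC interface); the class name, the interval
   * and the zero capacity figures are illustrative assumptions.
   */
  class DatanodeRegistrationSketch implements Runnable {
    private final DatanodeRegistration dnReg;          // supplied by the caller
    private final long heartbeatInterval = 3 * 1000L;  // assumed interval, in ms

    DatanodeRegistrationSketch(DatanodeRegistration dnReg) {
      this.dnReg = dnReg;
    }

    public void run() {
      try {
        // First contact: sets dnReg.registrationID and possibly a new storageID.
        registerDatanode(dnReg);
        while (fsRunning) {
          // Real capacity/remaining/xceiver figures would come from local storage.
          gotHeartbeat(dnReg, 0L, 0L, 0);
          Thread.sleep(heartbeatInterval);
        }
      } catch (IOException e) {
        // e.g. UnregisteredDatanodeException: a real datanode would re-register
      } catch (InterruptedException ie) {
        // shutting down
      }
    }
  }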
  /**
   * Periodically calls heartbeatCheck().
   */
  class HeartbeatMonitor implements Runnable {
    /**
     */
    public void run() {
      while (fsRunning) {
        heartbeatCheck();
        try {
          Thread.sleep(heartBeatRecheck);
        } catch (InterruptedException ie) {
        }
      }
    }
  }

  /**
   * Remove a datanode descriptor.
   * @param nodeID datanode ID
   * @author hairong
   */
  synchronized public void removeDatanode( DatanodeID nodeID )
  throws IOException {
    DatanodeDescriptor nodeInfo = getDatanode( nodeID );
    if (nodeInfo != null) {
      removeDatanode( nodeInfo );
    } else {
      // nodeInfo is null here, so log the requested ID instead.
      NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
          + nodeID.getName() + " does not exist");
    }
  }

  /**
   * Remove a datanode descriptor.
   * @param nodeInfo datanode descriptor
   * @author hairong
   */
  private void removeDatanode( DatanodeDescriptor nodeInfo ) {
    removeHeartbeat(nodeInfo);
    Block deadblocks[] = nodeInfo.getBlocks();
    if( deadblocks != null )
      for( int i = 0; i < deadblocks.length; i++ )
        removeStoredBlock(deadblocks[i], nodeInfo);
    unprotectedRemoveDatanode(nodeInfo);
  }

  void unprotectedRemoveDatanode( DatanodeDescriptor nodeDescr ) {
    // datanodeMap.remove(nodeDescr.getStorageID());
    // deaddatanodeMap.put(nodeDescr.getName(), nodeDescr);
    nodeDescr.resetBlocks();
    NameNode.stateChangeLog.debug(
        "BLOCK* NameSystem.unprotectedRemoveDatanode: "
        + nodeDescr.getName() + " is out of service now.");
  }

  void unprotectedAddDatanode( DatanodeDescriptor nodeDescr ) {
    datanodeMap.put( nodeDescr.getStorageID(), nodeDescr );
    NameNode.stateChangeLog.debug(
        "BLOCK* NameSystem.unprotectedAddDatanode: "
        + "node " + nodeDescr.getName() + " is added to datanodeMap." );
  }

  private void addHeartbeat( DatanodeDescriptor nodeDescr ) {
    heartbeats.add(nodeDescr);
    totalCapacity += nodeDescr.capacity;
    totalRemaining += nodeDescr.remaining;
  }

  private void removeHeartbeat( DatanodeDescriptor nodeDescr ) {
    totalCapacity -= nodeDescr.getCapacity();
    totalRemaining -= nodeDescr.getRemaining();
    heartbeats.remove(nodeDescr);
  }

  /**
   * Physically remove node from datanodeMap.
   *
   * @param nodeID node
   */
  void wipeDatanode( DatanodeID nodeID ) {
    datanodeMap.remove(nodeID.getStorageID());
    NameNode.stateChangeLog.debug(
        "BLOCK* NameSystem.wipeDatanode: "
        + nodeID.getName() + " storage " + nodeID.getStorageID()
        + " is removed from datanodeMap.");
  }

  private FSEditLog getEditLog() {
    return dir.fsImage.getEditLog();
  }

  /**
   * Check if there are any expired heartbeats, and if so,
   * whether any blocks have to be re-replicated.
   */
  synchronized void heartbeatCheck() {
    synchronized (heartbeats) {
      DatanodeDescriptor nodeInfo = null;

      while ((heartbeats.size() > 0) &&
             ((nodeInfo = (DatanodeDescriptor) heartbeats.first()) != null) &&
             (nodeInfo.isDead())) {
        NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
            + "lost heartbeat from " + nodeInfo.getName());
        removeDatanode( nodeInfo );
      }
    }
  }

  /**
   * The given node is reporting all its blocks.  Use this info to
   * update the (machine-->blocklist) and (block-->machinelist) tables.
   */
  public synchronized Block[] processReport(DatanodeID nodeID,
                                            Block newReport[]
                                            ) throws IOException {
    NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
        + "from " + nodeID.getName() + " " + newReport.length + " blocks" );
    DatanodeDescriptor node = getDatanode( nodeID );

    //
    // Modify the (block-->datanode) map, according to the difference
    // between the old and new block report.
    //
    int oldPos = 0, newPos = 0;
    Block oldReport[] = node.getBlocks();