📄 fsnamesystem.java
字号:
// old replication > the new one; need to remove copies LOG.info("Reducing replication for file " + src + ". New replication is " + replication ); for( int idx = 0; idx < fileBlocks.length; idx++ ) proccessOverReplicatedBlock( fileBlocks[idx], replication ); } } return true; } public long getBlockSize(String filename) throws IOException { return dir.getBlockSize(filename); } /** * Check whether the replication parameter is within the range * determined by system configuration. */ private void verifyReplication( String src, short replication, UTF8 clientName ) throws IOException { String text = "file " + src + ((clientName != null) ? " on client " + clientName : "") + ".\n" + "Requested replication " + replication; if( replication > maxReplication ) throw new IOException( text + " exceeds maximum " + maxReplication ); if( replication < minReplication ) throw new IOException( text + " is less than the required minimum " + minReplication ); } /** * The client would like to create a new block for the indicated * filename. Return an array that consists of the block, plus a set * of machines. The first on this list should be where the client * writes data. Subsequent items in the list must be provided in * the connection to the first datanode. * @return Return an array that consists of the block, plus a set * of machines * @throws IOException if the filename is invalid * {@link FSDirectory#isValidToCreate(UTF8)}. */ public synchronized Object[] startFile( UTF8 src, UTF8 holder, UTF8 clientMachine, boolean overwrite, short replication, long blockSize ) throws IOException { NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file " +src+" for "+holder+" at "+clientMachine); if( isInSafeMode() ) throw new SafeModeException( "Cannot create file" + src, safeMode ); if (!isValidName(src.toString())) { throw new IOException("Invalid file name: " + src); } try { if (pendingCreates.get(src) != null) { throw new AlreadyBeingCreatedException( "failed to create file " + src + " for " + holder + " on client " + clientMachine + " because pendingCreates is non-null."); } try { verifyReplication(src.toString(), replication, clientMachine ); } catch( IOException e) { throw new IOException( "failed to create "+e.getMessage()); } if (!dir.isValidToCreate(src)) { if (overwrite) { delete(src); } else { throw new IOException("failed to create file " + src +" on client " + clientMachine +" either because the filename is invalid or the file exists"); } } // Get the array of replication targets DatanodeDescriptor targets[] = chooseTargets(replication, null, clientMachine, blockSize); if (targets.length < this.minReplication) { throw new IOException("failed to create file "+src +" on client " + clientMachine +" because target-length is " + targets.length +", below MIN_REPLICATION (" + minReplication+ ")"); } // Reserve space for this pending file pendingCreates.put(src, new FileUnderConstruction(replication, blockSize, holder, clientMachine)); NameNode.stateChangeLog.debug( "DIR* NameSystem.startFile: " +"add "+src+" to pendingCreates for "+holder ); synchronized (leases) { Lease lease = (Lease) leases.get(holder); if (lease == null) { lease = new Lease(holder); leases.put(holder, lease); sortedLeases.add(lease); } else { sortedLeases.remove(lease); lease.renew(); sortedLeases.add(lease); } lease.startedCreate(src); } // Create next block Object results[] = new Object[2]; results[0] = allocateBlock(src); results[1] = targets; return results; } catch (IOException ie) { NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " +ie.getMessage()); throw ie; } } /** * The client would like to obtain an additional block for the indicated * filename (which is being written-to). Return an array that consists * of the block, plus a set of machines. The first on this list should * be where the client writes data. Subsequent items in the list must * be provided in the connection to the first datanode. * * Make sure the previous blocks have been reported by datanodes and * are replicated. Will return an empty 2-elt array if we want the * client to "try again later". */ public synchronized Object[] getAdditionalBlock(UTF8 src, UTF8 clientName ) throws IOException { NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file " +src+" for "+clientName); if( isInSafeMode() ) throw new SafeModeException( "Cannot add block to " + src, safeMode ); FileUnderConstruction pendingFile = (FileUnderConstruction) pendingCreates.get(src); // make sure that we still have the lease on this file if (pendingFile == null) { throw new LeaseExpiredException("No lease on " + src); } if (!pendingFile.getClientName().equals(clientName)) { throw new LeaseExpiredException("Lease mismatch on " + src + " owned by " + pendingFile.getClientName() + " and appended by " + clientName); } if (dir.getFile(src) != null) { throw new IOException("File " + src + " created during write"); } // // If we fail this, bad things happen! // if (!checkFileProgress(src)) { throw new NotReplicatedYetException("Not replicated yet"); } // Get the array of replication targets DatanodeDescriptor targets[] = chooseTargets(pendingFile.getReplication(), null, pendingFile.getClientMachine(), pendingFile.getBlockSize()); if (targets.length < this.minReplication) { throw new IOException("File " + src + " could only be replicated to " + targets.length + " nodes, instead of " + minReplication); } // Create next block return new Object[]{allocateBlock(src), targets}; } /** * The client would like to let go of the given block */ public synchronized boolean abandonBlock(Block b, UTF8 src) { // // Remove the block from the pending creates list // NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " +b.getBlockName()+"of file "+src ); FileUnderConstruction pendingFile = (FileUnderConstruction) pendingCreates.get(src); if (pendingFile != null) { Vector pendingVector = pendingFile.getBlocks(); for (Iterator it = pendingVector.iterator(); it.hasNext(); ) { Block cur = (Block) it.next(); if (cur.compareTo(b) == 0) { pendingCreateBlocks.remove(cur); it.remove(); NameNode.stateChangeLog.debug( "BLOCK* NameSystem.abandonBlock: " +b.getBlockName() +" is removed from pendingCreateBlock and pendingCreates"); return true; } } } return false; } /** * Abandon the entire file in progress */ public synchronized void abandonFileInProgress(UTF8 src, UTF8 holder ) throws IOException { NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src ); synchronized (leases) { // find the lease Lease lease = (Lease) leases.get(holder); if (lease != null) { // remove the file from the lease if (lease.completedCreate(src)) { // if we found the file in the lease, remove it from pendingCreates internalReleaseCreate(src, holder); } else { LOG.info("Attempt by " + holder.toString() + " to release someone else's create lock on " + src.toString()); } } else { LOG.info("Attempt to release a lock from an unknown lease holder " + holder.toString() + " for " + src.toString()); } } } /** * Finalize the created file and make it world-accessible. The * FSNamesystem will already know the blocks that make up the file. * Before we return, we make sure that all the file's blocks have * been reported by datanodes and are replicated correctly. */ public synchronized int completeFile( UTF8 src, UTF8 holder) throws IOException { NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder ); if( isInSafeMode() ) throw new SafeModeException( "Cannot complete file " + src, safeMode ); if (dir.getFile(src) != null || pendingCreates.get(src) == null) { NameNode.stateChangeLog.warn( "DIR* NameSystem.completeFile: " + "failed to complete " + src + " because dir.getFile()==" + dir.getFile(src) + " and " + pendingCreates.get(src)); return OPERATION_FAILED; } else if (! checkFileProgress(src)) { return STILL_WAITING; } FileUnderConstruction pendingFile = (FileUnderConstruction) pendingCreates.get(src); Vector blocks = pendingFile.getBlocks(); int nrBlocks = blocks.size(); Block pendingBlocks[] = (Block[]) blocks.toArray(new Block[nrBlocks]); // // We have the pending blocks, but they won't have // length info in them (as they were allocated before // data-write took place). So we need to add the correct // length info to each // // REMIND - mjc - this is very inefficient! We should // improve this! // for (int i = 0; i < nrBlocks; i++) { Block b = (Block)pendingBlocks[i]; TreeSet containingNodes = (TreeSet) blocksMap.get(b); DatanodeDescriptor node = (DatanodeDescriptor) containingNodes.first(); for (Iterator it = node.getBlockIterator(); it.hasNext(); ) { Block cur = (Block) it.next(); if (b.getBlockId() == cur.getBlockId()) { b.setNumBytes(cur.getNumBytes()); break; } } } // // Now we can add the (name,blocks) tuple to the filesystem // if ( ! dir.addFile(src, pendingBlocks, pendingFile.getReplication())) { return OPERATION_FAILED; } // The file is no longer pending pendingCreates.remove(src); NameNode.stateChangeLog.debug( "DIR* NameSystem.completeFile: " + src + " is removed from pendingCreates"); for (int i = 0; i < nrBlocks; i++) { pendingCreateBlocks.remove(pendingBlocks[i]); } synchronized (leases) { Lease lease = (Lease) leases.get(holder); if (lease != null) { lease.completedCreate(src); if (! lease.hasLocks()) { leases.remove(holder); sortedLeases.remove(lease); } } } // // REMIND - mjc - this should be done only after we wait a few secs. // The namenode isn't giving datanodes enough time to report the // replicated blocks that are automatically done as part of a client // write. // // Now that the file is real, we need to be sure to replicate // the blocks. for (int i = 0; i < nrBlocks; i++) { TreeSet containingNodes = (TreeSet) blocksMap.get(pendingBlocks[i]); if (containingNodes.size() < pendingFile.getReplication()) { NameNode.stateChangeLog.debug( "DIR* NameSystem.completeFile:" + pendingBlocks[i].getBlockName()+" has only "+containingNodes.size() +" replicas so is added to neededReplications"); synchronized (neededReplications) { neededReplications.add(pendingBlocks[i]); } } } return COMPLETE_SUCCESS; } static Random randBlockId = new Random(); /** * Allocate a block at the given pending filename */ synchronized Block allocateBlock(UTF8 src) { Block b = null; do {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -