⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fsnamesystem.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
          // old replication > the new one; need to remove copies          LOG.info("Reducing replication for file " + src                     + ". New replication is " + replication );          for( int idx = 0; idx < fileBlocks.length; idx++ )            proccessOverReplicatedBlock( fileBlocks[idx], replication );        }      }      return true;    }        public long getBlockSize(String filename) throws IOException {      return dir.getBlockSize(filename);    }        /**     * Check whether the replication parameter is within the range     * determined by system configuration.     */    private void verifyReplication( String src,                                     short replication,                                     UTF8 clientName                                   ) throws IOException {      String text = "file " + src               + ((clientName != null) ? " on client " + clientName : "")              + ".\n"              + "Requested replication " + replication;      if( replication > maxReplication )        throw new IOException( text + " exceeds maximum " + maxReplication );            if( replication < minReplication )        throw new IOException(              text + " is less than the required minimum " + minReplication );    }        /**     * The client would like to create a new block for the indicated     * filename.  Return an array that consists of the block, plus a set      * of machines.  The first on this list should be where the client      * writes data.  Subsequent items in the list must be provided in     * the connection to the first datanode.     * @return Return an array that consists of the block, plus a set     * of machines     * @throws IOException if the filename is invalid     *         {@link FSDirectory#isValidToCreate(UTF8)}.     */    public synchronized Object[] startFile( UTF8 src,                                             UTF8 holder,                                             UTF8 clientMachine,                                             boolean overwrite,                                            short replication,                                            long blockSize                                          ) throws IOException {      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file "            +src+" for "+holder+" at "+clientMachine);      if( isInSafeMode() )        throw new SafeModeException( "Cannot create file" + src, safeMode );      if (!isValidName(src.toString())) {        throw new IOException("Invalid file name: " + src);      	        }      try {        if (pendingCreates.get(src) != null) {           throw new AlreadyBeingCreatedException(                   "failed to create file " + src + " for " + holder +                   " on client " + clientMachine +                    " because pendingCreates is non-null.");        }        try {           verifyReplication(src.toString(), replication, clientMachine );        } catch( IOException e) {            throw new IOException( "failed to create "+e.getMessage());        }        if (!dir.isValidToCreate(src)) {          if (overwrite) {            delete(src);          } else {            throw new IOException("failed to create file " + src                     +" on client " + clientMachine                    +" either because the filename is invalid or the file exists");          }        }        // Get the array of replication targets         DatanodeDescriptor targets[] = chooseTargets(replication, null,                                                clientMachine, blockSize);        if (targets.length < this.minReplication) {            throw new IOException("failed to create file "+src                    +" on client " + clientMachine                    +" because target-length is " + targets.length                     +", below MIN_REPLICATION (" + minReplication+ ")");       }        // Reserve space for this pending file        pendingCreates.put(src,                            new FileUnderConstruction(replication,                                                      blockSize,                                                     holder,                                                     clientMachine));        NameNode.stateChangeLog.debug( "DIR* NameSystem.startFile: "                   +"add "+src+" to pendingCreates for "+holder );        synchronized (leases) {            Lease lease = (Lease) leases.get(holder);            if (lease == null) {                lease = new Lease(holder);                leases.put(holder, lease);                sortedLeases.add(lease);            } else {                sortedLeases.remove(lease);                lease.renew();                sortedLeases.add(lease);            }            lease.startedCreate(src);        }        // Create next block        Object results[] = new Object[2];        results[0] = allocateBlock(src);        results[1] = targets;        return results;      } catch (IOException ie) {          NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "                  +ie.getMessage());        throw ie;      }    }    /**     * The client would like to obtain an additional block for the indicated     * filename (which is being written-to).  Return an array that consists     * of the block, plus a set of machines.  The first on this list should     * be where the client writes data.  Subsequent items in the list must     * be provided in the connection to the first datanode.     *     * Make sure the previous blocks have been reported by datanodes and     * are replicated.  Will return an empty 2-elt array if we want the     * client to "try again later".     */    public synchronized Object[] getAdditionalBlock(UTF8 src,                                                     UTF8 clientName                                                    ) throws IOException {        NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "            +src+" for "+clientName);        if( isInSafeMode() )          throw new SafeModeException( "Cannot add block to " + src, safeMode );        FileUnderConstruction pendingFile =           (FileUnderConstruction) pendingCreates.get(src);        // make sure that we still have the lease on this file        if (pendingFile == null) {          throw new LeaseExpiredException("No lease on " + src);        }        if (!pendingFile.getClientName().equals(clientName)) {          throw new LeaseExpiredException("Lease mismatch on " + src +               " owned by " + pendingFile.getClientName() +               " and appended by " + clientName);        }        if (dir.getFile(src) != null) {          throw new IOException("File " + src + " created during write");        }        //        // If we fail this, bad things happen!        //        if (!checkFileProgress(src)) {          throw new NotReplicatedYetException("Not replicated yet");        }                // Get the array of replication targets         DatanodeDescriptor targets[] = chooseTargets(pendingFile.getReplication(),             null, pendingFile.getClientMachine(), pendingFile.getBlockSize());        if (targets.length < this.minReplication) {          throw new IOException("File " + src + " could only be replicated to " +                                targets.length + " nodes, instead of " +                                minReplication);        }                // Create next block        return new Object[]{allocateBlock(src), targets};    }    /**     * The client would like to let go of the given block     */    public synchronized boolean abandonBlock(Block b, UTF8 src) {        //        // Remove the block from the pending creates list        //        NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "                +b.getBlockName()+"of file "+src );        FileUnderConstruction pendingFile =           (FileUnderConstruction) pendingCreates.get(src);        if (pendingFile != null) {            Vector pendingVector = pendingFile.getBlocks();            for (Iterator it = pendingVector.iterator(); it.hasNext(); ) {                Block cur = (Block) it.next();                if (cur.compareTo(b) == 0) {                    pendingCreateBlocks.remove(cur);                    it.remove();                    NameNode.stateChangeLog.debug(                             "BLOCK* NameSystem.abandonBlock: "                            +b.getBlockName()                            +" is removed from pendingCreateBlock and pendingCreates");                    return true;                }            }        }        return false;    }    /**     * Abandon the entire file in progress     */    public synchronized void abandonFileInProgress(UTF8 src,                                                    UTF8 holder                                                   ) throws IOException {      NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src );      synchronized (leases) {        // find the lease        Lease lease = (Lease) leases.get(holder);        if (lease != null) {          // remove the file from the lease          if (lease.completedCreate(src)) {            // if we found the file in the lease, remove it from pendingCreates            internalReleaseCreate(src, holder);          } else {            LOG.info("Attempt by " + holder.toString() +                 " to release someone else's create lock on " +                 src.toString());          }        } else {          LOG.info("Attempt to release a lock from an unknown lease holder "              + holder.toString() + " for " + src.toString());        }      }    }    /**     * Finalize the created file and make it world-accessible.  The     * FSNamesystem will already know the blocks that make up the file.     * Before we return, we make sure that all the file's blocks have      * been reported by datanodes and are replicated correctly.     */    public synchronized int completeFile( UTF8 src,                                           UTF8 holder) throws IOException {        NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder );        if( isInSafeMode() )          throw new SafeModeException( "Cannot complete file " + src, safeMode );        if (dir.getFile(src) != null || pendingCreates.get(src) == null) {            NameNode.stateChangeLog.warn( "DIR* NameSystem.completeFile: "                    + "failed to complete " + src                    + " because dir.getFile()==" + dir.getFile(src)                     + " and " + pendingCreates.get(src));            return OPERATION_FAILED;        } else if (! checkFileProgress(src)) {            return STILL_WAITING;        }                FileUnderConstruction pendingFile =             (FileUnderConstruction) pendingCreates.get(src);        Vector blocks = pendingFile.getBlocks();        int nrBlocks = blocks.size();        Block pendingBlocks[] = (Block[]) blocks.toArray(new Block[nrBlocks]);        //        // We have the pending blocks, but they won't have        // length info in them (as they were allocated before        // data-write took place).  So we need to add the correct        // length info to each        //        // REMIND - mjc - this is very inefficient!  We should        // improve this!        //        for (int i = 0; i < nrBlocks; i++) {            Block b = (Block)pendingBlocks[i];            TreeSet containingNodes = (TreeSet) blocksMap.get(b);            DatanodeDescriptor node = (DatanodeDescriptor) containingNodes.first();            for (Iterator it = node.getBlockIterator(); it.hasNext(); ) {                Block cur = (Block) it.next();                if (b.getBlockId() == cur.getBlockId()) {                    b.setNumBytes(cur.getNumBytes());                    break;                }            }        }                //        // Now we can add the (name,blocks) tuple to the filesystem        //        if ( ! dir.addFile(src, pendingBlocks, pendingFile.getReplication())) {          return OPERATION_FAILED;        }        // The file is no longer pending        pendingCreates.remove(src);        NameNode.stateChangeLog.debug(             "DIR* NameSystem.completeFile: " + src           + " is removed from pendingCreates");        for (int i = 0; i < nrBlocks; i++) {            pendingCreateBlocks.remove(pendingBlocks[i]);        }        synchronized (leases) {            Lease lease = (Lease) leases.get(holder);            if (lease != null) {                lease.completedCreate(src);                if (! lease.hasLocks()) {                    leases.remove(holder);                    sortedLeases.remove(lease);                }            }        }        //        // REMIND - mjc - this should be done only after we wait a few secs.        // The namenode isn't giving datanodes enough time to report the        // replicated blocks that are automatically done as part of a client        // write.        //        // Now that the file is real, we need to be sure to replicate        // the blocks.        for (int i = 0; i < nrBlocks; i++) {            TreeSet containingNodes = (TreeSet) blocksMap.get(pendingBlocks[i]);            if (containingNodes.size() < pendingFile.getReplication()) {                   NameNode.stateChangeLog.debug(                          "DIR* NameSystem.completeFile:"                        + pendingBlocks[i].getBlockName()+" has only "+containingNodes.size()                        +" replicas so is added to neededReplications");                           synchronized (neededReplications) {                    neededReplications.add(pendingBlocks[i]);                }            }        }        return COMPLETE_SUCCESS;    }    static Random randBlockId = new Random();        /**     * Allocate a block at the given pending filename     */    synchronized Block allocateBlock(UTF8 src) {        Block b = null;        do {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -