fsnamesystem.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 1,340 行 · 第 1/4 页

JAVA
1,340
字号
    }

    /**
     * Abandon the entire file in progress.
     *
     * @param src name of the file whose pending creation is abandoned;
     *            the work is delegated to internalReleaseCreate(src)
     * @throws IOException declared by this method's signature
     */
    public synchronized void abandonFileInProgress(UTF8 src) throws IOException {
        internalReleaseCreate(src);
    }

    /**
     * Finalize the created file and make it world-accessible.  The
     * FSNamesystem will already know the blocks that make up the file.
     * Before we return, we make sure that all the file's blocks have
     * been reported by datanodes and are replicated correctly.
     *
     * @param src    name of the pending file to finalize
     * @param holder key under which the creating client's lease is kept
     *               in the leases table
     * @return OPERATION_FAILED if src is already present in the directory,
     *         has no pending create, or could not be added to the
     *         directory; STILL_WAITING if checkFileProgress(src) is false;
     *         COMPLETE_SUCCESS once the file has been added
     */
    public synchronized int completeFile(UTF8 src, UTF8 holder) {
        Block existingBlocks[] = dir.getFile(src);
        Vector pendingVector = (Vector) pendingCreates.get(src);

        if (existingBlocks != null || pendingVector == null) {
            LOG.info("Failed to complete " + src + "  because dir.getFile()==" + existingBlocks + " and " + pendingVector);
            return OPERATION_FAILED;
        }
        if (! checkFileProgress(src)) {
            return STILL_WAITING;
        }

        Block pendingBlocks[] = (Block[]) pendingVector.toArray(new Block[pendingVector.size()]);

        //
        // We have the pending blocks, but they won't have
        // length info in them (as they were allocated before
        // data-write took place).  So we need to add the correct
        // length info to each
        //
        // REMIND - mjc - this is very inefficient!  We should
        // improve this!
        //
        // checkFileProgress(src) above returned true, so it found a
        // non-null blocksMap entry for every pending block.
        //
        for (int i = 0; i < pendingBlocks.length; i++) {
            Block b = pendingBlocks[i];
            TreeSet containingNodes = (TreeSet) blocksMap.get(b);
            DatanodeInfo node = (DatanodeInfo) containingNodes.first();

            // Scan that node's block list for the entry with the same id
            // and copy its byte count onto the pending block.
            for (Iterator it = node.getBlockIterator(); it.hasNext(); ) {
                Block cur = (Block) it.next();
                if (b.getBlockId() == cur.getBlockId()) {
                    b.setNumBytes(cur.getNumBytes());
                    break;
                }
            }
        }

        //
        // Now we can add the (name,blocks) tuple to the filesystem
        //
        if (! dir.addFile(src, pendingBlocks)) {
            // Reported through LOG like every other message in this method.
            LOG.info("AddFile() for " + src + " failed");
            LOG.info("Dropped through on file add....");
            return OPERATION_FAILED;
        }

        // The file is no longer pending
        pendingCreates.remove(src);
        for (int i = 0; i < pendingBlocks.length; i++) {
            pendingCreateBlocks.remove(pendingBlocks[i]);
        }

        // Record the finished create on the holder's lease, and drop the
        // lease altogether once it no longer covers any lock or create.
        synchronized (leases) {
            Lease lease = (Lease) leases.get(holder);
            if (lease != null) {
                lease.completedCreate(src);
                if (! lease.hasLocks()) {
                    leases.remove(holder);
                    sortedLeases.remove(lease);
                }
            }
        }

        //
        // REMIND - mjc - this should be done only after we wait a few secs.
        // The namenode isn't giving datanodes enough time to report the
        // replicated blocks that are automatically done as part of a client
        // write.
        //
        // Now that the file is real, we need to be sure to replicate
        // the blocks.
        for (int i = 0; i < pendingBlocks.length; i++) {
            TreeSet containingNodes = (TreeSet) blocksMap.get(pendingBlocks[i]);
            if (containingNodes.size() < this.desiredReplication) {
                synchronized (neededReplications) {
                    LOG.info("Completed file " + src + ", at holder " + holder + ".  There is/are only " + containingNodes.size() + " copies of block " + pendingBlocks[i] + ", so replicating up to " + this.desiredReplication);
                    neededReplications.add(pendingBlocks[i]);
                }
            }
        }
        return COMPLETE_SUCCESS;
    }

    /**
     * Allocate a block at the given pending filename.
     *
     * The new block is appended to the pending-create vector of src and
     * added to pendingCreateBlocks.
     *
     * @param src name of a file that must already have an entry in
     *            pendingCreates; otherwise a NullPointerException is thrown
     * @return the newly created block
     */
    synchronized Block allocateBlock(UTF8 src) {
        Block b = new Block();
        Vector v = (Vector) pendingCreates.get(src);
        v.add(b);
        pendingCreateBlocks.add(b);
        return b;
    }

    /**
     * Check that the indicated file's blocks are present and
     * replicated.  If not, return false.
     *
     * @param src name of the pending file to check
     * @return false if src has no pending create, or if any of its pending
     *         blocks has no entry in blocksMap or fewer than minReplication
     *         containing nodes; true otherwise
     */
    synchronized boolean checkFileProgress(UTF8 src) {
        Vector v = (Vector) pendingCreates.get(src);
        if (v == null) {
            // Nothing is pending under this name, so there are no blocks
            // that could be present and replicated.
            return false;
        }
        for (Iterator it = v.iterator(); it.hasNext(); ) {
            Block b = (Block) it.next();
            TreeSet containingNodes = (TreeSet) blocksMap.get(b);
            if (containingNodes == null || containingNodes.size() < this.minReplication) {
                return false;
            }
        }
        return true;
    }

    ////////////////////////////////////////////////////////////////
    // Here's how to handle block-copy failure during client write:
    // -- As usual, the client's write should result in a streaming
    // backup write to a k-machine sequence.
// -- If one of the backup machines fails, no worries.  Fail silently.    // -- Before client is allowed to close and finalize file, make sure    // that the blocks are backed up.  Namenode may have to issue specific backup    // commands to make up for earlier datanode failures.  Once all copies    // are made, edit namespace and return to client.    ////////////////////////////////////////////////////////////////    /**     * Change the indicated filename.     */    public boolean renameTo(UTF8 src, UTF8 dst) {        return dir.renameTo(src, dst);    }    /**     * Remove the indicated filename from the namespace.  This may     * invalidate some blocks that make up the file.     */    public synchronized boolean delete(UTF8 src) {        Block deletedBlocks[] = (Block[]) dir.delete(src);        if (deletedBlocks != null) {            for (int i = 0; i < deletedBlocks.length; i++) {                Block b = deletedBlocks[i];                TreeSet containingNodes = (TreeSet) blocksMap.get(b);                if (containingNodes != null) {                    for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {                        DatanodeInfo node = (DatanodeInfo) it.next();                        Vector invalidateSet = (Vector) recentInvalidateSets.get(node.getName());                        if (invalidateSet == null) {                            invalidateSet = new Vector();                            recentInvalidateSets.put(node.getName(), invalidateSet);                        }                        invalidateSet.add(b);                    }                }            }        }        return (deletedBlocks != null);    }    /**     * Return whether the given filename exists     */    public boolean exists(UTF8 src) {        if (dir.getFile(src) != null || dir.isDir(src)) {            return true;        } else {            return false;        }    }    /**     * Whether the given name is a directory     */    public boolean isDir(UTF8 src) 
{        return dir.isDir(src);    }    /**     * Create all the necessary directories     */    public boolean mkdirs(UTF8 src) {        return dir.mkdirs(src);    }    /**     * Figure out a few hosts that are likely to contain the     * block(s) referred to by the given (filename, start, len) tuple.     */    public UTF8[][] getDatanodeHints(UTF8 src, long start, long len) {        if (start < 0 || len < 0) {            return new UTF8[0][];        }        int startBlock = -1;        int endBlock = -1;        Block blocks[] = dir.getFile(src);        if (blocks == null) {                     // no blocks            return new UTF8[0][];        }        //        // First, figure out where the range falls in        // the blocklist.        //        long startpos = start;        long endpos = start + len;        for (int i = 0; i < blocks.length; i++) {            if (startpos >= 0) {                startpos -= blocks[i].getNumBytes();                if (startpos <= 0) {                    startBlock = i;                }            }            if (endpos >= 0) {                endpos -= blocks[i].getNumBytes();                if (endpos <= 0) {                    endBlock = i;                    break;                }            }        }        //        // Next, create an array of hosts where each block can        // be found        //        if (startBlock < 0 || endBlock < 0) {            return new UTF8[0][];        } else {            UTF8 hosts[][] = new UTF8[(endBlock - startBlock) + 1][];            for (int i = startBlock; i <= endBlock; i++) {                TreeSet containingNodes = (TreeSet) blocksMap.get(blocks[i]);                Vector v = new Vector();                for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {                    DatanodeInfo cur = (DatanodeInfo) it.next();                    v.add(cur.getHost());                }                hosts[i-startBlock] = (UTF8[]) v.toArray(new UTF8[v.size()]);            }    
        return hosts;        }    }    /************************************************************     * A Lease governs all the locks held by a single client.     * For each client there's a corresponding lease, whose     * timestamp is updated when the client periodically     * checks in.  If the client dies and allows its lease to     * expire, all the corresponding locks can be released.     *************************************************************/    class Lease implements Comparable {        public UTF8 holder;        public long lastUpdate;        TreeSet locks = new TreeSet();        TreeSet creates = new TreeSet();        public Lease(UTF8 holder) {            this.holder = holder;            renew();        }        public void renew() {            this.lastUpdate = System.currentTimeMillis();        }        public boolean expired() {            if (System.currentTimeMillis() - lastUpdate > LEASE_PERIOD) {                return true;            } else {                return false;            }        }        public void obtained(UTF8 src) {            locks.add(src);        }        public void released(UTF8 src) {            locks.remove(src);        }        public void startedCreate(UTF8 src) {            creates.add(src);        }        public void completedCreate(UTF8 src) {            creates.remove(src);        }        public boolean hasLocks() {            return (locks.size() + creates.size()) > 0;        }        public void releaseLocks() {            for (Iterator it = locks.iterator(); it.hasNext(); ) {                UTF8 src = (UTF8) it.next();                internalReleaseLock(src, holder);            }            locks.clear();            for (Iterator it = creates.iterator(); it.hasNext(); ) {                UTF8 src = (UTF8) it.next();                internalReleaseCreate(src);            }            creates.clear();        }        /**         */        public String toString() {            return "[Lease.  
Holder: " + holder.toString() + ", heldlocks: " + locks.size() + ", pendingcreates: " + creates.size() + "]";        }        /**         */        public int compareTo(Object o) {            Lease l1 = (Lease) this;            Lease l2 = (Lease) o;            long lu1 = l1.lastUpdate;            long lu2 = l2.lastUpdate;            if (lu1 < lu2) {                return -1;            } else if (lu1 > lu2) {                return 1;            } else {                return l1.holder.compareTo(l2.holder);            }        }    }    /******************************************************     * LeaseMonitor checks for leases that have expired,     * and disposes of them.

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?