fsdirectory.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 760 行 · 第 1/2 页

JAVA
760
字号
                while (in.available() > 0) {                    byte opcode = in.readByte();                    numEdits++;                    switch (opcode) {                    case OP_ADD: {                        UTF8 name = new UTF8();                        name.readFields(in);                        ArrayWritable aw = new ArrayWritable(Block.class);                        aw.readFields(in);                        Writable writables[] = (Writable[]) aw.get();                        Block blocks[] = new Block[writables.length];                        System.arraycopy(writables, 0, blocks, 0, blocks.length);                        unprotectedAddFile(name, blocks);                        break;                    }                     case OP_RENAME: {                        UTF8 src = new UTF8();                        UTF8 dst = new UTF8();                        src.readFields(in);                        dst.readFields(in);                        unprotectedRenameTo(src, dst);                        break;                    }                    case OP_DELETE: {                        UTF8 src = new UTF8();                        src.readFields(in);                        unprotectedDelete(src);                        break;                    }                    case OP_MKDIR: {                        UTF8 src = new UTF8();                        src.readFields(in);                        unprotectedMkdir(src.toString());                        break;                    }                    default: {                        throw new IOException("Never seen opcode " + opcode);                    }                    }                }            } finally {                in.close();            }        }        return numEdits;    }    /**     * Save the contents of the FS image     */    void saveFSImage(File fullimage, File edits) throws IOException {        File curFile = new File(fullimage, FS_IMAGE);        File newFile = new File(fullimage, NEW_FS_IMAGE);        File oldFile = new File(fullimage, OLD_FS_IMAGE);        //        // Write out data        //        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(newFile)));        try {            out.writeInt(rootDir.numItemsInTree() - 1);            rootDir.saveImage("", out);        } finally {            out.close();        }        //        // Atomic move sequence        //        // 1.  Move cur to old        curFile.renameTo(oldFile);                // 2.  Move new to cur        newFile.renameTo(curFile);        // 3.  Remove pending-edits file (it's been integrated with newFile)        edits.delete();                // 4.  Delete old        oldFile.delete();    }    /**     * Write an operation to the edit log     */    void logEdit(byte op, Writable w1, Writable w2) {        synchronized (editlog) {            try {                editlog.write(op);                if (w1 != null) {                    w1.write(editlog);                }                if (w2 != null) {                    w2.write(editlog);                }            } catch (IOException ie) {            }        }    }    /**     * Add the given filename to the fs.     */    public boolean addFile(UTF8 src, Block blocks[]) {        waitForReady();        // Always do an implicit mkdirs for parent directory tree        mkdirs(DFSFile.getDFSParent(src.toString()));        if (unprotectedAddFile(src, blocks)) {            logEdit(OP_ADD, src, new ArrayWritable(Block.class, blocks));            return true;        } else {            return false;        }    }        /**     */    boolean unprotectedAddFile(UTF8 name, Block blocks[]) {        synchronized (rootDir) {            if (blocks != null) {                // Add file->block mapping                for (int i = 0; i < blocks.length; i++) {                    activeBlocks.add(blocks[i]);                }            }            return (rootDir.addNode(name.toString(), blocks) != null);        }    }    /**     * Change the filename     */    public boolean renameTo(UTF8 src, UTF8 dst) {        waitForReady();        if (unprotectedRenameTo(src, dst)) {            logEdit(OP_RENAME, src, dst);            return true;        } else {            return false;        }    }    /**     */    boolean unprotectedRenameTo(UTF8 src, UTF8 dst) {        synchronized(rootDir) {            INode removedNode = rootDir.getNode(src.toString());            if (removedNode == null) {                return false;            }            removedNode.removeNode();            if (isDir(dst)) {                dst = new UTF8(dst.toString() + "/" + new File(src.toString()).getName());            }            INode newNode = rootDir.addNode(dst.toString(), removedNode.blocks);            if (newNode != null) {                newNode.children = removedNode.children;                for (Iterator it = newNode.children.values().iterator(); it.hasNext(); ) {                    INode child = (INode) it.next();                    child.parent = newNode;                }                return true;            } else {                rootDir.addNode(src.toString(), removedNode.blocks);                return false;            }        }    }    /**     * Remove the file from management, return blocks     */    public Block[] delete(UTF8 src) {        waitForReady();        logEdit(OP_DELETE, src, null);        return unprotectedDelete(src);    }    /**     */    Block[] unprotectedDelete(UTF8 src) {        synchronized (rootDir) {            INode targetNode = rootDir.getNode(src.toString());            if (targetNode == null) {                return null;            } else {                //                // Remove the node from the namespace and GC all                // the blocks underneath the node.                //                if (! targetNode.removeNode()) {                    return null;                } else {                    Vector v = new Vector();                    targetNode.collectSubtreeBlocks(v);                    for (Iterator it = v.iterator(); it.hasNext(); ) {                        Block b = (Block) it.next();                        activeBlocks.remove(b);                    }                    return (Block[]) v.toArray(new Block[v.size()]);                }            }        }    }    /**     */    public int obtainLock(UTF8 src, UTF8 holder, boolean exclusive) {        TreeSet holders = (TreeSet) activeLocks.get(src);        if (holders == null) {            holders = new TreeSet();            activeLocks.put(src, holders);        }        if (exclusive && holders.size() > 0) {            return STILL_WAITING;        } else {            holders.add(holder);            return COMPLETE_SUCCESS;        }    }    /**     */    public int releaseLock(UTF8 src, UTF8 holder) {        TreeSet holders = (TreeSet) activeLocks.get(src);        if (holders != null && holders.contains(holder)) {            holders.remove(holder);            if (holders.size() == 0) {                activeLocks.remove(src);            }            return COMPLETE_SUCCESS;        } else {            return OPERATION_FAILED;        }    }    /**     * Get a listing of files given path 'src'     *     * This function is admittedly very inefficient right now.  We'll     * make it better later.     */    public DFSFileInfo[] getListing(UTF8 src) {        String srcs = normalizePath(src);        synchronized (rootDir) {            INode targetNode = rootDir.getNode(srcs);            if (targetNode == null) {                return null;            } else {                Vector contents = new Vector();                targetNode.listContents(contents);                DFSFileInfo listing[] = new DFSFileInfo[contents.size()];                int i = 0;                for (Iterator it = contents.iterator(); it.hasNext(); i++) {                    listing[i] = new DFSFileInfo( (INode) it.next() );                }                return listing;            }        }    }    /**     * Get the blocks associated with the file     */    public Block[] getFile(UTF8 src) {        waitForReady();        synchronized (rootDir) {            INode targetNode = rootDir.getNode(src.toString());            if (targetNode == null) {                return null;            } else {                return targetNode.blocks;            }        }    }    /**      * Check whether the filepath could be created     */    public boolean isValidToCreate(UTF8 src) {        String srcs = normalizePath(src);        synchronized (rootDir) {            if (srcs.startsWith("/") &&                 ! srcs.endsWith("/") &&                 rootDir.getNode(srcs) == null) {                return true;            } else {                return false;            }        }    }    /**     * Check whether the path specifies a directory     */    public boolean isDir(UTF8 src) {        synchronized (rootDir) {            INode node = rootDir.getNode(normalizePath(src));            return node != null && node.isDir();        }    }    /**     * Create the given directory and all its parent dirs.     */    public boolean mkdirs(UTF8 src) {        return mkdirs(src.toString());    }    /**     * Create directory entries for every item     */    boolean mkdirs(String src) {        src = normalizePath(new UTF8(src));        // Use this to collect all the dirs we need to construct        Vector v = new Vector();        // The dir itself        v.add(src);        // All its parents        String parent = DFSFile.getDFSParent(src);        while (parent != null) {            v.add(parent);            parent = DFSFile.getDFSParent(parent);        }        // Now go backwards through list of dirs, creating along        // the way        boolean lastSuccess = false;        int numElts = v.size();        for (int i = numElts - 1; i >= 0; i--) {            String cur = (String) v.elementAt(i);            INode inserted = unprotectedMkdir(cur);            if (inserted != null) {                logEdit(OP_MKDIR, new UTF8(inserted.computeName()), null);                lastSuccess = true;            } else {                lastSuccess = false;            }        }        return lastSuccess;    }    /**     */    INode unprotectedMkdir(String src) {        synchronized (rootDir) {            return rootDir.addNode(src, null);        }    }    /**     */    String normalizePath(UTF8 src) {        String srcs = src.toString();        if (srcs.length() > 1 && srcs.endsWith("/")) {            srcs = srcs.substring(0, srcs.length() - 1);        }        return srcs;    }    /**     * Returns whether the given block is one pointed-to by a file.     */    public boolean isValidBlock(Block b) {        synchronized (rootDir) {            if (activeBlocks.contains(b)) {                return true;            } else {                return false;            }        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?