fsdirectory.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 760 行 · 第 1/2 页
JAVA
760 行
while (in.available() > 0) { byte opcode = in.readByte(); numEdits++; switch (opcode) { case OP_ADD: { UTF8 name = new UTF8(); name.readFields(in); ArrayWritable aw = new ArrayWritable(Block.class); aw.readFields(in); Writable writables[] = (Writable[]) aw.get(); Block blocks[] = new Block[writables.length]; System.arraycopy(writables, 0, blocks, 0, blocks.length); unprotectedAddFile(name, blocks); break; } case OP_RENAME: { UTF8 src = new UTF8(); UTF8 dst = new UTF8(); src.readFields(in); dst.readFields(in); unprotectedRenameTo(src, dst); break; } case OP_DELETE: { UTF8 src = new UTF8(); src.readFields(in); unprotectedDelete(src); break; } case OP_MKDIR: { UTF8 src = new UTF8(); src.readFields(in); unprotectedMkdir(src.toString()); break; } default: { throw new IOException("Never seen opcode " + opcode); } } } } finally { in.close(); } } return numEdits; } /** * Save the contents of the FS image */ void saveFSImage(File fullimage, File edits) throws IOException { File curFile = new File(fullimage, FS_IMAGE); File newFile = new File(fullimage, NEW_FS_IMAGE); File oldFile = new File(fullimage, OLD_FS_IMAGE); // // Write out data // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(newFile))); try { out.writeInt(rootDir.numItemsInTree() - 1); rootDir.saveImage("", out); } finally { out.close(); } // // Atomic move sequence // // 1. Move cur to old curFile.renameTo(oldFile); // 2. Move new to cur newFile.renameTo(curFile); // 3. Remove pending-edits file (it's been integrated with newFile) edits.delete(); // 4. Delete old oldFile.delete(); } /** * Write an operation to the edit log */ void logEdit(byte op, Writable w1, Writable w2) { synchronized (editlog) { try { editlog.write(op); if (w1 != null) { w1.write(editlog); } if (w2 != null) { w2.write(editlog); } } catch (IOException ie) { } } } /** * Add the given filename to the fs. */ public boolean addFile(UTF8 src, Block blocks[]) { waitForReady(); // Always do an implicit mkdirs for parent directory tree mkdirs(DFSFile.getDFSParent(src.toString())); if (unprotectedAddFile(src, blocks)) { logEdit(OP_ADD, src, new ArrayWritable(Block.class, blocks)); return true; } else { return false; } } /** */ boolean unprotectedAddFile(UTF8 name, Block blocks[]) { synchronized (rootDir) { if (blocks != null) { // Add file->block mapping for (int i = 0; i < blocks.length; i++) { activeBlocks.add(blocks[i]); } } return (rootDir.addNode(name.toString(), blocks) != null); } } /** * Change the filename */ public boolean renameTo(UTF8 src, UTF8 dst) { waitForReady(); if (unprotectedRenameTo(src, dst)) { logEdit(OP_RENAME, src, dst); return true; } else { return false; } } /** */ boolean unprotectedRenameTo(UTF8 src, UTF8 dst) { synchronized(rootDir) { INode removedNode = rootDir.getNode(src.toString()); if (removedNode == null) { return false; } removedNode.removeNode(); if (isDir(dst)) { dst = new UTF8(dst.toString() + "/" + new File(src.toString()).getName()); } INode newNode = rootDir.addNode(dst.toString(), removedNode.blocks); if (newNode != null) { newNode.children = removedNode.children; for (Iterator it = newNode.children.values().iterator(); it.hasNext(); ) { INode child = (INode) it.next(); child.parent = newNode; } return true; } else { rootDir.addNode(src.toString(), removedNode.blocks); return false; } } } /** * Remove the file from management, return blocks */ public Block[] delete(UTF8 src) { waitForReady(); logEdit(OP_DELETE, src, null); return unprotectedDelete(src); } /** */ Block[] unprotectedDelete(UTF8 src) { synchronized (rootDir) { INode targetNode = rootDir.getNode(src.toString()); if (targetNode == null) { return null; } else { // // Remove the node from the namespace and GC all // the blocks underneath the node. // if (! targetNode.removeNode()) { return null; } else { Vector v = new Vector(); targetNode.collectSubtreeBlocks(v); for (Iterator it = v.iterator(); it.hasNext(); ) { Block b = (Block) it.next(); activeBlocks.remove(b); } return (Block[]) v.toArray(new Block[v.size()]); } } } } /** */ public int obtainLock(UTF8 src, UTF8 holder, boolean exclusive) { TreeSet holders = (TreeSet) activeLocks.get(src); if (holders == null) { holders = new TreeSet(); activeLocks.put(src, holders); } if (exclusive && holders.size() > 0) { return STILL_WAITING; } else { holders.add(holder); return COMPLETE_SUCCESS; } } /** */ public int releaseLock(UTF8 src, UTF8 holder) { TreeSet holders = (TreeSet) activeLocks.get(src); if (holders != null && holders.contains(holder)) { holders.remove(holder); if (holders.size() == 0) { activeLocks.remove(src); } return COMPLETE_SUCCESS; } else { return OPERATION_FAILED; } } /** * Get a listing of files given path 'src' * * This function is admittedly very inefficient right now. We'll * make it better later. */ public DFSFileInfo[] getListing(UTF8 src) { String srcs = normalizePath(src); synchronized (rootDir) { INode targetNode = rootDir.getNode(srcs); if (targetNode == null) { return null; } else { Vector contents = new Vector(); targetNode.listContents(contents); DFSFileInfo listing[] = new DFSFileInfo[contents.size()]; int i = 0; for (Iterator it = contents.iterator(); it.hasNext(); i++) { listing[i] = new DFSFileInfo( (INode) it.next() ); } return listing; } } } /** * Get the blocks associated with the file */ public Block[] getFile(UTF8 src) { waitForReady(); synchronized (rootDir) { INode targetNode = rootDir.getNode(src.toString()); if (targetNode == null) { return null; } else { return targetNode.blocks; } } } /** * Check whether the filepath could be created */ public boolean isValidToCreate(UTF8 src) { String srcs = normalizePath(src); synchronized (rootDir) { if (srcs.startsWith("/") && ! srcs.endsWith("/") && rootDir.getNode(srcs) == null) { return true; } else { return false; } } } /** * Check whether the path specifies a directory */ public boolean isDir(UTF8 src) { synchronized (rootDir) { INode node = rootDir.getNode(normalizePath(src)); return node != null && node.isDir(); } } /** * Create the given directory and all its parent dirs. */ public boolean mkdirs(UTF8 src) { return mkdirs(src.toString()); } /** * Create directory entries for every item */ boolean mkdirs(String src) { src = normalizePath(new UTF8(src)); // Use this to collect all the dirs we need to construct Vector v = new Vector(); // The dir itself v.add(src); // All its parents String parent = DFSFile.getDFSParent(src); while (parent != null) { v.add(parent); parent = DFSFile.getDFSParent(parent); } // Now go backwards through list of dirs, creating along // the way boolean lastSuccess = false; int numElts = v.size(); for (int i = numElts - 1; i >= 0; i--) { String cur = (String) v.elementAt(i); INode inserted = unprotectedMkdir(cur); if (inserted != null) { logEdit(OP_MKDIR, new UTF8(inserted.computeName()), null); lastSuccess = true; } else { lastSuccess = false; } } return lastSuccess; } /** */ INode unprotectedMkdir(String src) { synchronized (rootDir) { return rootDir.addNode(src, null); } } /** */ String normalizePath(UTF8 src) { String srcs = src.toString(); if (srcs.length() > 1 && srcs.endsWith("/")) { srcs = srcs.substring(0, srcs.length() - 1); } return srcs; } /** * Returns whether the given block is one pointed-to by a file. */ public boolean isValidBlock(Block b) { synchronized (rootDir) { if (activeBlocks.contains(b)) { return true; } else { return false; } } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?