📄 distributedcache.java
字号:
/* Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.filecache;import org.apache.commons.logging.*;import java.io.*;import java.util.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.*;import org.apache.hadoop.fs.*;import java.security.MessageDigest;import java.security.NoSuchAlgorithmException;import java.net.URI;/******************************************************************************* * The DistributedCache maintains all the caching information of cached archives * and unarchives all the files as well and returns the path * * @author Mahadev Konar ******************************************************************************/public class DistributedCache { // cacheID to cacheStatus mapping private static TreeMap cachedArchives = new TreeMap(); // buffer size for reading checksum files private static final int CRC_BUFFER_SIZE = 64 * 1024; /** * * @param cache the cache to be localized, this should be specified as * new URI(dfs://hostname:port/absoulte_path_to_file). If no schema * or hostname:port is provided the file is assumed to be in the filesystem * being used in the Configuration * @param conf The Confguration file which contains the filesystem * @param baseDir The base cache Dir where you wnat to localize the files/archives * @param isArchive if the cache is an archive or a file. In case it is an archive * with a .zip or .jar extension it will be unzipped/unjarred automatically * and the directory where the archive is unjarred is returned as the Path. * In case of a file, the path to the file is returned * @param md5 this is a mere checksum to verufy if you are using the right cache. * You need to pass the md5 of the crc file in DFS. This is matched against the one * calculated in this api and if it does not match, the cache is not localized. * @return the path to directory where the archives are unjarred in case of archives, * the path to the file where the file is copied locally * @throws IOException */ public static Path getLocalCache(URI cache, Configuration conf, Path baseDir, boolean isArchive, String md5) throws IOException { String cacheId = makeRelative(cache, conf); CacheStatus lcacheStatus; Path localizedPath; synchronized (cachedArchives) { if (!cachedArchives.containsKey(cacheId)) { // was never localized lcacheStatus = new CacheStatus(); lcacheStatus.currentStatus = false; lcacheStatus.refcount = 1; lcacheStatus.localLoadPath = new Path(baseDir, new Path(cacheId)); cachedArchives.put(cacheId, lcacheStatus); } else { lcacheStatus = (CacheStatus) cachedArchives.get(cacheId); synchronized (lcacheStatus) { lcacheStatus.refcount++; } } } synchronized (lcacheStatus) { localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5); } // try deleting stuff if you can long size = FileUtil.getDU(new File(baseDir.toString())); // setting the cache size to a default of 1MB long allowedSize = conf.getLong("local.cache.size", 1048576L); if (allowedSize < size) { // try some cache deletions deleteCache(conf); } return localizedPath; } /** * This is the opposite of getlocalcache. When you are done with * using the cache, you need to release the cache * @param cache The cache URI to be released * @param conf configuration which contains the filesystem the cache * is contained in. * @throws IOException */ public static void releaseCache(URI cache, Configuration conf) throws IOException { String cacheId = makeRelative(cache, conf); synchronized (cachedArchives) { CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId); synchronized (lcacheStatus) { lcacheStatus.refcount--; } } } // To delete the caches which have a refcount of zero private static void deleteCache(Configuration conf) throws IOException { // try deleting cache Status with refcount of zero synchronized (cachedArchives) { for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) { String cacheId = (String) it.next(); CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId); if (lcacheStatus.refcount == 0) { // delete this cache entry FileSystem.getNamed("local", conf).delete(lcacheStatus.localLoadPath); cachedArchives.remove(cacheId); } } } } /* * Returns the relative path of the dir this cache will be localized in * relative path that this cache will be localized in. For * dfs://hostname:port/absolute_path -- the relative path is * hostname/absolute path -- if it is just /absolute_path -- then the * relative path is hostname of DFS this mapred cluster is running * on/absolute_path */ private static String makeRelative(URI cache, Configuration conf) throws IOException { String fsname = cache.getScheme(); String path; FileSystem dfs = FileSystem.get(conf); if ("dfs".equals(fsname)) { path = cache.getHost() + cache.getPath(); } else { String[] split = dfs.getName().split(":"); path = split[0] + cache.getPath(); } return path; } private static Path cacheFilePath(Path p) { return new Path(p, p.getName()); } // the methoed which actually copies the caches locally and unjars/unzips them private static Path localizeCache(URI cache, CacheStatus cacheStatus, Configuration conf, boolean isArchive, String md5) throws IOException { boolean b = true; FileSystem dfs = getFileSystem(cache, conf); b = ifExistsAndFresh(cacheStatus, cache, dfs, md5, conf); if (b) { if (isArchive) return cacheStatus.localLoadPath; else return cacheFilePath(cacheStatus.localLoadPath); } else { // remove the old archive // if the old archive cannot be removed since it is being used by another // job // return null if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true)) throw new IOException("Cache " + cacheStatus.localLoadPath.toString() + " is in use and cannot be refreshed"); byte[] checkSum = createMD5(cache, conf); FileSystem localFs = FileSystem.getNamed("local", conf); localFs.delete(cacheStatus.localLoadPath); Path parchive = new Path(cacheStatus.localLoadPath, new Path(cacheStatus.localLoadPath.getName())); localFs.mkdirs(cacheStatus.localLoadPath); String cacheId = cache.getPath(); dfs.copyToLocalFile(new Path(cacheId), parchive); dfs.copyToLocalFile(new Path(cacheId + "_md5"), new Path(parchive .toString() + "_md5")); if (isArchive) { String tmpArchive = parchive.toString().toLowerCase(); if (tmpArchive.endsWith(".jar")) { RunJar.unJar(new File(parchive.toString()), new File(parchive .getParent().toString())); } else if (tmpArchive.endsWith(".zip")) { FileUtil.unZip(new File(parchive.toString()), new File(parchive .getParent().toString())); } // else will not do anyhting // and copy the file into the dir as it is } cacheStatus.currentStatus = true; cacheStatus.md5 = checkSum; } if (isArchive) return cacheStatus.localLoadPath; else return cacheFilePath(cacheStatus.localLoadPath); } // Checks if the cache has already been localized and is fresh private static boolean ifExistsAndFresh(CacheStatus lcacheStatus, URI cache, FileSystem dfs, String confMD5, Configuration conf) throws IOException { // compute the md5 of the crc byte[] digest = null; byte[] fsDigest = createMD5(cache, conf); byte[] confDigest = StringUtils.hexStringToByte(confMD5); // check for existence of the cache if (lcacheStatus.currentStatus == false) { return false; } else { digest = lcacheStatus.md5; if (!MessageDigest.isEqual(confDigest, fsDigest)) { throw new IOException("Inconsistencty in data caching, " + "Cache archives have been changed"); } else { if (!MessageDigest.isEqual(confDigest, digest)) { // needs refreshing return false; } else { // does not need any refreshing return true; } } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -