📄 distributedcache.java
字号:
/** * Returns md5 of the checksum file for a given dfs file. * This method also creates file filename_md5 existence of which * signifies a new cache has been loaded into dfs. So if you want to * refresh the cache, you need to delete this md5 file as well. * @param cache The cache to get the md5 checksum for * @param conf configuration * @return md5 of the crc of the cache parameter * @throws IOException */ public static byte[] createMD5(URI cache, Configuration conf) throws IOException { byte[] b = new byte[CRC_BUFFER_SIZE]; byte[] digest = null; FileSystem fileSystem = getFileSystem(cache, conf); String filename = cache.getPath(); Path filePath = new Path(filename); Path md5File = new Path(filePath.getParent().toString() + Path.SEPARATOR + filePath.getName() + "_md5"); MessageDigest md5 = null; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException na) { // do nothing } if (!fileSystem.exists(md5File)) { FSInputStream fsStream = fileSystem.openRaw(FileSystem .getChecksumFile(filePath)); int read = fsStream.read(b); while (read != -1) { md5.update(b, 0, read); read = fsStream.read(b); } fsStream.close(); digest = md5.digest(); FSDataOutputStream out = fileSystem.create(md5File); out.write(digest); out.close(); } else { FSInputStream fsStream = fileSystem.openRaw(md5File); digest = new byte[md5.getDigestLength()]; // assuming reading 16 bytes once is not a problem // though it should be checked if 16 bytes have been read or not int read = fsStream.read(digest); fsStream.close(); } return digest; } private static String getFileSysName(URI url) { String fsname = url.getScheme(); if ("dfs".equals(fsname)) { String host = url.getHost(); int port = url.getPort(); return (port == (-1)) ? host : (host + ":" + port); } else { return null; } } private static FileSystem getFileSystem(URI cache, Configuration conf) throws IOException { String fileSysName = getFileSysName(cache); if (fileSysName != null) return FileSystem.getNamed(fileSysName, conf); else return FileSystem.get(conf); } /** * Set the configuration with the given set of archives * @param archives The list of archives that need to be localized * @param conf Configuration which will be changed */ public static void setCacheArchives(URI[] archives, Configuration conf) { String sarchives = StringUtils.uriToString(archives); conf.set("mapred.cache.archives", sarchives); } /** * Set the configuration with the given set of files * @param files The list of files that need to be localized * @param conf Configuration which will be changed */ public static void setCacheFiles(URI[] files, Configuration conf) { String sfiles = StringUtils.uriToString(files); conf.set("mapred.cache.files", sfiles); } /** * Get cache archives set in the Configuration * @param conf The configuration which contains the archives * @return A URI array of the caches set in the Configuration * @throws IOException */ public static URI[] getCacheArchives(Configuration conf) throws IOException { return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives")); } /** * Get cache files set in the Configuration * @param conf The configuration which contains the files * @return A URI array of the files set in the Configuration * @throws IOException */ public static URI[] getCacheFiles(Configuration conf) throws IOException { return StringUtils.stringToURI(conf.getStrings("mapred.cache.files")); } /** * Return the path array of the localized caches * @param conf Configuration that contains the localized archives * @return A path array of localized caches * @throws IOException */ public static Path[] getLocalCacheArchives(Configuration conf) throws IOException { return StringUtils.stringToPath(conf .getStrings("mapred.cache.localArchives")); } /** * Return the path array of the localized files * @param conf Configuration that contains the localized files * @return A path array of localized files * @throws IOException */ public static Path[] getLocalCacheFiles(Configuration conf) throws IOException { return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles")); } /** * Get the md5 checksums of the archives * @param conf The configuration which stored the md5's * @return a string array of md5 checksums * @throws IOException */ public static String[] getArchiveMd5(Configuration conf) throws IOException { return conf.getStrings("mapred.cache.archivemd5"); } /** * Get the md5 checksums of the files * @param conf The configuration which stored the md5's * @return a string array of md5 checksums * @throws IOException */ public static String[] getFileMd5(Configuration conf) throws IOException { return conf.getStrings("mapred.cache.filemd5"); } /** * This is to check the md5 of the archives to be localized * @param conf Configuration which stores the md5's * @param md5 comma seperated list of md5 checksums of the .crc's of archives. * The order should be the same as the order in which the archives are added */ public static void setArchiveMd5(Configuration conf, String md5) { conf.set("mapred.cache.archivemd5", md5); } /** * This is to check the md5 of the files to be localized * @param conf Configuration which stores the md5's * @param md5 comma seperated list of md5 checksums of the .crc's of files. * The order should be the same as the order in which the files are added */ public static void setFileMd5(Configuration conf, String md5) { conf.set("mapred.cache.filemd5", md5); } /** * Set the conf to contain the location for localized archives * @param conf The conf to modify to contain the localized caches * @param str a comma seperated list of local archives */ public static void setLocalArchives(Configuration conf, String str) { conf.set("mapred.cache.localArchives", str); } /** * Set the conf to contain the location for localized files * @param conf The conf to modify to contain the localized caches * @param str a comma seperated list of local files */ public static void setLocalFiles(Configuration conf, String str) { conf.set("mapred.cache.localFiles", str); } /** * Add a archives to be localized to the conf * @param uri The uri of the cache to be localized * @param conf Configuration to add the cache to */ public static void addCacheArchive(URI uri, Configuration conf) { String archives = conf.get("mapred.cache.archives"); conf.set("mapred.cache.archives", archives == null ? uri.toString() : archives + "," + uri.toString()); } /** * Add a file to be localized to the conf * @param uri The uri of the cache to be localized * @param conf Configuration to add the cache to */ public static void addCacheFile(URI uri, Configuration conf) { String files = conf.get("mapred.cache.files"); conf.set("mapred.cache.files", files == null ? uri.toString() : files + "," + uri.toString()); } private static class CacheStatus { // false, not loaded yet, true is loaded public boolean currentStatus; // the local load path of this cache public Path localLoadPath; // number of instances using this cache public int refcount; // The md5 checksum of the crc file of this cache public byte[] md5; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -