distributedcache.java
  /**
   * Returns the md5 of the checksum file for a given dfs file.
   * This method also creates a file named filename_md5, whose existence
   * signifies that a new cache has been loaded into dfs. So if you want to
   * refresh the cache, you need to delete this md5 file as well.
   * @param cache The cache to get the md5 checksum for
   * @param conf configuration
   * @return md5 of the crc of the cache parameter
   * @throws IOException if the checksum file or the md5 file cannot be
   *         read or written
   */
  public static byte[] createMD5(URI cache, Configuration conf)
      throws IOException {
    byte[] b = new byte[CRC_BUFFER_SIZE];
    byte[] digest = null;
    FileSystem fileSystem = getFileSystem(cache, conf);
    String filename = cache.getPath();
    Path filePath = new Path(filename);
    Path md5File = new Path(filePath.getParent().toString() + Path.SEPARATOR
        + filePath.getName() + "_md5");
    MessageDigest md5 = null;
    try {
      md5 = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException na) {
      // Every Java platform must support MD5, so this should not happen.
      // Fail loudly instead of continuing with a null digest.
      throw new IOException("MD5 algorithm not available: " + na.getMessage());
    }
    if (!fileSystem.exists(md5File)) {
      // First load: compute the md5 of the .crc checksum file and store it.
      FSInputStream fsStream = fileSystem.openRaw(FileSystem
          .getChecksumFile(filePath));
      try {
        int read = fsStream.read(b);
        while (read != -1) {
          md5.update(b, 0, read);
          read = fsStream.read(b);
        }
      } finally {
        fsStream.close();
      }
      digest = md5.digest();
      FSDataOutputStream out = fileSystem.create(md5File);
      try {
        out.write(digest);
      } finally {
        out.close();
      }
    } else {
      // The md5 file already exists: read the stored digest back.
      FSInputStream fsStream = fileSystem.openRaw(md5File);
      digest = new byte[md5.getDigestLength()];
      try {
        // A single read() may return fewer bytes than requested, so keep
        // reading until the whole digest has been filled.
        int off = 0;
        while (off < digest.length) {
          int read = fsStream.read(digest, off, digest.length - off);
          if (read == -1) {
            throw new IOException("Truncated md5 file: " + md5File);
          }
          off += read;
        }
      } finally {
        fsStream.close();
      }
    }
    return digest;
  }

  private static String getFileSysName(URI url) {
    String fsname = url.getScheme();
    if ("dfs".equals(fsname)) {
      String host = url.getHost();
      int port = url.getPort();
      return (port == (-1)) ? host : (host + ":" + port);
    } else {
      return null;
    }
  }

  private static FileSystem getFileSystem(URI cache, Configuration conf)
      throws IOException {
    String fileSysName = getFileSysName(cache);
    if (fileSysName != null)
      return FileSystem.getNamed(fileSysName, conf);
    else
      return FileSystem.get(conf);
  }

  /**
   * Set the configuration with the given set of archives
   * @param archives The list of archives that need to be localized
   * @param conf Configuration which will be changed
   */
  public static void setCacheArchives(URI[] archives, Configuration conf) {
    String sarchives = StringUtils.uriToString(archives);
    conf.set("mapred.cache.archives", sarchives);
  }

  /**
   * Set the configuration with the given set of files
   * @param files The list of files that need to be localized
   * @param conf Configuration which will be changed
   */
  public static void setCacheFiles(URI[] files, Configuration conf) {
    String sfiles = StringUtils.uriToString(files);
    conf.set("mapred.cache.files", sfiles);
  }

  /**
   * Get cache archives set in the Configuration
   * @param conf The configuration which contains the archives
   * @return A URI array of the caches set in the Configuration
   * @throws IOException
   */
  public static URI[] getCacheArchives(Configuration conf) throws IOException {
    return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives"));
  }

  /**
   * Get cache files set in the Configuration
   * @param conf The configuration which contains the files
   * @return A URI array of the files set in the Configuration
   * @throws IOException
   */
  public static URI[] getCacheFiles(Configuration conf) throws IOException {
    return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
  }

  /**
   * Return the path array of the localized caches
   * @param conf Configuration that contains the localized archives
   * @return A path array of localized caches
   * @throws IOException
   */
  public static Path[] getLocalCacheArchives(Configuration conf)
      throws IOException {
    return StringUtils.stringToPath(conf
        .getStrings("mapred.cache.localArchives"));
  }

  /**
   * Return the path array of the localized files
   * @param conf Configuration that contains the localized files
   * @return A path array of localized files
   * @throws IOException
   */
  public static Path[] getLocalCacheFiles(Configuration conf)
      throws IOException {
    return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles"));
  }

  /**
   * Get the md5 checksums of the archives
   * @param conf The configuration which stored the md5's
   * @return a string array of md5 checksums
   * @throws IOException
   */
  public static String[] getArchiveMd5(Configuration conf) throws IOException {
    return conf.getStrings("mapred.cache.archivemd5");
  }

  /**
   * Get the md5 checksums of the files
   * @param conf The configuration which stored the md5's
   * @return a string array of md5 checksums
   * @throws IOException
   */
  public static String[] getFileMd5(Configuration conf) throws IOException {
    return conf.getStrings("mapred.cache.filemd5");
  }

  /**
   * Store the md5 checksums used to verify the archives to be localized
   * @param conf Configuration which stores the md5's
   * @param md5 comma separated list of md5 checksums of the .crc's of archives.
   * The order should be the same as the order in which the archives are added
   */
  public static void setArchiveMd5(Configuration conf, String md5) {
    conf.set("mapred.cache.archivemd5", md5);
  }

  /**
   * Store the md5 checksums used to verify the files to be localized
   * @param conf Configuration which stores the md5's
   * @param md5 comma separated list of md5 checksums of the .crc's of files.
   * The order should be the same as the order in which the files are added
   */
  public static void setFileMd5(Configuration conf, String md5) {
    conf.set("mapred.cache.filemd5", md5);
  }

  /**
   * Set the conf to contain the location for localized archives
   * @param conf The conf to modify to contain the localized caches
   * @param str a comma separated list of local archives
   */
  public static void setLocalArchives(Configuration conf, String str) {
    conf.set("mapred.cache.localArchives", str);
  }

  /**
   * Set the conf to contain the location for localized files
   * @param conf The conf to modify to contain the localized caches
   * @param str a comma separated list of local files
   */
  public static void setLocalFiles(Configuration conf, String str) {
    conf.set("mapred.cache.localFiles", str);
  }

  /**
   * Add an archive to be localized to the conf
   * @param uri The uri of the cache to be localized
   * @param conf Configuration to add the cache to
   */
  public static void addCacheArchive(URI uri, Configuration conf) {
    String archives = conf.get("mapred.cache.archives");
    conf.set("mapred.cache.archives", archives == null ? uri.toString()
        : archives + "," + uri.toString());
  }

  /**
   * Add a file to be localized to the conf
   * @param uri The uri of the cache to be localized
   * @param conf Configuration to add the cache to
   */
  public static void addCacheFile(URI uri, Configuration conf) {
    String files = conf.get("mapred.cache.files");
    conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
        + uri.toString());
  }

  private static class CacheStatus {
    // true if the cache has been loaded, false if it is not loaded yet
    public boolean currentStatus;
    // the local load path of this cache
    public Path localLoadPath;
    // number of instances using this cache
    public int refcount;
    // The md5 checksum of the crc file of this cache
    public byte[] md5;
  }
}
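The configuration helpers above store each list as a single comma-separated property: `addCacheFile` appends to `mapred.cache.files`, `getCacheFiles` splits it back into URIs, and `getFileSysName` turns a `dfs://host:port/...` URI into the `host:port` name passed to `FileSystem.getNamed`. The sketch below reproduces that logic without Hadoop, under the assumption that a plain `Map<String, String>` stands in for `Configuration`. The class `CacheConfSketch`, the host `namenode:9000`, and the file paths are hypothetical, and the splitting in `getCacheFiles` only approximates `Configuration.getStrings` plus `StringUtils.stringToURI`.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, Hadoop-free sketch: a Map stands in for Configuration. */
public class CacheConfSketch {

  // Appends a URI to the comma-separated "mapred.cache.files" value,
  // using the same null check as addCacheFile.
  public static void addCacheFile(URI uri, Map<String, String> conf) {
    String files = conf.get("mapred.cache.files");
    conf.put("mapred.cache.files",
        files == null ? uri.toString() : files + "," + uri.toString());
  }

  // Splits the value back into URIs. Returning null when the key is unset
  // is an assumption of this sketch.
  public static URI[] getCacheFiles(Map<String, String> conf) {
    String files = conf.get("mapred.cache.files");
    if (files == null) {
      return null;
    }
    String[] parts = files.split(",");
    URI[] uris = new URI[parts.length];
    for (int i = 0; i < parts.length; i++) {
      uris[i] = URI.create(parts[i].trim());
    }
    return uris;
  }

  // Same logic as getFileSysName: only "dfs" URIs select a named file
  // system ("host" or "host:port"); anything else yields null, which makes
  // getFileSystem fall back to the default file system.
  public static String getFileSysName(URI url) {
    if ("dfs".equals(url.getScheme())) {
      String host = url.getHost();
      int port = url.getPort();
      return (port == -1) ? host : host + ":" + port;
    }
    return null;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<String, String>();
    addCacheFile(URI.create("dfs://namenode:9000/data/lookup.txt"), conf);
    addCacheFile(URI.create("dfs://namenode:9000/data/stopwords.txt"), conf);
    // prints dfs://namenode:9000/data/lookup.txt,dfs://namenode:9000/data/stopwords.txt
    System.out.println(conf.get("mapred.cache.files"));
    // prints 2
    System.out.println(getCacheFiles(conf).length);
    // prints namenode:9000
    System.out.println(getFileSysName(URI.create("dfs://namenode:9000/data/lookup.txt")));
  }
}
```

Because entries are joined with a bare comma, a URI that itself contains a comma would be split incorrectly on the way back; the original helpers share that limitation.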

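`createMD5` follows a compute-or-reuse pattern: if the `<name>_md5` marker file is absent it digests the `.crc` checksum file in fixed-size chunks and writes the 16-byte digest out; otherwise it reads the stored digest back. The sketch below shows the same pattern on the local file system with only the JDK (Java 7+ assumed). Local files stand in for dfs, the `.lookup.txt.crc` name merely imitates the checksum-file naming, `BUFFER_SIZE = 4096` is an assumed value (the real `CRC_BUFFER_SIZE` is defined outside this page), and the class `Md5MarkerSketch` is hypothetical.

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/** Hypothetical local-file sketch of createMD5's compute-or-reuse logic. */
public class Md5MarkerSketch {

  // Assumed chunk size; the real CRC_BUFFER_SIZE is not shown here.
  static final int BUFFER_SIZE = 4096;

  public static byte[] createMD5(File dataFile, File checksumFile)
      throws IOException {
    MessageDigest md5;
    try {
      md5 = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new IOException("MD5 not available", e);
    }
    // Marker file "<name>_md5" next to the data file, as in createMD5.
    File md5File = new File(dataFile.getParentFile(), dataFile.getName() + "_md5");
    if (!md5File.exists()) {
      // First call: digest the checksum file chunk by chunk, then persist it.
      byte[] buf = new byte[BUFFER_SIZE];
      try (InputStream in = new FileInputStream(checksumFile)) {
        int read;
        while ((read = in.read(buf)) != -1) {
          md5.update(buf, 0, read);
        }
      }
      byte[] digest = md5.digest();
      try (OutputStream out = new FileOutputStream(md5File)) {
        out.write(digest);
      }
      return digest;
    }
    // Later calls: reuse the stored digest. readFully loops until all
    // 16 bytes are read and throws EOFException if the file is short.
    byte[] digest = new byte[md5.getDigestLength()];
    try (DataInputStream in = new DataInputStream(new FileInputStream(md5File))) {
      in.readFully(digest);
    }
    return digest;
  }

  public static void main(String[] args) throws IOException {
    File dir = Files.createTempDirectory("cache").toFile();
    File data = new File(dir, "lookup.txt");
    File crc = new File(dir, ".lookup.txt.crc");
    Files.write(crc.toPath(), "pretend checksum bytes".getBytes(StandardCharsets.UTF_8));
    byte[] first = createMD5(data, crc);   // computes and writes lookup.txt_md5
    byte[] second = createMD5(data, crc);  // read back from lookup.txt_md5
    // prints "16 true"
    System.out.println(first.length + " " + Arrays.equals(first, second));
  }
}
```

As the Javadoc of `createMD5` warns, the stored digest is returned even if the checksum file later changes; deleting `lookup.txt_md5` is what forces a recomputation.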