⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 distributedcache.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.filecache;import org.apache.commons.logging.*;import java.io.*;import java.util.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.*;import org.apache.hadoop.fs.*;import java.security.MessageDigest;import java.security.NoSuchAlgorithmException;import java.net.URI;/******************************************************************************* * The DistributedCache maintains all the caching information of cached archives * and unarchives all the files as well and returns the path *  * @author Mahadev Konar ******************************************************************************/public class DistributedCache {  // cacheID to cacheStatus mapping  private static TreeMap cachedArchives = new TreeMap();  // buffer size for reading checksum files  private static final int CRC_BUFFER_SIZE = 64 * 1024;    /**   *    * @param cache the cache to be localized, this should be specified as    * new URI(dfs://hostname:port/absoulte_path_to_file). If no schema    * or hostname:port is provided the file is assumed to be in the filesystem   * being used in the Configuration   * @param conf The Confguration file which contains the filesystem   * @param baseDir The base cache Dir where you wnat to localize the files/archives   * @param isArchive if the cache is an archive or a file. In case it is an archive   *  with a .zip or .jar extension it will be unzipped/unjarred automatically    *  and the directory where the archive is unjarred is returned as the Path.   *  In case of a file, the path to the file is returned   * @param md5 this is a mere checksum to verufy if you are using the right cache.    * You need to pass the md5 of the crc file in DFS. This is matched against the one   * calculated in this api and if it does not match, the cache is not localized.   * @return the path to directory where the archives are unjarred in case of archives,   * the path to the file where the file is copied locally    * @throws IOException   */  public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,      boolean isArchive, String md5) throws IOException {    String cacheId = makeRelative(cache, conf);    CacheStatus lcacheStatus;    Path localizedPath;    synchronized (cachedArchives) {      if (!cachedArchives.containsKey(cacheId)) {        // was never localized        lcacheStatus = new CacheStatus();        lcacheStatus.currentStatus = false;        lcacheStatus.refcount = 1;        lcacheStatus.localLoadPath = new Path(baseDir, new Path(cacheId));        cachedArchives.put(cacheId, lcacheStatus);      } else {        lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);        synchronized (lcacheStatus) {          lcacheStatus.refcount++;        }      }    }    synchronized (lcacheStatus) {      localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5);    }    // try deleting stuff if you can    long size = FileUtil.getDU(new File(baseDir.toString()));    // setting the cache size to a default of 1MB    long allowedSize = conf.getLong("local.cache.size", 1048576L);    if (allowedSize < size) {      // try some cache deletions      deleteCache(conf);    }    return localizedPath;  }    /**   * This is the opposite of getlocalcache. When you are done with   * using the cache, you need to release the cache   * @param cache The cache URI to be released   * @param conf configuration which contains the filesystem the cache    * is contained in.   * @throws IOException   */  public static void releaseCache(URI cache, Configuration conf)      throws IOException {    String cacheId = makeRelative(cache, conf);    synchronized (cachedArchives) {      CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);      synchronized (lcacheStatus) {        lcacheStatus.refcount--;      }    }  }    // To delete the caches which have a refcount of zero    private static void deleteCache(Configuration conf) throws IOException {    // try deleting cache Status with refcount of zero    synchronized (cachedArchives) {      for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {        String cacheId = (String) it.next();        CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);        if (lcacheStatus.refcount == 0) {          // delete this cache entry          FileSystem.getNamed("local", conf).delete(lcacheStatus.localLoadPath);          cachedArchives.remove(cacheId);        }      }    }  }  /*   * Returns the relative path of the dir this cache will be localized in   * relative path that this cache will be localized in. For   * dfs://hostname:port/absolute_path -- the relative path is   * hostname/absolute path -- if it is just /absolute_path -- then the   * relative path is hostname of DFS this mapred cluster is running   * on/absolute_path   */  private static String makeRelative(URI cache, Configuration conf)      throws IOException {    String fsname = cache.getScheme();    String path;    FileSystem dfs = FileSystem.get(conf);    if ("dfs".equals(fsname)) {      path = cache.getHost() + cache.getPath();    } else {      String[] split = dfs.getName().split(":");      path = split[0] + cache.getPath();    }    return path;  }  private static Path cacheFilePath(Path p) {    return new Path(p, p.getName());  }  // the methoed which actually copies the caches locally and unjars/unzips them  private static Path localizeCache(URI cache, CacheStatus cacheStatus,      Configuration conf, boolean isArchive, String md5) throws IOException {    boolean b = true;    FileSystem dfs = getFileSystem(cache, conf);    b = ifExistsAndFresh(cacheStatus, cache, dfs, md5, conf);    if (b) {      if (isArchive)        return cacheStatus.localLoadPath;      else        return cacheFilePath(cacheStatus.localLoadPath);    } else {      // remove the old archive      // if the old archive cannot be removed since it is being used by another      // job      // return null      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()            + " is in use and cannot be refreshed");      byte[] checkSum = createMD5(cache, conf);      FileSystem localFs = FileSystem.getNamed("local", conf);      localFs.delete(cacheStatus.localLoadPath);      Path parchive = new Path(cacheStatus.localLoadPath,                               new Path(cacheStatus.localLoadPath.getName()));      localFs.mkdirs(cacheStatus.localLoadPath);      String cacheId = cache.getPath();      dfs.copyToLocalFile(new Path(cacheId), parchive);      dfs.copyToLocalFile(new Path(cacheId + "_md5"), new Path(parchive          .toString()          + "_md5"));      if (isArchive) {        String tmpArchive = parchive.toString().toLowerCase();        if (tmpArchive.endsWith(".jar")) {          RunJar.unJar(new File(parchive.toString()), new File(parchive              .getParent().toString()));        } else if (tmpArchive.endsWith(".zip")) {          FileUtil.unZip(new File(parchive.toString()), new File(parchive              .getParent().toString()));        }        // else will not do anyhting        // and copy the file into the dir as it is      }      cacheStatus.currentStatus = true;      cacheStatus.md5 = checkSum;    }    if (isArchive)      return cacheStatus.localLoadPath;    else      return cacheFilePath(cacheStatus.localLoadPath);  }  // Checks if the cache has already been localized and is fresh  private static boolean ifExistsAndFresh(CacheStatus lcacheStatus, URI cache,      FileSystem dfs, String confMD5, Configuration conf) throws IOException {    // compute the md5 of the crc    byte[] digest = null;    byte[] fsDigest = createMD5(cache, conf);    byte[] confDigest = StringUtils.hexStringToByte(confMD5);    // check for existence of the cache    if (lcacheStatus.currentStatus == false) {      return false;    } else {      digest = lcacheStatus.md5;      if (!MessageDigest.isEqual(confDigest, fsDigest)) {        throw new IOException("Inconsistencty in data caching, "            + "Cache archives have been changed");      } else {        if (!MessageDigest.isEqual(confDigest, digest)) {          // needs refreshing          return false;        } else {          // does not need any refreshing          return true;        }      }    }  }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -