⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 copyfiles.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
  }    public static class HTTPCopyFilesMapper extends CopyFilesMapper   implements Mapper   {    private URI srcURI = null;    private FileSystem destFileSys = null;    private Path destPath = null;    private JobConf jobConf = null;    private boolean ignoreReadFailures;        /**     * Initialize HTTPCopyFileMapper specific job.     * @param conf : The dfs/mapred configuration.     * @param jobConf : The handle to the jobConf object to be initialized.     * @param srcPaths : The source URI.     * @param destPath : The destination URI.     * @param ignoreReadFailures : Ignore read failures?     */    public void setup(Configuration conf, JobConf jobConf,         String[] srcPaths, String destPath,         boolean ignoreReadFailures)     throws IOException    {      //Destination      URI destURI = null;      try {        destURI = new URI(destPath);      } catch (URISyntaxException ue) {        throw new IOException("Illegal destination path!");      }      String destFileSysName = getFileSysName(destURI);      jobConf.set("copy.dest.fs", destFileSysName);      destPath = destURI.getPath();      jobConf.set("copy.dest.path", destPath);            //Setup the MR-job configuration      jobConf.setSpeculativeExecution(false);            jobConf.setInputFormat(SequenceFileInputFormat.class);            jobConf.setOutputKeyClass(Text.class);      jobConf.setOutputValueClass(Text.class);      jobConf.setOutputFormat(SequenceFileOutputFormat.class);            jobConf.setMapperClass(HTTPCopyFilesMapper.class);      jobConf.setReducerClass(CopyFilesReducer.class);            // ideal number of maps is one per file (if the map-launching overhead      // were 0. It is limited by jobtrackers handling capacity, which lets say      // is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for       // small files it is better to determine number of maps by       // amount of data per map.      
int nFiles = srcPaths.length;      int numMaps = nFiles;      if (numMaps > MAX_NUM_MAPS) { numMaps = MAX_NUM_MAPS; }      JobClient client = new JobClient(jobConf);      ClusterStatus cluster = client.getClusterStatus();      int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE;      if (numMaps > tmpMaps) { numMaps = tmpMaps; }      if (numMaps == 0) { numMaps = 1; }      jobConf.setNumMapTasks(numMaps);            jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);            FileSystem fileSystem = FileSystem.get(conf);      Random r = new Random();      Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" +           Integer.toString(Math.abs(r.nextInt()), 36));      Path jobInputDir = new Path(jobDirectory, "in");      fileSystem.mkdirs(jobInputDir);      jobConf.setInputPath(jobInputDir);            jobConf.set("distcp.job.dir", jobDirectory.toString());      Path jobOutputDir = new Path(jobDirectory, "out");      jobConf.setOutputPath(jobOutputDir);            for(int i=0; i < srcPaths.length; ++i) {        Path ipFile = new Path(jobInputDir, "part" + i);        SequenceFile.Writer writer =           new SequenceFile.Writer(fileSystem, ipFile, Text.class, Text.class);        writer.append(new Text(srcPaths[i]), new Text(""));        writer.close();      }    }	        public void cleanup(Configuration conf, JobConf jobConf,         String srcPath, String destPath)     throws IOException    {      //Clean up jobDirectory      Path jobDirectory = new Path(jobConf.get("distcp.job.dir", "/"));      FileSystem fs = FileSystem.get(jobConf);            if(!jobDirectory.equals("/")) {        fs.delete(jobDirectory);      }    }        public void configure(JobConf job)    {      //Save jobConf      jobConf = job;            try {        //Destination        destFileSys =           FileSystem.getNamed(job.get("copy.dest.fs", "local"), job);        destPath = new Path(job.get("copy.dest.path", "/"));        
if(!destFileSys.exists(destPath)) {          return;        }      } catch(IOException ioe) {        return;      }            ignoreReadFailures = job.getBoolean(readFailuresAttribute, false);    }        public void map(WritableComparable key,        Writable val,        OutputCollector out,        Reporter reporter) throws IOException         {      //The url of the file      try {        srcURI = new URI(((Text)key).toString());                //Construct the complete destination path        File urlPath = new File(srcURI.getPath());        Path destinationPath = new Path(destPath, urlPath.getName());                //Copy the file         URL url = srcURI.toURL();        HttpURLConnection connection = (HttpURLConnection)url.openConnection();        connection.setRequestMethod("GET");        connection.connect();                int bufferSize = jobConf.getInt("io.file.buffer.size", 4096);        byte[] buffer = new byte[bufferSize];        BufferedInputStream is =           new BufferedInputStream(connection.getInputStream());                FSDataOutputStream os =           new FSDataOutputStream(destFileSys, destinationPath, true,               jobConf,	bufferSize, (short)jobConf.getInt("dfs.replication", 3),               jobConf.getLong("dfs.block.size", 67108864)          );                int readBytes = 0;        while((readBytes = is.read(buffer, 0, bufferSize)) != -1) {          os.write(buffer, 0, readBytes);        }                is.close();        os.close();        connection.disconnect();                reporter.setStatus("Copied: " + srcURI.toString() +             " to: " + destinationPath.toString());              } catch(Exception e) {        reporter.setStatus("Failed to copy from: " + (Text)key);        if(ignoreReadFailures) {          return;        } else {          throw new IOException("Failed to copy from: " + (Text)key);        }      }    }  }    /**   * Factory to create requisite Mapper objects for distcp.   
* @author Arun C Murthy   */  private static class CopyMapperFactory  {    public static CopyFilesMapper getMapper(String protocol)    {      CopyFilesMapper mapper = null;            if("dfs".equals(protocol) || "file".equals(protocol)) {        mapper = new DFSCopyFilesMapper();      } else if("http".equals(protocol)) {        mapper = new HTTPCopyFilesMapper();      }            return mapper;    }  }    public static class CopyFilesReducer extends MapReduceBase implements Reducer {    public void reduce(WritableComparable key,        Iterator values,        OutputCollector output,        Reporter reporter) throws IOException {      // nothing    }  }    private static String[] fetchSrcURIs(Configuration conf, URI srcListURI) throws IOException  {    ArrayList uris = new ArrayList();    BufferedReader fis = null;        String srcListURIScheme = srcListURI.getScheme();    String srcListURIPath = srcListURI.getPath();        if("file".equals(srcListURIScheme)) {      fis = new BufferedReader(new FileReader(srcListURIPath));    } else if("dfs".equals(srcListURIScheme)) {      FileSystem fs = FileSystem.getNamed(CopyFilesMapper.getFileSysName(srcListURI), conf);      fis = new BufferedReader(          new InputStreamReader(new FSDataInputStream(fs, new Path(srcListURIPath), conf))          );    } else if("http".equals(srcListURIScheme)) {      //Copy the file       URL url = srcListURI.toURL();      HttpURLConnection connection = (HttpURLConnection)url.openConnection();      connection.setRequestMethod("GET");      connection.connect();            fis = new BufferedReader(          new InputStreamReader(connection.getInputStream())          );    } else {      throw new IOException("Unsupported source list uri!");    }    String uri = null;    while((uri = fis.readLine()) != null) {      if(!uri.startsWith("#")) {        uris.add(uri);      }    }    fis.close();    if(!uris.isEmpty()) {      return (String[])uris.toArray(new String[0]);    }        return null;  
}    /**   * Helper function to parse input file and return source urls for    * a given protocol.   * @param protocol : The protocol for which to find source urls.   * @param inputFilePath : The file containing the urls.   * @return   */  private static String[] parseInputFile(String protocol, String[] uris)  throws IOException  {    ArrayList protocolURIs = new ArrayList();        for(int i=0; i < uris.length; ++i) {      if(uris[i].startsWith(protocol)) {        protocolURIs.add(uris[i]);      }    }        if(!protocolURIs.isEmpty()) {      return (String[])protocolURIs.toArray(new String[0]);    }        return null;  }    /**   * Driver to copy srcPath to destPath depending on required protocol.   * @param conf : Configuration   * @param srcPath : Source path   * @param destPath : Destination path   */  public static void copy(Configuration conf, String srcPath, String destPath,      boolean srcAsList, boolean ignoreReadFailures)   throws IOException  {    //Job configuration    JobConf jobConf = new JobConf(conf, CopyFiles.class);    jobConf.setJobName("distcp");        //Sanity check for srcPath/destPath    URI srcURI = null;    try {        srcURI = new URI(srcPath);    } catch (URISyntaxException ex) {      throw new IOException("Illegal source path!");    }        URI destURI = null;    try {      destURI = new URI(destPath);    } catch (URISyntaxException ex) {      throw new IOException("Illegal destination path!");    }      //Source paths    String[] srcPaths = null;        if(srcAsList) {      srcPaths = fetchSrcURIs(conf, srcURI);    }        //Create the task-specific mapper     CopyFilesMapper mapper = null;    if(srcAsList) {      //Ugly?!            
// Protocol - 'dfs://'      String[] dfsUrls = parseInputFile("dfs", srcPaths);      if(dfsUrls != null) {        for(int i=0; i < dfsUrls.length; ++i) {          copy(conf, dfsUrls[i], destPath, false, ignoreReadFailures);        }      }            // Protocol - 'file://'      String[] localUrls = parseInputFile("file", srcPaths);      if(localUrls != null) {        for(int i=0; i < localUrls.length; ++i) {          copy(conf, localUrls[i], destPath, false, ignoreReadFailures);        }      }            // Protocol - 'http://'      String[] httpUrls = parseInputFile("http", srcPaths);      if(httpUrls != null) {        srcPaths = httpUrls;        mapper = CopyMapperFactory.getMapper("http");      } else {        //Done        return;      }          } else {      //Single source - ugly!      String[] tmpSrcPath = {srcPath};      srcPaths = tmpSrcPath;      mapper = CopyMapperFactory.getMapper(srcURI.getScheme());    }        //Initialize the mapper    mapper.setup(conf, jobConf, srcPaths, destPath, ignoreReadFailures);        //We are good to go!    try {      JobClient.runJob(jobConf);    } finally {      mapper.cleanup(conf, jobConf, srcPath, destPath);    }      }    /**   * This is the main driver for recursively copying directories   * across file systems. It takes at least two cmdline parameters. A source   * URL and a destination URL. It then essentially does an "ls -lR" on the   * source URL, and writes the output in aa round-robin manner to all the map   * input files. The mapper actually copies the files allotted to it. And   * the reduce is empty.   
*/
  public int run(String[] args) throws Exception {
    String srcPath = null;
    String destPath = null;
    boolean ignoreReadFailures = false;
    boolean srcAsList = false;

    // Collect flags and the two positional arguments (source, destination).
    for (String arg : args) {
      if ("-i".equals(arg)) {
        ignoreReadFailures = true;
      } else if ("-f".equals(arg)) {
        srcAsList = true;
      } else if (srcPath == null) {
        srcPath = arg;
      } else if (destPath == null) {
        destPath = arg;
      } else {
        // A third positional argument is an error.
        System.out.println(usage);
        return -1;
      }
    }

    // Both the source and the destination are mandatory.
    if (srcPath == null || destPath == null) {
      System.out.println(usage);
      return -1;
    }

    try {
      copy(conf, srcPath, destPath, srcAsList, ignoreReadFailures);
    } catch (Exception e) {
      System.out.println("Caught: " + e);
      return -1;
    }

    return 0;
  }

  /** Command-line entry point: delegates to run() via doMain(). */
  public static void main(String[] args) throws Exception {
    int res = new CopyFiles().doMain(
        new JobConf(new Configuration(), CopyFiles.class),
        args);
    System.exit(res);
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -