📄 copyfiles.java
字号:
} public static class HTTPCopyFilesMapper extends CopyFilesMapper implements Mapper { private URI srcURI = null; private FileSystem destFileSys = null; private Path destPath = null; private JobConf jobConf = null; private boolean ignoreReadFailures; /** * Initialize HTTPCopyFileMapper specific job. * @param conf : The dfs/mapred configuration. * @param jobConf : The handle to the jobConf object to be initialized. * @param srcPaths : The source URI. * @param destPath : The destination URI. * @param ignoreReadFailures : Ignore read failures? */ public void setup(Configuration conf, JobConf jobConf, String[] srcPaths, String destPath, boolean ignoreReadFailures) throws IOException { //Destination URI destURI = null; try { destURI = new URI(destPath); } catch (URISyntaxException ue) { throw new IOException("Illegal destination path!"); } String destFileSysName = getFileSysName(destURI); jobConf.set("copy.dest.fs", destFileSysName); destPath = destURI.getPath(); jobConf.set("copy.dest.path", destPath); //Setup the MR-job configuration jobConf.setSpeculativeExecution(false); jobConf.setInputFormat(SequenceFileInputFormat.class); jobConf.setOutputKeyClass(Text.class); jobConf.setOutputValueClass(Text.class); jobConf.setOutputFormat(SequenceFileOutputFormat.class); jobConf.setMapperClass(HTTPCopyFilesMapper.class); jobConf.setReducerClass(CopyFilesReducer.class); // ideal number of maps is one per file (if the map-launching overhead // were 0. It is limited by jobtrackers handling capacity, which lets say // is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for // small files it is better to determine number of maps by // amount of data per map. int nFiles = srcPaths.length; int numMaps = nFiles; if (numMaps > MAX_NUM_MAPS) { numMaps = MAX_NUM_MAPS; } JobClient client = new JobClient(jobConf); ClusterStatus cluster = client.getClusterStatus(); int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE; if (numMaps > tmpMaps) { numMaps = tmpMaps; } if (numMaps == 0) { numMaps = 1; } jobConf.setNumMapTasks(numMaps); jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures); FileSystem fileSystem = FileSystem.get(conf); Random r = new Random(); Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" + Integer.toString(Math.abs(r.nextInt()), 36)); Path jobInputDir = new Path(jobDirectory, "in"); fileSystem.mkdirs(jobInputDir); jobConf.setInputPath(jobInputDir); jobConf.set("distcp.job.dir", jobDirectory.toString()); Path jobOutputDir = new Path(jobDirectory, "out"); jobConf.setOutputPath(jobOutputDir); for(int i=0; i < srcPaths.length; ++i) { Path ipFile = new Path(jobInputDir, "part" + i); SequenceFile.Writer writer = new SequenceFile.Writer(fileSystem, ipFile, Text.class, Text.class); writer.append(new Text(srcPaths[i]), new Text("")); writer.close(); } } public void cleanup(Configuration conf, JobConf jobConf, String srcPath, String destPath) throws IOException { //Clean up jobDirectory Path jobDirectory = new Path(jobConf.get("distcp.job.dir", "/")); FileSystem fs = FileSystem.get(jobConf); if(!jobDirectory.equals("/")) { fs.delete(jobDirectory); } } public void configure(JobConf job) { //Save jobConf jobConf = job; try { //Destination destFileSys = FileSystem.getNamed(job.get("copy.dest.fs", "local"), job); destPath = new Path(job.get("copy.dest.path", "/")); if(!destFileSys.exists(destPath)) { return; } } catch(IOException ioe) { return; } ignoreReadFailures = job.getBoolean(readFailuresAttribute, false); } public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException { //The url of the file try { srcURI = new URI(((Text)key).toString()); //Construct the complete destination path File urlPath = new File(srcURI.getPath()); Path destinationPath = new Path(destPath, urlPath.getName()); //Copy the file URL url = srcURI.toURL(); HttpURLConnection connection = (HttpURLConnection)url.openConnection(); connection.setRequestMethod("GET"); connection.connect(); int bufferSize = jobConf.getInt("io.file.buffer.size", 4096); byte[] buffer = new byte[bufferSize]; BufferedInputStream is = new BufferedInputStream(connection.getInputStream()); FSDataOutputStream os = new FSDataOutputStream(destFileSys, destinationPath, true, jobConf, bufferSize, (short)jobConf.getInt("dfs.replication", 3), jobConf.getLong("dfs.block.size", 67108864) ); int readBytes = 0; while((readBytes = is.read(buffer, 0, bufferSize)) != -1) { os.write(buffer, 0, readBytes); } is.close(); os.close(); connection.disconnect(); reporter.setStatus("Copied: " + srcURI.toString() + " to: " + destinationPath.toString()); } catch(Exception e) { reporter.setStatus("Failed to copy from: " + (Text)key); if(ignoreReadFailures) { return; } else { throw new IOException("Failed to copy from: " + (Text)key); } } } } /** * Factory to create requisite Mapper objects for distcp. * @author Arun C Murthy */ private static class CopyMapperFactory { public static CopyFilesMapper getMapper(String protocol) { CopyFilesMapper mapper = null; if("dfs".equals(protocol) || "file".equals(protocol)) { mapper = new DFSCopyFilesMapper(); } else if("http".equals(protocol)) { mapper = new HTTPCopyFilesMapper(); } return mapper; } } public static class CopyFilesReducer extends MapReduceBase implements Reducer { public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { // nothing } } private static String[] fetchSrcURIs(Configuration conf, URI srcListURI) throws IOException { ArrayList uris = new ArrayList(); BufferedReader fis = null; String srcListURIScheme = srcListURI.getScheme(); String srcListURIPath = srcListURI.getPath(); if("file".equals(srcListURIScheme)) { fis = new BufferedReader(new FileReader(srcListURIPath)); } else if("dfs".equals(srcListURIScheme)) { FileSystem fs = FileSystem.getNamed(CopyFilesMapper.getFileSysName(srcListURI), conf); fis = new BufferedReader( new InputStreamReader(new FSDataInputStream(fs, new Path(srcListURIPath), conf)) ); } else if("http".equals(srcListURIScheme)) { //Copy the file URL url = srcListURI.toURL(); HttpURLConnection connection = (HttpURLConnection)url.openConnection(); connection.setRequestMethod("GET"); connection.connect(); fis = new BufferedReader( new InputStreamReader(connection.getInputStream()) ); } else { throw new IOException("Unsupported source list uri!"); } String uri = null; while((uri = fis.readLine()) != null) { if(!uri.startsWith("#")) { uris.add(uri); } } fis.close(); if(!uris.isEmpty()) { return (String[])uris.toArray(new String[0]); } return null; } /** * Helper function to parse input file and return source urls for * a given protocol. * @param protocol : The protocol for which to find source urls. * @param inputFilePath : The file containing the urls. * @return */ private static String[] parseInputFile(String protocol, String[] uris) throws IOException { ArrayList protocolURIs = new ArrayList(); for(int i=0; i < uris.length; ++i) { if(uris[i].startsWith(protocol)) { protocolURIs.add(uris[i]); } } if(!protocolURIs.isEmpty()) { return (String[])protocolURIs.toArray(new String[0]); } return null; } /** * Driver to copy srcPath to destPath depending on required protocol. * @param conf : Configuration * @param srcPath : Source path * @param destPath : Destination path */ public static void copy(Configuration conf, String srcPath, String destPath, boolean srcAsList, boolean ignoreReadFailures) throws IOException { //Job configuration JobConf jobConf = new JobConf(conf, CopyFiles.class); jobConf.setJobName("distcp"); //Sanity check for srcPath/destPath URI srcURI = null; try { srcURI = new URI(srcPath); } catch (URISyntaxException ex) { throw new IOException("Illegal source path!"); } URI destURI = null; try { destURI = new URI(destPath); } catch (URISyntaxException ex) { throw new IOException("Illegal destination path!"); } //Source paths String[] srcPaths = null; if(srcAsList) { srcPaths = fetchSrcURIs(conf, srcURI); } //Create the task-specific mapper CopyFilesMapper mapper = null; if(srcAsList) { //Ugly?! // Protocol - 'dfs://' String[] dfsUrls = parseInputFile("dfs", srcPaths); if(dfsUrls != null) { for(int i=0; i < dfsUrls.length; ++i) { copy(conf, dfsUrls[i], destPath, false, ignoreReadFailures); } } // Protocol - 'file://' String[] localUrls = parseInputFile("file", srcPaths); if(localUrls != null) { for(int i=0; i < localUrls.length; ++i) { copy(conf, localUrls[i], destPath, false, ignoreReadFailures); } } // Protocol - 'http://' String[] httpUrls = parseInputFile("http", srcPaths); if(httpUrls != null) { srcPaths = httpUrls; mapper = CopyMapperFactory.getMapper("http"); } else { //Done return; } } else { //Single source - ugly! String[] tmpSrcPath = {srcPath}; srcPaths = tmpSrcPath; mapper = CopyMapperFactory.getMapper(srcURI.getScheme()); } //Initialize the mapper mapper.setup(conf, jobConf, srcPaths, destPath, ignoreReadFailures); //We are good to go! try { JobClient.runJob(jobConf); } finally { mapper.cleanup(conf, jobConf, srcPath, destPath); } } /** * This is the main driver for recursively copying directories * across file systems. It takes at least two cmdline parameters. A source * URL and a destination URL. It then essentially does an "ls -lR" on the * source URL, and writes the output in aa round-robin manner to all the map * input files. The mapper actually copies the files allotted to it. And * the reduce is empty. */ public int run(String[] args) throws Exception { String srcPath = null; String destPath = null; boolean ignoreReadFailures = false; boolean srcAsList = false; for (int idx = 0; idx < args.length; idx++) { if ("-i".equals(args[idx])) { ignoreReadFailures = true; } else if ("-f".equals(args[idx])) { srcAsList = true; } else if (srcPath == null) { srcPath = args[idx]; } else if (destPath == null) { destPath = args[idx]; } else { System.out.println(usage); return -1; } } // mandatory command-line parameters if (srcPath == null || destPath == null) { System.out.println(usage); return -1; } try { copy(conf, srcPath, destPath, srcAsList, ignoreReadFailures); } catch (Exception e) { System.out.println("Caught: " + e); return -1; } return 0; } public static void main(String[] args) throws Exception { int res = new CopyFiles().doMain( new JobConf(new Configuration(), CopyFiles.class), args); System.exit(res); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -