📄 copyfiles.java
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.util;import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.FileReader;import java.io.IOException;import java.io.File;import java.io.InputStreamReader;import java.net.HttpURLConnection;import java.net.URI;import java.net.URL;import java.net.URISyntaxException;import java.text.DecimalFormat;import java.util.ArrayList;import java.util.Collections;import java.util.Enumeration;import java.util.Iterator;import java.util.Random;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapred.ClusterStatus;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.SequenceFileInputFormat;import org.apache.hadoop.mapred.SequenceFileOutputFormat;/** * A Map-reduce program to recursively copy directories between * diffferent file-systems. * * @author Milind Bhandarkar */public class CopyFiles extends ToolBase { private static final String usage = "distcp "+ "[-fs <namenode:port | local> ] [-jt <jobtracker:port | local>] " + "[-conf <config-file.xml>] " + "[-D <property=value>] "+ "[-i] <srcurl> | -f <urilist_uri> <desturl>"; private static final long MIN_BYTES_PER_MAP = 1L << 28; private static final int MAX_NUM_MAPS = 10000; private static final int MAX_MAPS_PER_NODE = 10; private static final String readFailuresAttribute = "distcp.ignore.read.failures"; public void setConf(Configuration conf) { if (conf instanceof JobConf) { this.conf = (JobConf) conf; } else { this.conf = new JobConf(conf); } } /** * Base-class for all mappers for distcp * @author Arun C Murthy */ public static abstract class CopyFilesMapper extends MapReduceBase { /** * Interface to initialize *distcp* specific map tasks. * @param conf : The dfs/mapred configuration. * @param jobConf : The handle to the jobConf object to be initialized. * @param srcPaths : The source paths. * @param destPath : The destination path. * @param ignoreReadFailures : Ignore read failures? * @throws IOException */ public abstract void setup(Configuration conf, JobConf jobConf, String[] srcPaths, String destPath, boolean ignoreReadFailures) throws IOException; /** * Interface to cleanup *distcp* specific resources * @param conf : The dfs/mapred configuration. * @param jobConf : The handle to the jobConf object to be initialized. * @param srcPath : The source uri. * @param destPath : The destination uri. * @throws IOException */ public abstract void cleanup(Configuration conf, JobConf jobConf, String srcPath, String destPath) throws IOException; public static String getFileSysName(URI url) { String fsname = url.getScheme(); if ("dfs".equals(fsname)) { String host = url.getHost(); int port = url.getPort(); return (port==(-1)) ? host : (host+":"+port); } else { return "local"; } } /** * Make a path relative with respect to a root path. * absPath is always assumed to descend from root. * Otherwise returned path is null. */ public static Path makeRelative(Path root, Path absPath) { if (!absPath.isAbsolute()) { return absPath; } String sRoot = root.toString(); String sPath = absPath.toString(); Enumeration rootTokens = new StringTokenizer(sRoot, "/"); ArrayList rList = Collections.list(rootTokens); Enumeration pathTokens = new StringTokenizer(sPath, "/"); ArrayList pList = Collections.list(pathTokens); Iterator rIter = rList.iterator(); Iterator pIter = pList.iterator(); while (rIter.hasNext()) { String rElem = (String) rIter.next(); String pElem = (String) pIter.next(); if (!rElem.equals(pElem)) { return null; } } StringBuffer sb = new StringBuffer(); while (pIter.hasNext()) { String pElem = (String) pIter.next(); sb.append(pElem); if (pIter.hasNext()) { sb.append("/"); } } return new Path(sb.toString()); } } /** * DFSCopyFilesMapper: The mapper for copying files from the DFS. * @author Milind Bhandarkar */ public static class DFSCopyFilesMapper extends CopyFilesMapper implements Mapper { private int sizeBuf = 4096; private FileSystem srcFileSys = null; private FileSystem destFileSys = null; private Path srcPath = null; private Path destPath = null; private byte[] buffer = null; private static final long reportInterval = 1L << 25; private long bytesSinceLastReport = 0L; private long totalBytesCopied = 0L; private static DecimalFormat percentFormat = new DecimalFormat("0.00"); private boolean ignoreReadFailures; private void copy(String src, Reporter reporter) throws IOException { // open source file Path srcFile = new Path(srcPath, src); FSDataInputStream in = srcFileSys.open(srcFile); long totalBytes = srcFileSys.getLength(srcFile); // create directories to hold destination file and create destFile Path destFile = new Path(destPath, src); Path destParent = destFile.getParent(); if (destParent != null) { destFileSys.mkdirs(destParent); } FSDataOutputStream out = destFileSys.create(destFile); // copy file while (true) { int nread = in.read(buffer); if (nread < 0) { break; } out.write(buffer, 0, nread); bytesSinceLastReport += nread; if (bytesSinceLastReport > reportInterval) { totalBytesCopied += bytesSinceLastReport; bytesSinceLastReport = 0L; reporter.setStatus("Copy "+ src + ": " + percentFormat.format(100.0 * totalBytesCopied / totalBytes) + "% and " + StringUtils.humanReadableInt(totalBytesCopied) + " bytes"); } } in.close(); out.close(); // report at least once for each file totalBytesCopied += bytesSinceLastReport; bytesSinceLastReport = 0L; reporter.setStatus("Finished. Bytes copied: " + StringUtils.humanReadableInt(totalBytesCopied)); } /** * Initialize DFSCopyFileMapper specific job-configuration. * @param conf : The dfs/mapred configuration. * @param jobConf : The handle to the jobConf object to be initialized. * @param srcPaths : The source URIs. * @param destPath : The destination URI. * @param ignoreReadFailures : Ignore read failures? */ public void setup(Configuration conf, JobConf jobConf, String[] srcPaths, String destPath, boolean ignoreReadFailures) throws IOException { URI srcURI = null; URI destURI = null; try { srcURI = new URI(srcPaths[0]); destURI = new URI(destPath); } catch (URISyntaxException ex) { throw new RuntimeException("URL syntax error.", ex); } String srcFileSysName = getFileSysName(srcURI); String destFileSysName = getFileSysName(destURI); jobConf.set("copy.src.fs", srcFileSysName); jobConf.set("copy.dest.fs", destFileSysName); FileSystem srcfs = FileSystem.getNamed(srcFileSysName, conf); String srcPath = srcURI.getPath(); if ("".equals(srcPath)) { srcPath = "/"; } destPath = destURI.getPath(); if ("".equals(destPath)) { destPath = "/"; } Path tmpPath = new Path(srcPath); Path rootPath = new Path(srcPath); if (srcfs.isFile(tmpPath)) { tmpPath = tmpPath.getParent(); rootPath = rootPath.getParent(); jobConf.set("copy.src.path", tmpPath.toString()); } else { jobConf.set("copy.src.path", srcPath); } jobConf.set("copy.dest.path", destPath); if (!srcfs.exists(tmpPath)) { System.out.println(srcPath+" does not exist."); return; } // turn off speculative execution, because DFS doesn't handle // multiple writers to the same file. jobConf.setSpeculativeExecution(false); jobConf.setInputFormat(SequenceFileInputFormat.class); jobConf.setOutputKeyClass(Text.class); jobConf.setOutputValueClass(Text.class); jobConf.setOutputFormat(SequenceFileOutputFormat.class); jobConf.setMapperClass(DFSCopyFilesMapper.class); jobConf.setReducerClass(CopyFilesReducer.class); jobConf.setNumReduceTasks(1); jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures); Random r = new Random(); Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" + Integer.toString(Math.abs(r.nextInt()), 36)); Path inDir = new Path(jobDirectory, "in"); Path fakeOutDir = new Path(jobDirectory, "out"); FileSystem fileSys = FileSystem.get(jobConf); fileSys.mkdirs(inDir); jobConf.set("distcp.job.dir", jobDirectory.toString()); jobConf.setInputPath(inDir); jobConf.setOutputPath(fakeOutDir); // create new sequence-files for holding paths ArrayList pathList = new ArrayList(); ArrayList finalPathList = new ArrayList(); pathList.add(new Path(srcPath)); long totalBytes = 0; //int part = 0; while(!pathList.isEmpty()) { Path top = (Path) pathList.remove(0); if (srcfs.isFile(top)) { totalBytes += srcfs.getLength(top); top = makeRelative(rootPath, top); finalPathList.add(top.toString()); } else { Path[] paths = srcfs.listPaths(top); for (int idx = 0; idx < paths.length; idx++) { pathList.add(paths[idx]); } } } // ideal number of maps is one per file (if the map-launching overhead // were 0. It is limited by jobtrackers handling capacity, which lets say // is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for // small files it is better to determine number of maps by amount of // data per map. int nFiles = finalPathList.size(); int numMaps = nFiles; if (numMaps > MAX_NUM_MAPS) { numMaps = MAX_NUM_MAPS; } if (numMaps > (int) (totalBytes / MIN_BYTES_PER_MAP)) { numMaps = (int) (totalBytes / MIN_BYTES_PER_MAP); } JobClient client = new JobClient(jobConf); ClusterStatus cluster = client.getClusterStatus(); int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE; if (numMaps > tmpMaps) { numMaps = tmpMaps; } if (numMaps == 0) { numMaps = 1; } jobConf.setNumMapTasks(numMaps); for(int idx=0; idx < numMaps; ++idx) { Path file = new Path(inDir, "part"+idx); SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file, Text.class, Text.class); for (int ipath = idx; ipath < nFiles; ipath += numMaps) { String path = (String) finalPathList.get(ipath); writer.append(new Text(path), new Text("")); } writer.close(); } finalPathList = null; } public void cleanup(Configuration conf, JobConf jobConf, String srcPath, String destPath) throws IOException { //Clean up jobDirectory Path jobDirectory = new Path(jobConf.get("distcp.job.dir", "/")); FileSystem fs = FileSystem.get(jobConf); if(!jobDirectory.equals("/")) { fs.delete(jobDirectory); } } /** Mapper configuration. * Extracts source and destination file system, as well as * top-level paths on source and destination directories. * Gets the named file systems, to be used later in map. */ public void configure(JobConf job) { String srcfs = job.get("copy.src.fs", "local"); String destfs = job.get("copy.dest.fs", "local"); srcPath = new Path(job.get("copy.src.path", "/")); destPath = new Path(job.get("copy.dest.path", "/")); try { srcFileSys = FileSystem.getNamed(srcfs, job); destFileSys = FileSystem.getNamed(destfs, job); } catch (IOException ex) { throw new RuntimeException("Unable to get the named file system.", ex); } sizeBuf = job.getInt("copy.buf.size", 4096); buffer = new byte[sizeBuf]; ignoreReadFailures = job.getBoolean(readFailuresAttribute, false); } /** Map method. Copies one file from source file system to destination. * @param key source file name * @param value not-used. * @param out not-used. * @param reporter */ public void map(WritableComparable key, Writable value, OutputCollector out, Reporter reporter) throws IOException { String src = ((Text) key).toString(); try { copy(src, reporter); } catch (IOException except) { if (ignoreReadFailures) { reporter.setStatus("Failed to copy " + src + " : " + StringUtils.stringifyException(except)); try { destFileSys.delete(new Path(destPath, src)); } catch (Throwable ex) { // ignore, we are just cleaning up } } else { throw except; } } } public void close() { // nothing }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -