inputformatbase.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 155 行
JAVA
155 行
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import java.io.IOException;import java.io.File;import java.util.ArrayList;import java.util.logging.Logger;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.util.LogFormatter;/** A base class for {@link InputFormat}. */public abstract class InputFormatBase implements InputFormat { public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.InputFormatBase"); private static final double SPLIT_SLOP = 0.1; // 10% slop private long minSplitSize = 1; protected void setMinSplitSize(long minSplitSize) { this.minSplitSize = minSplitSize; } public abstract RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException; /** List input directories. * Subclasses may override to, e.g., select only files matching a regular * expression. * Property mapred.input.subdir, if set, names a subdirectory that * is appended to all input dirs specified by job, and if the given fs * lists those too, each is added to the returned array of File. * @param fs * @param job * @return array of File objects, never zero length. * @throws IOException if zero items. */ protected File[] listFiles(FileSystem fs, JobConf job) throws IOException { File[] dirs = job.getInputDirs(); String workDir = job.getWorkingDirectory(); String subdir = job.get("mapred.input.subdir"); ArrayList result = new ArrayList(); for (int i = 0; i < dirs.length; i++) { // if it is relative, make it absolute using the directory from the // JobConf if (workDir != null && !fs.isAbsolute(dirs[i])) { dirs[i] = new File(workDir, dirs[i].toString()); } File[] dir = fs.listFiles(dirs[i]); if (dir != null) { for (int j = 0; j < dir.length; j++) { File file = dir[j]; if (subdir != null) { File[] subFiles = fs.listFiles(new File(file, subdir)); if (subFiles != null) { for (int k = 0; k < subFiles.length; k++) { result.add(subFiles[k]); } } } else { result.add(file); } } } } if (result.size() == 0) { throw new IOException("No input directories specified in: "+job); } return (File[])result.toArray(new File[result.size()]); } /** Splits files returned by {#listFiles(FileSystem,JobConf) when * they're too big.*/ public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) throws IOException { File[] files = listFiles(fs, job); for (int i = 0; i < files.length; i++) { // check we have valid files File file = files[i]; if (fs.isDirectory(file) || !fs.exists(file)) { throw new IOException("Not a file: "+files[i]); } } long totalSize = 0; // compute total size for (int i = 0; i < files.length; i++) { totalSize += fs.getLength(files[i]); } long bytesPerSplit = totalSize / numSplits; // start w/ desired num splits long fsBlockSize = fs.getBlockSize(); if (bytesPerSplit > fsBlockSize) { // no larger than fs blocks bytesPerSplit = fsBlockSize; } long configuredMinSplitSize = job.getLong("mapred.min.split.size", 0); if( configuredMinSplitSize < minSplitSize ) configuredMinSplitSize = minSplitSize; if (bytesPerSplit < configuredMinSplitSize) { // no smaller than min size bytesPerSplit = configuredMinSplitSize; } long maxPerSplit = bytesPerSplit + (long)(bytesPerSplit*SPLIT_SLOP); //LOG.info("bytesPerSplit = " + bytesPerSplit); //LOG.info("maxPerSplit = " + maxPerSplit); ArrayList splits = new ArrayList(numSplits); // generate splits for (int i = 0; i < files.length; i++) { File file = files[i]; long length = fs.getLength(file); long bytesRemaining = length; while (bytesRemaining >= maxPerSplit) { splits.add(new FileSplit(file, length-bytesRemaining, bytesPerSplit)); bytesRemaining -= bytesPerSplit; } if (bytesRemaining != 0) { splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining)); } //LOG.info( "Generating splits for " + i + "th file: " + file.getName() ); } //LOG.info( "Total # of splits: " + splits.size() ); return (FileSplit[])splits.toArray(new FileSplit[splits.size()]); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?