📄 reducetaskrunner.java
字号:
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.*;

import java.io.*;
import java.util.*;
import java.text.DecimalFormat;

import org.apache.hadoop.util.Progressable;

/**
 * Runs a reduce task.
 *
 * Map outputs are fetched by {@link MapOutputCopier} threads, which take
 * their work from {@link #scheduledCopies} and report each outcome through
 * {@link #copyResults}.
 */
class ReduceTaskRunner extends TaskRunner {

  /**
   * for cleaning up old map outputs
   */
  private MapOutputFile mapOutputFile;

  /**
   * our reduce task instance
   */
  private ReduceTask reduceTask;

  /**
   * Map output locations waiting to be copied. Entries are
   * {@link MapOutputLocation} objects (the list is untyped). Copier threads
   * wait on this list while it is empty and remove entries from the front,
   * always while synchronized on the list itself.
   */
  private List scheduledCopies;

  /**
   * The results of dispatched copy attempts. Entries are {@link CopyResult}
   * objects (the list is untyped). Copier threads append a result and call
   * notify() on the list, both while synchronized on it.
   */
  private List copyResults;

  /**
   * the number of outputs to copy in parallel
   */
  private int numCopiers;

  /**
   * the maximum amount of time (less 1 minute) to wait to
   * contact a host after a copy from it fails. We wait for (1 min +
   * Random.nextInt(maxBackoff)) seconds.
   *
   * NOTE(review): the wording above mixes minutes and seconds; confirm the
   * unit at the place where maxBackoff is used.
   */
  private int maxBackoff;

  /**
   * busy hosts from which copies are being backed off
   * Map of host -> next contact time
   */
  private Map penaltyBox;

  /**
   * the set of unique hosts from which we are copying
   */
  private Set uniqueHosts;

  /**
   * the last time we polled the job tracker
   */
  private long lastPollTime;

  /**
   * A reference to the local file system for writing the map outputs to.
   * Handed to {@link MapOutputLocation#getFile} by the copier threads.
   */
  private FileSystem localFileSys;

  /**
   * The threads for fetching the files.
*/ private MapOutputCopier[] copiers = null; /** * the minimum interval between jobtracker polls */ private static final long MIN_POLL_INTERVAL = 5000; /** * the number of map output locations to poll for at one time */ private static final int PROBE_SAMPLE_SIZE = 50; /** Represents the result of an attempt to copy a map output */ private class CopyResult { // the map output location against which a copy attempt was made private final MapOutputLocation loc; // the size of the file copied, -1 if the transfer failed private final long size; CopyResult(MapOutputLocation loc, long size) { this.loc = loc; this.size = size; } public int getMapId() { return loc.getMapId(); } public boolean getSuccess() { return size >= 0; } public long getSize() { return size; } public String getHost() { return loc.getHost(); } public MapOutputLocation getLocation() { return loc; } } private class PingTimer implements Progressable { private long pingTime; public synchronized void reset() { pingTime = 0; } public synchronized long getLastPing() { return pingTime; } public void progress() { synchronized (this) { pingTime = System.currentTimeMillis(); getTask().reportProgress(getTracker()); } } } /** Copies map outputs as they become available */ private class MapOutputCopier extends Thread { private PingTimer pingTimer = new PingTimer(); private MapOutputLocation currentLocation = null; public MapOutputCopier() { } /** * Get the last time that this copier made progress. * @return the System.currentTimeMillis when this copier last made progress */ public long getLastProgressTime() { return pingTimer.getLastPing(); } /** * Fail the current file that we are fetching * @return were we currently fetching? */ public synchronized boolean fail() { if (currentLocation != null) { finish(-1); return true; } else { return false; } } /** * Get the current map output location. 
*/ public synchronized MapOutputLocation getLocation() { return currentLocation; } private synchronized void start(MapOutputLocation loc) { currentLocation = loc; } private synchronized void finish(long size) { if (currentLocation != null) { synchronized (copyResults) { copyResults.add(new CopyResult(currentLocation, size)); copyResults.notify(); } currentLocation = null; } } /** Loop forever and fetch map outputs as they become available. * The thread exits when it is interrupted by the {@link ReduceTaskRunner} */ public void run() { try { while (true) { MapOutputLocation loc = null; long size = -1; synchronized (scheduledCopies) { while (scheduledCopies.isEmpty()) { scheduledCopies.wait(); } loc = (MapOutputLocation)scheduledCopies.remove(0); } try { start(loc); pingTimer.progress(); size = copyOutput(loc, pingTimer); pingTimer.reset(); } catch (IOException e) { LOG.warn(reduceTask.getTaskId() + " copy failed: " + loc.getMapTaskId() + " from " + loc.getHost()); LOG.warn(StringUtils.stringifyException(e)); } finish(size); } } catch (InterruptedException e) { } // ALL DONE! } /** Copies a a map output from a remote host, using raw RPC. 
* @param loc the map output location to be copied
     * @param pingee a status object to ping as we make progress
     * @return the size of the copied file, as returned by loc.getFile
     * @throws IOException if there is an error copying the file; the
     *         exception is logged here and then rethrown unchanged
     */
    private long copyOutput(MapOutputLocation loc, Progressable pingee) throws IOException {
      String reduceId = reduceTask.getTaskId();
      LOG.info(reduceId + " Copying " + loc.getMapTaskId() + " output from " + loc.getHost() + ".");
      try {
        // this copies the map output file
        // local target path is <reduce task id>/map_<map id>.out, resolved
        // through conf.getLocalPath
        Path filename = conf.getLocalPath(reduceId + "/map_" + loc.getMapId() + ".out");
        // fetch this reduce's partition; pingee is handed to getFile,
        // presumably so it can report progress during the transfer
        long bytes = loc.getFile(localFileSys, filename, reduceTask.getPartition(), pingee);
        LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() + " output from " + loc.getHost() + ".");
        return bytes;
      } catch (IOException e) {
        LOG.warn(reduceTask.getTaskId() + " failed to copy " + loc.getMapTaskId() + " output from " + loc.getHost() + ".");
        throw e;
      }
    }
  }

  /**
   * Thread that periodically inspects the copier threads and replaces any
   * copier whose last recorded progress is older than
   * STALLED_COPY_TIMEOUT.
   */
  private class MapCopyLeaseChecker extends Thread {
    // a copy with no progress for this long (3 minutes, in ms) is stalled
    private static final long STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
    // minimum time between two stalled-copy scans (1 minute, in ms)
    private static final long STALLED_COPY_CHECK = 60 * 1000;
    // System.currentTimeMillis value of the most recent scan, 0 before the first
    private long lastStalledCheck = 0;

    public void run() {
      try {
        while (true) {
          long currentTime = System.currentTimeMillis();
          if (currentTime - lastStalledCheck > STALLED_COPY_CHECK) {
            lastStalledCheck = currentTime;
            // hold the array's monitor while slots are read and replaced
            synchronized (copiers) {
              for(int i=0; i < copiers.length; ++i) {
                // stop at the first empty slot
                // NOTE(review): assumes slots are filled from index 0 up - confirm
                if (copiers[i] == null) {
                  break;
                }
                long lastProgress = copiers[i].getLastProgressTime();
                // 0 is the value a reset ping timer holds, so such copiers
                // are skipped
                if (lastProgress != 0 && currentTime - lastProgress > STALLED_COPY_TIMEOUT) {
                  LOG.warn("Map output copy stalled on " + copiers[i].getLocation());
                  // mark the current file as failed
                  copiers[i].fail();
                  // tell the thread to stop
                  copiers[i].interrupt();
                  // create a replacement thread
                  copiers[i] = new MapOutputCopier();
                  // no-argument start() is Thread.start(): launches the replacement
                  copiers[i].start();
                }
              }
            }
          } else {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -