⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reducetaskrunner.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.util.*;import java.io.*;import java.util.*;import java.text.DecimalFormat;import org.apache.hadoop.util.Progressable;/** Runs a reduce task. */class ReduceTaskRunner extends TaskRunner {    /**    * for cleaning up old map outputs   */  private MapOutputFile mapOutputFile;    /**   * our reduce task instance   */  private ReduceTask reduceTask;      /**   * the list of map outputs currently being copied   */  private List scheduledCopies;    /**   *  the results of dispatched copy attempts   */  private List copyResults;    /**   *  the number of outputs to copy in parallel   */  private int numCopiers;  /**   * the maximum amount of time (less 1 minute) to wait to    * contact a host after a copy from it fails. We wait for (1 min +   * Random.nextInt(maxBackoff)) seconds.   */  private int maxBackoff;    /**   * busy hosts from which copies are being backed off   * Map of host -> next contact time   */  private Map penaltyBox;  /**   * the set of unique hosts from which we are copying   */  private Set uniqueHosts;    /**   * the last time we polled the job tracker   */  private long lastPollTime;    /**   * A reference to the local file system for writing the map outputs to.   */  private FileSystem localFileSys;  /**   * The threads for fetching the files.   */  private MapOutputCopier[] copiers = null;    /**   * the minimum interval between jobtracker polls   */  private static final long MIN_POLL_INTERVAL = 5000;    /**   * the number of map output locations to poll for at one time   */  private static final int PROBE_SAMPLE_SIZE = 50;  /** Represents the result of an attempt to copy a map output */  private class CopyResult {        // the map output location against which a copy attempt was made    private final MapOutputLocation loc;        // the size of the file copied, -1 if the transfer failed    private final long size;            CopyResult(MapOutputLocation loc, long size) {      this.loc = loc;      this.size = size;    }        public int getMapId() { return loc.getMapId(); }    public boolean getSuccess() { return size >= 0; }    public long getSize() { return size; }    public String getHost() { return loc.getHost(); }    public MapOutputLocation getLocation() { return loc; }  }  private class PingTimer implements Progressable {    private long pingTime;        public synchronized void reset() {      pingTime = 0;    }        public synchronized long getLastPing() {      return pingTime;    }        public void progress() {      synchronized (this) {        pingTime = System.currentTimeMillis();        getTask().reportProgress(getTracker());      }    }  }    /** Copies map outputs as they become available */  private class MapOutputCopier extends Thread {    private PingTimer pingTimer = new PingTimer();    private MapOutputLocation currentLocation = null;        public MapOutputCopier() {    }        /**     * Get the last time that this copier made progress.     * @return the System.currentTimeMillis when this copier last made progress     */    public long getLastProgressTime() {      return pingTimer.getLastPing();    }        /**     * Fail the current file that we are fetching     * @return were we currently fetching?     */    public synchronized boolean fail() {      if (currentLocation != null) {        finish(-1);        return true;      } else {        return false;      }    }        /**     * Get the current map output location.     */    public synchronized MapOutputLocation getLocation() {      return currentLocation;    }        private synchronized void start(MapOutputLocation loc) {      currentLocation = loc;    }        private synchronized void finish(long size) {      if (currentLocation != null) {        synchronized (copyResults) {          copyResults.add(new CopyResult(currentLocation, size));          copyResults.notify();        }        currentLocation = null;      }    }        /** Loop forever and fetch map outputs as they become available.     * The thread exits when it is interrupted by the {@link ReduceTaskRunner}     */    public void run() {      try {        while (true) {                  MapOutputLocation loc = null;          long size = -1;                    synchronized (scheduledCopies) {            while (scheduledCopies.isEmpty()) {              scheduledCopies.wait();            }            loc = (MapOutputLocation)scheduledCopies.remove(0);          }          try {            start(loc);            pingTimer.progress();            size = copyOutput(loc, pingTimer);            pingTimer.reset();          } catch (IOException e) {            LOG.warn(reduceTask.getTaskId() + " copy failed: " +                        loc.getMapTaskId() + " from " + loc.getHost());            LOG.warn(StringUtils.stringifyException(e));          }          finish(size);        }      } catch (InterruptedException e) { }  // ALL DONE!    }    /** Copies a a map output from a remote host, using raw RPC.      * @param currentLocation the map output location to be copied     * @param pingee a status object to ping as we make progress     * @return the size of the copied file     * @throws IOException if there is an error copying the file     */    private long copyOutput(MapOutputLocation loc,                             Progressable pingee)    throws IOException {      String reduceId = reduceTask.getTaskId();      LOG.info(reduceId + " Copying " + loc.getMapTaskId() +               " output from " + loc.getHost() + ".");      try {        // this copies the map output file        Path filename = conf.getLocalPath(reduceId + "/map_" +                                          loc.getMapId() + ".out");        long bytes = loc.getFile(localFileSys, filename,                                 reduceTask.getPartition(), pingee);        LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() +                 " output from " + loc.getHost() + ".");        return bytes;      }      catch (IOException e) {        LOG.warn(reduceTask.getTaskId() + " failed to copy " + loc.getMapTaskId() +                    " output from " + loc.getHost() + ".");        throw e;      }    }  }    private class MapCopyLeaseChecker extends Thread {    private static final long STALLED_COPY_TIMEOUT = 3 * 60 * 1000;    private static final long STALLED_COPY_CHECK = 60 * 1000;    private long lastStalledCheck = 0;        public void run() {      try {        while (true) {          long currentTime = System.currentTimeMillis();          if (currentTime - lastStalledCheck > STALLED_COPY_CHECK) {            lastStalledCheck = currentTime;            synchronized (copiers) {              for(int i=0; i < copiers.length; ++i) {                if (copiers[i] == null) {                  break;                }                long lastProgress = copiers[i].getLastProgressTime();                if (lastProgress != 0 &&                     currentTime - lastProgress > STALLED_COPY_TIMEOUT)  {                  LOG.warn("Map output copy stalled on " +                           copiers[i].getLocation());                  // mark the current file as failed                  copiers[i].fail();                  // tell the thread to stop                  copiers[i].interrupt();                  // create a replacement thread                  copiers[i] = new MapOutputCopier();                  copiers[i].start();                }              }            }          } else {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -