📄 reducetaskrunner.java

📁 hadoop: Nutch cluster platform
💻 JAVA
📖 Page 1 of 2
            Thread.sleep(lastStalledCheck + STALLED_COPY_CHECK - currentTime);
          }
        }
      } catch (InterruptedException ie) {}
    }
  }

  public ReduceTaskRunner(Task task, TaskTracker tracker,
                          JobConf conf) throws IOException {
    super(task, tracker, conf);
    this.mapOutputFile = new MapOutputFile();
    this.mapOutputFile.setConf(conf);
    localFileSys = FileSystem.getNamed("local", conf);

    this.reduceTask = (ReduceTask)getTask();
    this.scheduledCopies = new ArrayList(100);
    this.copyResults = new ArrayList(100);
    this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
    this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);

    // hosts -> next contact time
    this.penaltyBox = new Hashtable();

    // hostnames
    this.uniqueHosts = new HashSet();

    this.lastPollTime = 0;
  }

  /** Assemble all of the map output files */
  public boolean prepare() throws IOException {

    // cleanup from failures
    this.mapOutputFile.removeAll(reduceTask.getTaskId());

    final int      numOutputs = reduceTask.getNumMaps();
    List           neededOutputs = new ArrayList(numOutputs);
    List           knownOutputs = new ArrayList(100);
    int            numInFlight = 0, numCopied = 0;
    int            lowThreshold = numCopiers*2;
    long           bytesTransferred = 0;
    DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
    Random         backoff = new Random();
    final Progress copyPhase = getTask().getProgress().phase();
    MapCopyLeaseChecker leaseChecker = null;

    for (int i = 0; i < numOutputs; i++) {
      neededOutputs.add(new Integer(i));
      copyPhase.addPhase();       // add sub-phase per file
    }

    InterTrackerProtocol jobClient = getTracker().getJobClient();
    copiers = new MapOutputCopier[numCopiers];

    // start all the copying threads
    for (int i=0; i < copiers.length; i++) {
      copiers[i] = new MapOutputCopier();
      copiers[i].start();
    }

    leaseChecker = new MapCopyLeaseChecker();
    leaseChecker.start();

    // start the clock for bandwidth measurement
    long startTime = System.currentTimeMillis();
    long currentTime = startTime;

    // loop until we get all required outputs or are killed
    while (!killed && numCopied < numOutputs) {

      LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
               " map output(s)");

      if (!neededOutputs.isEmpty()) {
        LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
                 " map output location(s)");
        try {
          MapOutputLocation[] locs = queryJobTracker(neededOutputs, jobClient);

          // remove discovered outputs from needed list
          // and put them on the known list
          for (int i=0; i < locs.length; i++) {
            neededOutputs.remove(new Integer(locs[i].getMapId()));
            knownOutputs.add(locs[i]);
          }
          LOG.info(reduceTask.getTaskId() +
                   " Got " + (locs == null ? 0 : locs.length) +
                   " map outputs from jobtracker");
        }
        catch (IOException ie) {
          LOG.warn(reduceTask.getTaskId() +
                   " Problem locating map outputs: " +
                   StringUtils.stringifyException(ie));
        }
      }

      // now walk through the cache and schedule what we can
      int numKnown = knownOutputs.size(), numScheduled = 0;
      int numSlow = 0, numDups = 0;

      LOG.info(reduceTask.getTaskId() + " Got " + numKnown +
               " known map output location(s); scheduling...");

      synchronized (scheduledCopies) {
        ListIterator locIt = knownOutputs.listIterator();

        currentTime = System.currentTimeMillis();
        while (locIt.hasNext()) {

          MapOutputLocation loc = (MapOutputLocation)locIt.next();
          Long penaltyEnd = (Long)penaltyBox.get(loc.getHost());
          boolean penalized = false, duplicate = false;

          if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
            penalized = true; numSlow++;
          }
          if (uniqueHosts.contains(loc.getHost())) {
            duplicate = true; numDups++;
          }

          if (!penalized && !duplicate) {
            uniqueHosts.add(loc.getHost());
            scheduledCopies.add(loc);
            locIt.remove();  // remove from knownOutputs
            numInFlight++; numScheduled++;
          }
        }
        scheduledCopies.notifyAll();
      }

      LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
               " of " + numKnown + " known outputs (" + numSlow +
               " slow hosts and " + numDups + " dup hosts)");

      // if we have no copies in flight and we can't schedule anything
      // new, just wait for a bit
      try {
        if (numInFlight == 0 && numScheduled == 0) {
          getTask().reportProgress(getTracker());
          Thread.sleep(5000);
        }
      } catch (InterruptedException e) { } // IGNORE

      while (!killed && numInFlight > 0) {
        CopyResult cr = getCopyResult();

        if (cr != null) {
          if (cr.getSuccess()) {  // a successful copy
            numCopied++;
            bytesTransferred += cr.getSize();

            long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
            float mbs = ((float)bytesTransferred)/(1024*1024);
            float transferRate = mbs/secsSinceStart;

            copyPhase.startNextPhase();
            copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs +
                                " at " +
                                mbpsFormat.format(transferRate) + " MB/s)");

            getTask().reportProgress(getTracker());
          } else {
            // this copy failed, put it back onto neededOutputs
            neededOutputs.add(new Integer(cr.getMapId()));

            // wait a random amount of time for next contact
            currentTime = System.currentTimeMillis();
            long nextContact = currentTime + 60 * 1000 +
                               backoff.nextInt(maxBackoff*1000);
            penaltyBox.put(cr.getHost(), new Long(nextContact));

            LOG.warn(reduceTask.getTaskId() + " adding host " +
                     cr.getHost() + " to penalty box, next contact in " +
                     ((nextContact-currentTime)/1000) + " seconds");

            // other outputs from the failed host may be present in the
            // knownOutputs cache, purge them. This is important in case
            // the failure is due to a lost tasktracker (causes many
            // unnecessary backoffs). If not, we only take a small hit
            // polling the jobtracker a few more times
            ListIterator locIt = knownOutputs.listIterator();
            while (locIt.hasNext()) {
              MapOutputLocation loc = (MapOutputLocation)locIt.next();
              if (cr.getHost().equals(loc.getHost())) {
                locIt.remove();
                neededOutputs.add(new Integer(loc.getMapId()));
              }
            }
          }

          uniqueHosts.remove(cr.getHost());
          numInFlight--;
        }

        // ensure we have enough to keep us busy
        if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
          break;
        }
      }
    }

    // all done, inform the copiers to exit
    leaseChecker.interrupt();
    synchronized (copiers) {
      synchronized (scheduledCopies) {
        for (int i=0; i < copiers.length; i++) {
          copiers[i].interrupt();
          copiers[i] = null;
        }
      }
    }

    return numCopied == numOutputs && !killed;
  }

  private CopyResult getCopyResult() {
    synchronized (copyResults) {
      while (!killed && copyResults.isEmpty()) {
        try {
          copyResults.wait();
        } catch (InterruptedException e) { }
      }
      if (copyResults.isEmpty()) {
        return null;
      } else {
        return (CopyResult) copyResults.remove(0);
      }
    }
  }

  /** Queries the job tracker for a set of outputs ready to be copied
   * @param neededOutputs the list of currently unknown outputs
   * @param jobClient the job tracker
   * @return a set of locations to copy outputs from
   * @throws IOException
   */
  private MapOutputLocation[] queryJobTracker(List neededOutputs,
                                              InterTrackerProtocol jobClient)
  throws IOException {

    // query for just a random subset of needed segments so that we don't
    // overwhelm the jobtracker.  ideally perhaps we could send a more compact
    // representation of all needed, i.e., a bit-vector
    int checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
    int neededIds[] = new int[checkSize];

    Collections.shuffle(neededOutputs);

    ListIterator itr = neededOutputs.listIterator();
    for (int i=0; i < checkSize; i++) {
      neededIds[i] = ((Integer)itr.next()).intValue();
    }

    long currentTime = System.currentTimeMillis();
    long pollTime = lastPollTime + MIN_POLL_INTERVAL;
    while (currentTime < pollTime) {
      try {
        Thread.sleep(pollTime-currentTime);
      } catch (InterruptedException ie) { } // IGNORE
      currentTime = System.currentTimeMillis();
    }
    lastPollTime = currentTime;

    return jobClient.locateMapOutputs(reduceTask.getJobId().toString(),
                                      neededIds,
                                      reduceTask.getPartition());
  }

  /** Delete all of the temporary map output files. */
  public void close() throws IOException {
    getTask().getProgress().setStatus("closed");
    this.mapOutputFile.removeAll(getTask().getTaskId());
  }

  /**
   * Kill the child process, but also kick getCopyResult so that it checks
   * the kill flag.
   */
  public void kill() {
    synchronized (copyResults) {
      super.kill();
      copyResults.notify();
    }
  }

}
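For reference, the copy-phase behavior above is driven by two JobConf properties read in the constructor: "mapred.reduce.parallel.copies" (the number of MapOutputCopier threads started in prepare(), default 5) and "mapred.reduce.copy.backoff" (the maximum random delay, in seconds, added before retrying a penalized host, default 300). Below is a minimal sketch of tuning them when configuring a job; the property names and defaults come from the code above, while the example class, the main method, and the org.apache.hadoop.mapred package for JobConf are assumptions for illustration only.

import org.apache.hadoop.mapred.JobConf;   // assumed package for JobConf

public class ShuffleCopyTuning {           // hypothetical example class
  public static void main(String[] args) {
    JobConf conf = new JobConf();

    // Number of parallel MapOutputCopier threads started in prepare();
    // the code above defaults this to 5 and uses lowThreshold = numCopiers * 2
    // to decide when to go back and schedule more copies.
    conf.setInt("mapred.reduce.parallel.copies", 10);

    // Maximum extra random delay, in seconds, added on top of the fixed
    // 60-second penalty when a host fails a copy (default 300 above).
    conf.setInt("mapred.reduce.copy.backoff", 120);
  }
}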
