// ReduceTaskRunner.java
// (banner scraped from the hosting code-viewer page; "字号:" = "font size")
// NOTE(review): the next statement block is the tail of an inner-class method
// (apparently a stalled-copy watchdog loop) whose beginning lies outside this
// chunk; it sleeps until the next stalled-copy check is due and swallows
// interrupts. Kept byte-for-byte; see the full file for its enclosing scope.
        Thread.sleep(lastStalledCheck + STALLED_COPY_CHECK - currentTime);
      }
    }
  } catch (InterruptedException ie) {}
  }
  }

  /**
   * Creates a runner for the reduce side of a job.
   *
   * @param task    the reduce task to run (must be a {@code ReduceTask})
   * @param tracker the owning task tracker
   * @param conf    job configuration; supplies copier parallelism
   *                ("mapred.reduce.parallel.copies", default 5) and the max
   *                copy backoff in seconds ("mapred.reduce.copy.backoff",
   *                default 300)
   * @throws IOException if the local filesystem cannot be opened
   */
  public ReduceTaskRunner(Task task, TaskTracker tracker, JobConf conf) throws IOException {
    super(task, tracker, conf);
    this.mapOutputFile = new MapOutputFile();
    this.mapOutputFile.setConf(conf);
    localFileSys = FileSystem.getNamed("local", conf);
    this.reduceTask = (ReduceTask)getTask();
    // Work queue handed to copier threads and the queue of finished results;
    // both are used as wait/notify monitors below, hence plain synchronized
    // lists rather than anything fancier.
    this.scheduledCopies = new ArrayList(100);
    this.copyResults = new ArrayList(100);
    this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
    this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
    // hosts -> next contact time (ms epoch): hosts that recently failed a
    // copy are not contacted again until their penalty expires
    this.penaltyBox = new Hashtable();
    // hostnames with a copy currently in flight; enforces at most one
    // concurrent fetch per host
    this.uniqueHosts = new HashSet();
    this.lastPollTime = 0;
  }

  /**
   * Assemble all of the map output files: repeatedly poll the jobtracker for
   * map-output locations, schedule fetches onto the copier threads (at most
   * one in-flight copy per host), and collect results until every map output
   * has been copied or the task is killed.
   *
   * @return true iff all {@code numOutputs} map outputs were copied and the
   *         task was not killed
   * @throws IOException on cleanup failure
   */
  public boolean prepare() throws IOException {
    // cleanup from failures of a previous attempt of this task
    this.mapOutputFile.removeAll(reduceTask.getTaskId());

    final int numOutputs = reduceTask.getNumMaps();
    List neededOutputs = new ArrayList(numOutputs);  // map ids still unlocated
    List knownOutputs = new ArrayList(100);          // located but not yet scheduled
    int numInFlight = 0, numCopied = 0;
    // refill threshold: drop back to polling when fewer than 2*copiers
    // fetches are in flight (see the inner result loop below)
    int lowThreshold = numCopiers*2;
    long bytesTransferred = 0;
    DecimalFormat mbpsFormat = new DecimalFormat("0.00");
    Random backoff = new Random();
    final Progress copyPhase = getTask().getProgress().phase();
    MapCopyLeaseChecker leaseChecker = null;

    for (int i = 0; i < numOutputs; i++) {
      neededOutputs.add(new Integer(i));
      copyPhase.addPhase();       // add sub-phase per map-output file
    }

    InterTrackerProtocol jobClient = getTracker().getJobClient();
    copiers = new MapOutputCopier[numCopiers];

    // start all the copying threads; they block on scheduledCopies until
    // work is published under its monitor below
    for (int i=0; i < copiers.length; i++) {
      copiers[i] = new MapOutputCopier();
      copiers[i].start();
    }
    leaseChecker = new MapCopyLeaseChecker();
    leaseChecker.start();

    // start the clock for bandwidth measurement
    long startTime = System.currentTimeMillis();
    long currentTime = startTime;

    // loop until we get all required outputs, or are killed
    // (NOTE(review): `killed` is presumably set asynchronously by kill() in
    // the superclass — confirm against TaskRunner)
    while (!killed && numCopied < numOutputs) {
      LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) + " map output(s)");

      if (!neededOutputs.isEmpty()) {
        LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() + " map output location(s)");
        try {
          MapOutputLocation[] locs = queryJobTracker(neededOutputs, jobClient);
          // remove discovered outputs from needed list
          // and put them on the known list
          for (int i=0; i < locs.length; i++) {
            neededOutputs.remove(new Integer(locs[i].getMapId()));
            knownOutputs.add(locs[i]);
          }
          LOG.info(reduceTask.getTaskId() + " Got " + (locs == null ? 0 : locs.length) + " map outputs from jobtracker");
        } catch (IOException ie) {
          // transient RPC failure; we simply poll again on the next pass
          LOG.warn(reduceTask.getTaskId() + " Problem locating map outputs: " + StringUtils.stringifyException(ie));
        }
      }

      // now walk through the cache and schedule what we can
      int numKnown = knownOutputs.size(), numScheduled = 0;
      int numSlow = 0, numDups = 0;

      LOG.info(reduceTask.getTaskId() + " Got " + numKnown + " known map output location(s); scheduling...");

      synchronized (scheduledCopies) {
        ListIterator locIt = knownOutputs.listIterator();
        currentTime = System.currentTimeMillis();
        while (locIt.hasNext()) {
          MapOutputLocation loc = (MapOutputLocation)locIt.next();
          Long penaltyEnd = (Long)penaltyBox.get(loc.getHost());
          boolean penalized = false, duplicate = false;

          // skip hosts still serving a penalty from an earlier failed copy
          if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
            penalized = true; numSlow++;
          }
          // skip hosts that already have a copy in flight
          if (uniqueHosts.contains(loc.getHost())) {
            duplicate = true; numDups++;
          }

          if (!penalized && !duplicate) {
            uniqueHosts.add(loc.getHost());
            scheduledCopies.add(loc);
            locIt.remove();  // remove from knownOutputs
            numInFlight++; numScheduled++;
          }
        }
        // wake every copier blocked waiting for work
        scheduledCopies.notifyAll();
      }
      LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
               " of " + numKnown + " known outputs (" + numSlow +
               " slow hosts and " + numDups + " dup hosts)");

      // if we have no copies in flight and we can't schedule anything
      // new, just wait for a bit
      try {
        if (numInFlight == 0 && numScheduled == 0) {
          getTask().reportProgress(getTracker());
          Thread.sleep(5000);
        }
      } catch (InterruptedException e) { } // IGNORE

      // drain copier results until in-flight work runs low, then go back
      // to polling the jobtracker for more locations
      while (!killed && numInFlight > 0) {
        CopyResult cr = getCopyResult();  // blocks until a result or killed

        if (cr != null) {
          if (cr.getSuccess()) {  // a successful copy
            numCopied++;
            bytesTransferred += cr.getSize();
            // +1 avoids divide-by-zero in the first second
            long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
            float mbs = ((float)bytesTransferred)/(1024*1024);
            float transferRate = mbs/secsSinceStart;
            copyPhase.startNextPhase();
            copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + " at " +
                                mbpsFormat.format(transferRate) +  " MB/s)");
            getTask().reportProgress(getTracker());
          } else {
            // this copy failed, put it back onto neededOutputs
            neededOutputs.add(new Integer(cr.getMapId()));

            // wait a random amount of time for next contact: 60s base plus
            // up to maxBackoff seconds of jitter
            currentTime = System.currentTimeMillis();
            long nextContact = currentTime + 60 * 1000 +
                               backoff.nextInt(maxBackoff*1000);
            penaltyBox.put(cr.getHost(), new Long(nextContact));
            LOG.warn(reduceTask.getTaskId() + " adding host " +
                     cr.getHost() + " to penalty box, next contact in " +
                     ((nextContact-currentTime)/1000) + " seconds");

            // other outputs from the failed host may be present in the
            // knownOutputs cache, purge them. This is important in case
            // the failure is due to a lost tasktracker (causes many
            // unnecessary backoffs). If not, we only take a small hit
            // polling the jobtracker a few more times
            ListIterator locIt = knownOutputs.listIterator();
            while (locIt.hasNext()) {
              MapOutputLocation loc = (MapOutputLocation)locIt.next();
              if (cr.getHost().equals(loc.getHost())) {
                locIt.remove();
                neededOutputs.add(new Integer(loc.getMapId()));
              }
            }
          }
          // whether it succeeded or failed, the host is no longer busy
          uniqueHosts.remove(cr.getHost());
          numInFlight--;
        }

        // ensure we have enough to keep us busy: break out to poll for more
        // locations once in-flight work is below threshold, provided enough
        // outputs remain to be worth a probe
        if (numInFlight < lowThreshold &&
            (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
          break;
        }
      }
    }

    // all done, inform the copiers to exit
    leaseChecker.interrupt();
    synchronized (copiers) {
      synchronized (scheduledCopies) {
        for (int i=0; i < copiers.length; i++) {
          copiers[i].interrupt();
          copiers[i] = null;
        }
      }
    }

    return numCopied == numOutputs && !killed;
  }

  /**
   * Blocks until a copier thread posts a result onto {@code copyResults}
   * (copiers presumably notify this monitor — confirm in MapOutputCopier)
   * or the task is killed.
   *
   * @return the oldest queued result, or null if woken with an empty queue
   *         (i.e. after kill())
   */
  private CopyResult getCopyResult() {
    synchronized (copyResults) {
      while (!killed && copyResults.isEmpty()) {
        try {
          copyResults.wait();
        } catch (InterruptedException e) { }
      }
      if (copyResults.isEmpty()) {
        return null;
      } else {
        return (CopyResult) copyResults.remove(0);
      }
    }
  }

  /** Queries the job tracker for a set of outputs ready to be copied.
   * Rate-limited: sleeps as needed so that successive polls are at least
   * MIN_POLL_INTERVAL apart. Side effect: shuffles {@code neededOutputs}
   * so the probed subset is random.
   * @param neededOutputs the list of currently unknown outputs
   * @param jobClient the job tracker
   * @return a set of locations to copy outputs from
   * @throws IOException
   */
  private MapOutputLocation[] queryJobTracker(List neededOutputs,
                                              InterTrackerProtocol jobClient)
  throws IOException {
    // query for just a random subset of needed segments so that we don't
    // overwhelm the jobtracker. Ideally perhaps we could send a more compact
    // representation of all needed, i.e., a bit-vector
    int checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
    int neededIds[] = new int[checkSize];
    Collections.shuffle(neededOutputs);
    ListIterator itr = neededOutputs.listIterator();
    for (int i=0; i < checkSize; i++) {
      neededIds[i] = ((Integer)itr.next()).intValue();
    }

    // throttle: don't poll more often than MIN_POLL_INTERVAL
    long currentTime = System.currentTimeMillis();
    long pollTime = lastPollTime + MIN_POLL_INTERVAL;
    while (currentTime < pollTime) {
      try {
        Thread.sleep(pollTime-currentTime);
      } catch (InterruptedException ie) { } // IGNORE
      currentTime = System.currentTimeMillis();
    }
    lastPollTime = currentTime;

    return jobClient.locateMapOutputs(reduceTask.getJobId().toString(),
                                      neededIds, reduceTask.getPartition());
  }

  /** Delete all of the temporary map output files. */
  public void close() throws IOException {
    getTask().getProgress().setStatus("closed");
    this.mapOutputFile.removeAll(getTask().getTaskId());
  }

  /**
   * Kill the child process, but also kick getCopyResult so that it checks
   * the kill flag. Holding the copyResults monitor guarantees the waiter
   * re-checks {@code killed} after the notify.
   */
  public void kill() {
    synchronized (copyResults) {
      super.kill();
      copyResults.notify();
    }
  }

}
// --- scraped code-viewer UI chrome below; not part of this source file ---
// Keyboard shortcuts: Copy code (Ctrl+C), Search code (Ctrl+F),
// Fullscreen (F11), Toggle theme (Ctrl+Shift+D), Show shortcuts (?),
// Increase font size (Ctrl+=), Decrease font size (Ctrl+-)