📄 tasktracker.java
字号:
try {
  // This while-loop attempts reconnects if we get network errors
  while (running && ! staleState && !shuttingDown ) {
    try {
      if (offerService() == STALE_STATE) {
        staleState = true;
      }
    } catch (Exception ex) {
      if (!shuttingDown) {
        LOG.info("Lost connection to JobTracker [" + jobTrackAddr + "]. Retrying...", ex);
        try {
          Thread.sleep(5000);
        } catch (InterruptedException ie) {
        }
      }
    }
  }
} finally {
  close();
}
if (shuttingDown) { return; }
LOG.info("Reinitializing local state");
initialize();
}
} catch (IOException iex) {
  LOG.info("Got fatal exception while reinitializing TaskTracker: " + iex.toString());
  return;
}
}

  /**
   * This class implements a queue that is put between producer and
   * consumer threads. It will grow without bound.
   * @author Owen O'Malley
   */
  static private class BlockingQueue {
    /** Backing store for the queued objects; also the monitor for put/take. */
    private final List<Object> queue;

    /**
     * Create an empty queue.
     */
    public BlockingQueue() {
      queue = new ArrayList<Object>();
    }

    /**
     * Put the given object at the back of the queue and wake one thread
     * blocked in {@link #take()}, if any.
     * @param obj the object to enqueue
     */
    public void put(Object obj) {
      synchronized (queue) {
        queue.add(obj);
        queue.notify();
      }
    }

    /**
     * Take the object at the front of the queue.
     * It blocks until there is an object available. An interrupt received
     * while waiting does not abort the wait, but the thread's interrupt
     * status is restored before returning so the caller can still see it
     * (previously the interrupt was silently discarded).
     * @return the head of the queue
     */
    public Object take() {
      boolean interrupted = false;
      try {
        synchronized (queue) {
          while (queue.isEmpty()) {
            try {
              queue.wait();
            } catch (InterruptedException ie) {
              // Keep waiting, but remember the interrupt for the caller.
              interrupted = true;
            }
          }
          // ArrayList.remove(int) returns the removed element.
          return queue.remove(0);
        }
      } finally {
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }

  ///////////////////////////////////////////////////////
  // TaskInProgress maintains all the info for a Task that
  // lives at this TaskTracker. It maintains the Task object,
  // its TaskStatus, and the TaskRunner.
/////////////////////////////////////////////////////// class TaskInProgress { Task task; float progress; TaskStatus.State runstate; long lastProgressReport; StringBuffer diagnosticInfo = new StringBuffer(); TaskRunner runner; boolean done = false; boolean wasKilled = false; private JobConf defaultJobConf; private JobConf localJobConf; private boolean keepFailedTaskFiles; private boolean alwaysKeepTaskFiles; private TaskStatus taskStatus ; private boolean keepJobFiles; /** */ public TaskInProgress(Task task, JobConf conf) { this.task = task; this.progress = 0.0f; this.runstate = TaskStatus.State.UNASSIGNED; this.lastProgressReport = System.currentTimeMillis(); this.defaultJobConf = conf; localJobConf = null; taskStatus = new TaskStatus(task.getTaskId(), task.isMapTask(), progress, runstate, diagnosticInfo.toString(), "initializing", getName(), task.isMapTask()?Phase.MAP:Phase.SHUFFLE); keepJobFiles = false; } private void localizeTask(Task task) throws IOException{ Path localTaskDir = new Path(this.defaultJobConf.getLocalPath(SUBDIR+ Path.SEPARATOR + JOBCACHE + Path.SEPARATOR + task.getJobId()), task.getTaskId()); FileSystem localFs = FileSystem.getNamed("local", fConf); localFs.mkdirs(localTaskDir); Path localTaskFile = new Path(localTaskDir, "job.xml"); task.setJobFile(localTaskFile.toString()); localJobConf.set("mapred.local.dir", fConf.get("mapred.local.dir")); localJobConf.set("mapred.task.id", task.getTaskId()); keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles(); task.localizeConfiguration(localJobConf); OutputStream out = localFs.create(localTaskFile); try { localJobConf.write(out); } finally { out.close(); } task.setConf(localJobConf); String keepPattern = localJobConf.getKeepTaskFilesPattern(); if (keepPattern != null) { keepJobFiles = true; alwaysKeepTaskFiles = Pattern.matches(keepPattern, task.getTaskId()); } else { alwaysKeepTaskFiles = false; } } /** */ public Task getTask() { return task; } public void setJobConf(JobConf lconf){ 
this.localJobConf = lconf; keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles(); } /** */ public synchronized TaskStatus createStatus() { taskStatus.setProgress(progress); taskStatus.setRunState(runstate); taskStatus.setDiagnosticInfo(diagnosticInfo.toString()); if (diagnosticInfo.length() > 0) { diagnosticInfo = new StringBuffer(); } return taskStatus; } /** * Kick off the task execution */ public synchronized void launchTask() throws IOException { localizeTask(task); this.runstate = TaskStatus.State.RUNNING; this.runner = task.createRunner(TaskTracker.this); this.runner.start(); this.taskStatus.setStartTime(System.currentTimeMillis()); } /** * The task is reporting its progress */ public synchronized void reportProgress(float p, String state, Phase newPhase) { LOG.info(task.getTaskId()+" "+p+"% "+state); this.progress = p; this.runstate = TaskStatus.State.RUNNING; this.lastProgressReport = System.currentTimeMillis(); Phase oldPhase = taskStatus.getPhase() ; if( oldPhase != newPhase ){ // sort phase started if( newPhase == Phase.SORT ){ this.taskStatus.setShuffleFinishTime(System.currentTimeMillis()); }else if( newPhase == Phase.REDUCE){ this.taskStatus.setSortFinishTime(System.currentTimeMillis()); } } this.taskStatus.setStateString(state); } /** */ public long getLastProgressReport() { return lastProgressReport; } /** */ public TaskStatus.State getRunState() { return runstate; } /** * The task has reported some diagnostic info about its status */ public synchronized void reportDiagnosticInfo(String info) { this.diagnosticInfo.append(info); } /** * The task is reporting that it's done running */ public synchronized void reportDone() { LOG.info("Task " + task.getTaskId() + " is done."); this.progress = 1.0f; this.taskStatus.setFinishTime(System.currentTimeMillis()); this.done = true; } /** * The task has actually finished running. */ public void taskFinished() { long start = System.currentTimeMillis(); // // Wait until task reports as done. 
If it hasn't reported in, // wait for a second and try again. // while (! done && (System.currentTimeMillis() - start < WAIT_FOR_DONE)) { try { Thread.sleep(1000); } catch (InterruptedException ie) { } } // // Change state to success or failure, depending on whether // task was 'done' before terminating // boolean needCleanup = false; synchronized (this) { if (done) { runstate = TaskStatus.State.SUCCEEDED; } else { if (!wasKilled) { failures += 1; runstate = TaskStatus.State.FAILED; } else { runstate = TaskStatus.State.KILLED; } progress = 0.0f; } this.taskStatus.setFinishTime(System.currentTimeMillis()); needCleanup = (runstate == TaskStatus.State.FAILED) | (runstate == TaskStatus.State.KILLED); } // // If the task has failed, or if the task was killAndCleanup()'ed, // we should clean up right away. We only wait to cleanup // if the task succeeded, and its results might be useful // later on to downstream job processing. // if (needCleanup) { try { cleanup(); } catch (IOException ie) { } } } /** * We no longer need anything from this task, as the job has * finished. If the task is still running, kill it (and clean up */ public synchronized void jobHasFinished() throws IOException { if (getRunState() == TaskStatus.State.RUNNING) { killAndCleanup(false); } else { cleanup(); } if (keepJobFiles) return; // delete the job diretory for this task // since the job is done/failed this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR + task.getJobId()); } /** * Something went wrong and the task must be killed. * @param wasFailure was it a failure (versus a kill request)? 
*/ public synchronized void killAndCleanup(boolean wasFailure ) throws IOException { if (runstate == TaskStatus.State.RUNNING) { wasKilled = true; if (wasFailure) { failures += 1; } runner.kill(); } else if (runstate == TaskStatus.State.UNASSIGNED) { if (wasFailure) { failures += 1; runstate = TaskStatus.State.FAILED; } else { runstate = TaskStatus.State.KILLED; } } } /** * The map output has been lost. */ public synchronized void mapOutputLost(String failure ) throws IOException { if (runstate == TaskStatus.State.SUCCEEDED) { LOG.info("Reporting output lost:"+task.getTaskId()); runstate = TaskStatus.State.FAILED; // change status to failure progress = 0.0f; reportDiagnosticInfo("Map output lost, rescheduling: " + failure); runningTasks.put(task.getTaskId(), this); mapTotal++; } else { LOG.warn("Output already reported lost:"+task.getTaskId()); } } /** * We no longer need anything from this task. Either the * controlling job is all done and the files have been copied * away, or the task failed and we don't need the remains. */ void cleanup() throws IOException { String taskId = task.getTaskId(); LOG.debug("Cleaning up " + taskId);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -