tasktracker.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 759 行 · 第 1/2 页

JAVA
759
字号
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.mapred;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.*;import org.apache.hadoop.ipc.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.LogFormatter;import java.io.*;import java.net.*;import java.util.*;import java.util.logging.*;/******************************************************* * TaskTracker is a process that starts and tracks MR Tasks * in a networked environment.  It contacts the JobTracker * for Task assignments and reporting results. * * @author Mike Cafarella *******************************************************/public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable {    static final long WAIT_FOR_DONE = 3 * 1000;    private long taskTimeout;     static final int STALE_STATE = 1;    public static final Logger LOG =    LogFormatter.getLogger("org.apache.hadoop.mapred.TaskTracker");    private boolean running = true;    String taskTrackerName;    String localHostname;    InetSocketAddress jobTrackAddr;    int taskReportPort;    int mapOutputPort;    Server taskReportServer = null;    Server mapOutputServer = null;    InterTrackerProtocol jobClient;    TreeMap tasks = null;    TreeMap runningTasks = null;    int mapTotal = 0;    int reduceTotal = 0;    boolean justStarted = true;    static Random r = new Random();    FileSystem fs = null;    static final String SUBDIR = "taskTracker";    private Configuration fConf;    private MapOutputFile mapOutputFile;    private int maxCurrentTasks;    class MapOutputServer extends RPC.Server {      private MapOutputServer(int port, int threads) {        super(TaskTracker.this, fConf, port, threads, false);      }      public TaskTracker getTaskTracker() {        return TaskTracker.this;      }    }    /**     * Start with the local machine name, and the default JobTracker     */    public TaskTracker(Configuration conf) throws IOException {      this(JobTracker.getAddress(conf), conf);    }    /**     * Start with the local machine name, and the addr of the target JobTracker     */    public TaskTracker(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {        maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);        this.fConf = conf;        this.jobTrackAddr = jobTrackAddr;        this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);        this.mapOutputFile = new MapOutputFile();        this.mapOutputFile.setConf(conf);        initialize();    }    /**     * Do the real constructor work here.  It's in a separate method     * so we can call it again and "recycle" the object after calling     * close().     */    void initialize() throws IOException {        this.taskTrackerName = "tracker_" + (Math.abs(r.nextInt()) % 100000);        LOG.info("Starting tracker " + taskTrackerName);        this.localHostname = InetAddress.getLocalHost().getHostName();        new JobConf(this.fConf).deleteLocalFiles(SUBDIR);        // Clear out state tables        this.tasks = new TreeMap();        this.runningTasks = new TreeMap();        this.mapTotal = 0;        this.reduceTotal = 0;        // port numbers        this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);        this.mapOutputPort = this.fConf.getInt("mapred.task.tracker.output.port", 50040);        // RPC initialization        while (true) {            try {                this.taskReportServer = RPC.getServer(this, this.taskReportPort, maxCurrentTasks, false, this.fConf);                this.taskReportServer.start();                break;            } catch (BindException e) {                LOG.info("Could not open report server at " + this.taskReportPort + ", trying new port");                this.taskReportPort++;            }                }        while (true) {            try {                this.mapOutputServer = new MapOutputServer(mapOutputPort, maxCurrentTasks);                this.mapOutputServer.start();                break;            } catch (BindException e) {                LOG.info("Could not open mapoutput server at " + this.mapOutputPort + ", trying new port");                this.mapOutputPort++;            }        }        // Clear out temporary files that might be lying around        this.mapOutputFile.cleanupStorage();        this.justStarted = true;        this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf);    }    /**     * Close down the TaskTracker and all its components.  We must also shutdown     * any running tasks or threads, and cleanup disk space.  A new TaskTracker     * within the same process space might be restarted, so everything must be     * clean.     */    public synchronized void close() throws IOException {        //        // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',        // because calling jobHasFinished() may result in an edit to 'tasks'.        //        TreeMap tasksToClose = new TreeMap();        tasksToClose.putAll(tasks);        for (Iterator it = tasksToClose.values().iterator(); it.hasNext(); ) {            TaskInProgress tip = (TaskInProgress) it.next();            tip.jobHasFinished();        }        // Wait for them to die and report in        try {            Thread.sleep(5000);        } catch (InterruptedException ie) {        }        //        // Shutdown local RPC servers.  Do them        // in parallel, as RPC servers can take a long        // time to shutdown.  (They need to wait a full        // RPC timeout, which might be 10-30 seconds.)        //        new Thread() {            public void run() {                if (taskReportServer != null) {                    taskReportServer.stop();                    taskReportServer = null;                }            }        }.start();        if (mapOutputServer != null) {            mapOutputServer.stop();            mapOutputServer = null;        }        // Clear local storage        this.mapOutputFile.cleanupStorage();    }    /**     * The connection to the JobTracker, used by the TaskRunner      * for locating remote files.     */    public InterTrackerProtocol getJobClient() {      return jobClient;    }    /**     * Main service loop.  Will stay in this loop forever.     */    int offerService() throws Exception {        long lastHeartbeat = 0;        while (running) {            long now = System.currentTimeMillis();            long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);            if (waitTime > 0) {                try {                    Thread.sleep(waitTime);                } catch (InterruptedException ie) {                }                continue;            }            //            // Emit standard hearbeat message to check in with JobTracker            //            Vector taskReports = new Vector();            synchronized (this) {                for (Iterator it = runningTasks.keySet().iterator(); it.hasNext(); ) {                    String taskid = (String) it.next();                    TaskInProgress tip = (TaskInProgress) runningTasks.get(taskid);                    TaskStatus status = tip.createStatus();                    taskReports.add(status);                    if (status.getRunState() != TaskStatus.RUNNING) {                        if (tip.getTask().isMapTask()) {                            mapTotal--;                        } else {                            reduceTotal--;                        }                        it.remove();                    }                }            }            //            // Xmit the heartbeat            //            if (justStarted) {                this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);            }                        int resultCode = jobClient.emitHeartbeat(new TaskTrackerStatus(taskTrackerName, localHostname, mapOutputPort, taskReports), justStarted);            justStarted = false;                          if (resultCode == InterTrackerProtocol.UNKNOWN_TASKTRACKER) {                return STALE_STATE;            }            //            // Check if we should create a new Task            //            if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {                Task t = jobClient.pollForNewTask(taskTrackerName);                if (t != null) {                    TaskInProgress tip = new TaskInProgress(t, this.fConf);                    synchronized (this) {                      tasks.put(t.getTaskId(), tip);                      if (t.isMapTask()) {                          mapTotal++;                      } else {                          reduceTotal++;                      }                      runningTasks.put(t.getTaskId(), tip);                    }                    tip.launchTask();                }            }            //            // Kill any tasks that have not reported progress in the last X seconds.            //            synchronized (this) {                for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {                    TaskInProgress tip = (TaskInProgress) it.next();                    if ((tip.getRunState() == TaskStatus.RUNNING) &&                        (System.currentTimeMillis() - tip.getLastProgressReport() > this.taskTimeout)) {                        LOG.info("Task " + tip.getTask().getTaskId() + " timed out.  Killing.");                        tip.reportDiagnosticInfo("Timed out.");                        tip.killAndCleanup();                    }                }            }            //            // Check for any Tasks that should be killed, even if            // the containing Job is still ongoing.  (This happens            // with speculative execution, when one version of the            // task finished before another            //            //            // Check for any Tasks whose job may have ended            //            String toCloseId = jobClient.pollForTaskWithClosedJob(taskTrackerName);            if (toCloseId != null) {              synchronized (this) {                TaskInProgress tip = (TaskInProgress) tasks.get(toCloseId);                tip.jobHasFinished();              }            }            lastHeartbeat = now;        }        return 0;    }    /**     * The server retry loop.       * This while-loop attempts to connect to the JobTracker.  It only      * loops when the old TaskTracker has gone bad (its state is     * stale somehow) and we need to reinitialize everything.     */    public void run() {        try {            while (running) {                boolean staleState = false;                try {                    // This while-loop attempts reconnects if we get network errors                    while (running && ! staleState) {                        try {                            if (offerService() == STALE_STATE) {                                staleState = true;                            }                        } catch (Exception ex) {                            LOG.log(Level.INFO, "Lost connection to JobTracker [" + jobTrackAddr + "].  Retrying...", ex);                            try {                                Thread.sleep(5000);                            } catch (InterruptedException ie) {                            }                        }                    }                } finally {                    close();                }                LOG.info("Reinitializing local state");                initialize();            }        } catch (IOException iex) {            LOG.info("Got fatal exception while reinitializing TaskTracker: " + iex.toString());            return;        }    }    ///////////////////////////////////////////////////////    // TaskInProgress maintains all the info for a Task that    // lives at this TaskTracker.  It maintains the Task object,    // its TaskStatus, and the TaskRunner.    ///////////////////////////////////////////////////////    class TaskInProgress {        Task task;        float progress;        int runstate;        String stateString = "";        long lastProgressReport;        StringBuffer diagnosticInfo = new StringBuffer();        TaskRunner runner;        boolean done = false;        boolean wasKilled = false;        private JobConf jobConf;        /**         */        public TaskInProgress(Task task, Configuration conf) throws IOException {            this.task = task;            this.lastProgressReport = System.currentTimeMillis();

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?