⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tasktracker.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.mapred;import org.apache.commons.logging.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.ipc.*;import org.apache.hadoop.metrics.Metrics;import org.apache.hadoop.util.*;import org.apache.hadoop.util.DiskChecker.DiskErrorException;import java.io.*;import java.net.*;import java.util.*;import java.util.regex.Pattern;import javax.servlet.ServletContext;import javax.servlet.ServletException;import javax.servlet.http.HttpServlet;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import org.apache.hadoop.metrics.MetricsRecord;import org.apache.hadoop.net.DNS;/******************************************************* * TaskTracker is a process that starts and tracks MR Tasks * in a networked environment.  It contacts the JobTracker * for Task assignments and reporting results. * * @author Mike Cafarella *******************************************************/public class TaskTracker              implements MRConstants, TaskUmbilicalProtocol, Runnable {    static final long WAIT_FOR_DONE = 3 * 1000;    private long taskTimeout;     private int httpPort;    static final int STALE_STATE = 1;    public static final Log LOG =    LogFactory.getLog("org.apache.hadoop.mapred.TaskTracker");    private boolean running = true;    String taskTrackerName;    String localHostname;    InetSocketAddress jobTrackAddr;        String taskReportBindAddress;    int taskReportPort;    Server taskReportServer = null;    InterTrackerProtocol jobClient;    StatusHttpServer server = null;        boolean shuttingDown = false;        TreeMap tasks = null;    /**     * Map from taskId -> TaskInProgress.     */    TreeMap runningTasks = null;    Map runningJobs = null;    int mapTotal = 0;    int reduceTotal = 0;    boolean justStarted = true;        //dir -> DF    Map localDirsDf = new HashMap();    long minSpaceStart = 0;    //must have this much space free to start new tasks    boolean acceptNewTasks = true;    long minSpaceKill = 0;    //if we run under this limit, kill one task    //and make sure we never receive any new jobs    //until all the old tasks have been cleaned up.    //this is if a machine is so full it's only good    //for serving map output to the other nodes    static Random r = new Random();    FileSystem fs = null;    private static final String SUBDIR = "taskTracker";    private static final String CACHEDIR = "archive";    private static final String JOBCACHE = "jobcache";    private JobConf fConf;    private MapOutputFile mapOutputFile;    private int maxCurrentTasks;    private int failures;    private int finishedCount[] = new int[1];        private class TaskTrackerMetrics {      private MetricsRecord metricsRecord = null;            private long totalTasksCompleted = 0L;            TaskTrackerMetrics() {        metricsRecord = Metrics.createRecord("mapred", "tasktracker");      }            synchronized void completeTask() {        if (metricsRecord != null) {          metricsRecord.setMetric("tasks-completed", ++totalTasksCompleted);          metricsRecord.setMetric("maps-running", mapTotal);          metricsRecord.setMetric("reduce-running", reduceTotal);          metricsRecord.update();        }      }    }        private TaskTrackerMetrics myMetrics = null;    /**     * A list of tips that should be cleaned up.     */    private BlockingQueue tasksToCleanup = new BlockingQueue();        /**     * A daemon-thread that pulls tips off the list of things to cleanup.     */    private Thread taskCleanupThread =       new Thread(new Runnable() {        public void run() {          while (true) {            try {              TaskInProgress tip = (TaskInProgress) tasksToCleanup.take();              tip.jobHasFinished();            } catch (Throwable except) {              LOG.warn(StringUtils.stringifyException(except));            }          }        }      });    {      taskCleanupThread.setDaemon(true);      taskCleanupThread.start();    }        static String getCacheSubdir() {      return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;    }    static String getJobCacheSubdir() {      return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;    }        public long getProtocolVersion(String protocol, long clientVersion) {      return TaskUmbilicalProtocol.versionID;    }    /**     * Do the real constructor work here.  It's in a separate method     * so we can call it again and "recycle" the object after calling     * close().     */    synchronized void initialize() throws IOException {        // use configured nameserver & interface to get local hostname        this.localHostname =          DNS.getDefaultHost          (fConf.get("mapred.tasktracker.dns.interface","default"),           fConf.get("mapred.tasktracker.dns.nameserver","default"));         //check local disk        checkLocalDirs(this.fConf.getLocalDirs());        fConf.deleteLocalFiles(SUBDIR);        // Clear out state tables        this.tasks = new TreeMap();        this.runningTasks = new TreeMap();        this.runningJobs = new TreeMap();        this.mapTotal = 0;        this.reduceTotal = 0;        this.acceptNewTasks = true;                this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);        this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);                        this.myMetrics = new TaskTrackerMetrics();                // port numbers        this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);        // bind address        this.taskReportBindAddress = this.fConf.get("mapred.task.tracker.report.bindAddress", "0.0.0.0");        // RPC initialization        while (true) {            try {                this.taskReportServer = RPC.getServer(this, this.taskReportBindAddress, this.taskReportPort, maxCurrentTasks, false, this.fConf);                this.taskReportServer.start();                break;            } catch (BindException e) {                LOG.info("Could not open report server at " + this.taskReportPort + ", trying new port");                this.taskReportPort++;            }                }        this.taskTrackerName = "tracker_" +                                localHostname + ":" + taskReportPort;        LOG.info("Starting tracker " + taskTrackerName);        // Clear out temporary files that might be lying around        this.mapOutputFile.cleanupStorage();        this.justStarted = true;        this.jobClient = (InterTrackerProtocol)                           RPC.waitForProxy(InterTrackerProtocol.class,                                           InterTrackerProtocol.versionID,                                            jobTrackAddr, this.fConf);                this.running = true;    }            // intialize the job directory    private void localizeJob(TaskInProgress tip) throws IOException {      Path localJarFile = null;      Task t = tip.getTask();      Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t          .getJobId()          + Path.SEPARATOR + "job.xml"));      RunningJob rjob = null;      synchronized (runningJobs) {        if (!runningJobs.containsKey(t.getJobId())) {          rjob = new RunningJob();          rjob.localized = false;          rjob.tasks = new ArrayList();          rjob.jobFile = localJobFile;          rjob.tasks.add(tip);          runningJobs.put(t.getJobId(), rjob);        } else {          rjob = (RunningJob) runningJobs.get(t.getJobId());          // keep this for later use when we just get a jobid to delete          // the data for          rjob.tasks.add(tip);        }      }      synchronized (rjob) {        if (!rjob.localized) {          localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t              .getJobId())              + Path.SEPARATOR + "job.jar");            String jobFile = t.getJobFile();          fs.copyToLocalFile(new Path(jobFile), localJobFile);          JobConf localJobConf = new JobConf(localJobFile);          String jarFile = localJobConf.getJar();          if (jarFile != null) {            fs.copyToLocalFile(new Path(jarFile), localJarFile);            localJobConf.setJar(localJarFile.toString());            FileSystem localFs = FileSystem.getNamed("local", fConf);            OutputStream out = localFs.create(localJobFile);            try {              localJobConf.write(out);            } finally {              out.close();            }            // also unjar the job.jar files in workdir            File workDir = new File(                                    new File(localJobFile.toString()).getParent(),                                    "work");            workDir.mkdirs();            RunJar.unJar(new File(localJarFile.toString()), workDir);          }          rjob.localized = true;        }      }      launchTaskForJob(tip, new JobConf(rjob.jobFile));     }        private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{      synchronized (tip) {      try {        tip.setJobConf(jobConf);        tip.launchTask();      } catch (Throwable ie) {        tip.runstate = TaskStatus.State.FAILED;        try {          tip.cleanup();        } catch (Throwable ie2) {          // Ignore it, we are just trying to cleanup.        }        String error = StringUtils.stringifyException(ie);        tip.reportDiagnosticInfo(error);        LOG.info(error);      }      }     }        public synchronized void shutdown() throws IOException {          shuttingDown = true;          close();          if (this.server != null) {            try {                LOG.info("Shutting down StatusHttpServer");                this.server.stop();            } catch (InterruptedException ex) {                ex.printStackTrace();            }          }      }    /**     * Close down the TaskTracker and all its components.  We must also shutdown     * any running tasks or threads, and cleanup disk space.  A new TaskTracker     * within the same process space might be restarted, so everything must be     * clean.     */    public synchronized void close() throws IOException {        //        // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',        // because calling jobHasFinished() may result in an edit to 'tasks'.        //        TreeMap tasksToClose = new TreeMap();        tasksToClose.putAll(tasks);        for (Iterator it = tasksToClose.values().iterator(); it.hasNext(); ) {            TaskInProgress tip = (TaskInProgress) it.next();            tip.jobHasFinished();        }        // Shutdown local RPC servers.  Do them        // in parallel, as RPC servers can take a long        // time to shutdown.  (They need to wait a full        // RPC timeout, which might be 10-30 seconds.)        new Thread() {            public void run() {                if (taskReportServer != null) {                    taskReportServer.stop();                    taskReportServer = null;                }            }        }.start();        this.running = false;                // Clear local storage        this.mapOutputFile.cleanupStorage();    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -