jobtracker.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 910 行 · 第 1/3 页

JAVA
910
字号
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import org.apache.hadoop.fs.*;import org.apache.hadoop.ipc.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.LogFormatter;import java.io.*;import java.net.*;import java.util.*;import java.util.logging.*;/******************************************************* * JobTracker is the central location for submitting and  * tracking MR jobs in a network environment. * * @author Mike Cafarella *******************************************************/public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol {    static long JOBINIT_SLEEP_INTERVAL = 2000;    static long RETIRE_JOB_INTERVAL;    static long RETIRE_JOB_CHECK_INTERVAL;    static float TASK_ALLOC_EPSILON;    static float PAD_FRACTION;    static float MIN_SLOTS_FOR_PADDING;    public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.JobTracker");    private static JobTracker tracker = null;    public static void startTracker(Configuration conf) throws IOException {      if (tracker != null)        throw new IOException("JobTracker already running.");      while (true) {        try {          tracker = new JobTracker(conf);          break;        } catch (IOException e) {          LOG.log(Level.WARNING, "Starting tracker", e);        }        try {          Thread.sleep(1000);        } catch (InterruptedException e) {        }      }      tracker.offerService();    }    public static JobTracker getTracker() {        return tracker;    }    ///////////////////////////////////////////////////////    // Used to expire TaskTrackers that have gone down    ///////////////////////////////////////////////////////    class ExpireTrackers implements Runnable {        boolean shouldRun = true;        public ExpireTrackers() {        }        /**         * The run method lives for the life of the JobTracker, and removes TaskTrackers         * that have not checked in for some time.         */        public void run() {            while (shouldRun) {                //                // Thread runs periodically to check whether trackers should be expired.                // The sleep interval must be no more than half the maximum expiry time                // for a task tracker.                //                try {                    Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL / 3);                } catch (InterruptedException ie) {                }                //                // Loop through all expired items in the queue                //                synchronized (taskTrackers) {                    synchronized (trackerExpiryQueue) {                        long now = System.currentTimeMillis();                        TaskTrackerStatus leastRecent = null;                        while ((trackerExpiryQueue.size() > 0) &&                               ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) &&                               (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) {                            // Remove profile from head of queue                            trackerExpiryQueue.remove(leastRecent);                            String trackerName = leastRecent.getTrackerName();                            // Figure out if last-seen time should be updated, or if tracker is dead                            TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName());                            // Items might leave the taskTracker set through other means; the                            // status stored in 'taskTrackers' might be null, which means the                            // tracker has already been destroyed.                            if (newProfile != null) {                                if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {                                    // Remove completely                                    updateTaskTrackerStatus(trackerName, null);                                    lostTaskTracker(leastRecent.getTrackerName());                                } else {                                    // Update time by inserting latest profile                                    trackerExpiryQueue.add(newProfile);                                }                            }                        }                    }                }            }        }                /**         * Stop the tracker on next iteration         */        public void stopTracker() {            shouldRun = false;        }    }    ///////////////////////////////////////////////////////    // Used to remove old finished Jobs that have been around for too long    ///////////////////////////////////////////////////////    class RetireJobs implements Runnable {        boolean shouldRun = true;        public RetireJobs() {        }        /**         * The run method lives for the life of the JobTracker,         * and removes Jobs that are not still running, but which         * finished a long time ago.         */        public void run() {            while (shouldRun) {                try {                    Thread.sleep(RETIRE_JOB_CHECK_INTERVAL);                } catch (InterruptedException ie) {                }                                synchronized (jobs) {                    synchronized (jobInitQueue) {                        synchronized (jobsByArrival) {                            for (Iterator it = jobs.keySet().iterator(); it.hasNext(); ) {                                String jobid = (String) it.next();                                JobInProgress job = (JobInProgress) jobs.get(jobid);                                if (job.getStatus().getRunState() != JobStatus.RUNNING &&                                    job.getStatus().getRunState() != JobStatus.PREP &&                                    (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {                                    it.remove();                                                                jobInitQueue.remove(job);                                    jobsByArrival.remove(job);                                }                            }                        }                    }                }            }        }        public void stopRetirer() {            shouldRun = false;        }    }    /////////////////////////////////////////////////////////////////    //  Used to init new jobs that have just been created    /////////////////////////////////////////////////////////////////    class JobInitThread implements Runnable {        boolean shouldRun = true;        public JobInitThread() {        }        public void run() {            while (shouldRun) {                JobInProgress job = null;                synchronized (jobInitQueue) {                    if (jobInitQueue.size() > 0) {                        job = (JobInProgress) jobInitQueue.elementAt(0);                        jobInitQueue.remove(job);                    } else {                        try {                            jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);                        } catch (InterruptedException iex) {                        }                    }                }                try {                    if (job != null) {                        job.initTasks();                    }                } catch (Exception e) {                    LOG.log(Level.WARNING, "job init failed", e);                    job.kill();                }            }        }        public void stopIniter() {            shouldRun = false;        }    }    /////////////////////////////////////////////////////////////////    // The real JobTracker    ////////////////////////////////////////////////////////////////    int port;    String localMachine;    long startTime;    int totalSubmissions = 0;    Random r = new Random();    private int maxCurrentTasks;    //    // Properties to maintain while running Jobs and Tasks:    //    // 1.  Each Task is always contained in a single Job.  A Job succeeds when all its     //     Tasks are complete.    //    // 2.  Every running or successful Task is assigned to a Tracker.  Idle Tasks are not.    //    // 3.  When a Tracker fails, all of its assigned Tasks are marked as failures.    //    // 4.  A Task might need to be reexecuted if it (or the machine it's hosted on) fails    //     before the Job is 100% complete.  Sometimes an upstream Task can fail without    //     reexecution if all downstream Tasks that require its output have already obtained    //     the necessary files.    //    // All the known jobs.  (jobid->JobInProgress)    TreeMap jobs = new TreeMap();    Vector jobsByArrival = new Vector();    // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)    TreeMap taskidToTIPMap = new TreeMap();    // (taskid --> trackerID)     TreeMap taskidToTrackerMap = new TreeMap();    // (trackerID->TreeSet of taskids running at that tracker)    TreeMap trackerToTaskMap = new TreeMap();    //    // Watch and expire TaskTracker objects using these structures.    // We can map from Name->TaskTrackerStatus, or we can expire by time.    //    int totalMaps = 0;    int totalReduces = 0;    private TreeMap taskTrackers = new TreeMap();    Vector jobInitQueue = new Vector();    ExpireTrackers expireTrackers = new ExpireTrackers();    RetireJobs retireJobs = new RetireJobs();    JobInitThread initJobs = new JobInitThread();    /**     * It might seem like a bug to maintain a TreeSet of status objects,     * which can be updated at any time.  But that's not what happens!  We     * only update status objects in the taskTrackers table.  Status objects     * are never updated once they enter the expiry queue.  Instead, we wait     * for them to expire and remove them from the expiry queue.  If a status     * object has been updated in the taskTracker table, the latest status is      * reinserted.  Otherwise, we assume the tracker has expired.     */    TreeSet trackerExpiryQueue = new TreeSet(new Comparator() {        public int compare(Object o1, Object o2) {            TaskTrackerStatus p1 = (TaskTrackerStatus) o1;            TaskTrackerStatus p2 = (TaskTrackerStatus) o2;            if (p1.getLastSeen() < p2.getLastSeen()) {                return -1;            } else if (p1.getLastSeen() > p2.getLastSeen()) {                return 1;            } else {                return (p1.getTrackerName().compareTo(p2.getTrackerName()));            }        }    });    // Used to provide an HTML view on Job, Task, and TaskTracker structures    JobTrackerInfoServer infoServer;    int infoPort;    Server interTrackerServer;    // Some jobs are stored in a local system directory.  We can delete    // the files when we're done with the job.    static final String SUBDIR = "jobTracker";    FileSystem fs;    File systemDir;    private Configuration conf;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?