jobtracker.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 910 行 · 第 1/3 页

JAVA
910
字号
    /**     * Start the JobTracker process, listen on the indicated port     */    JobTracker(Configuration conf) throws IOException {        //        // Grab some static constants        //        maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);        RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);        RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);        TASK_ALLOC_EPSILON = conf.getFloat("mapred.jobtracker.taskalloc.loadbalance.epsilon", 0.2f);        PAD_FRACTION = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 0.1f);        MIN_SLOTS_FOR_PADDING = 3 * maxCurrentTasks;        // This is a directory of temporary submission files.  We delete it        // on startup, and can delete any files that we're done with        this.conf = conf;        JobConf jobConf = new JobConf(conf);        this.systemDir = jobConf.getSystemDir();        this.fs = FileSystem.get(conf);        FileUtil.fullyDelete(fs, systemDir);        fs.mkdirs(systemDir);        // Same with 'localDir' except it's always on the local disk.        jobConf.deleteLocalFiles(SUBDIR);        // Set ports, start RPC servers, etc.        InetSocketAddress addr = getAddress(conf);        this.localMachine = addr.getHostName();        this.port = addr.getPort();        this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);        this.interTrackerServer.start();	Properties p = System.getProperties();	for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {	    String key = (String) it.next();	    String val = (String) p.getProperty(key);	    LOG.info("Property '" + key + "' is " + val);	}        this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);        this.infoServer = new JobTrackerInfoServer(this, infoPort);        this.infoServer.start();        this.startTime = System.currentTimeMillis();        new Thread(this.expireTrackers).start();        new Thread(this.retireJobs).start();        new Thread(this.initJobs).start();    }    public static InetSocketAddress getAddress(Configuration conf) {      String jobTrackerStr =        conf.get("mapred.job.tracker", "localhost:8012");      int colon = jobTrackerStr.indexOf(":");      if (colon < 0) {        throw new RuntimeException("Bad mapred.job.tracker: "+jobTrackerStr);      }      String jobTrackerName = jobTrackerStr.substring(0, colon);      int jobTrackerPort = Integer.parseInt(jobTrackerStr.substring(colon+1));      return new InetSocketAddress(jobTrackerName, jobTrackerPort);    }    /**     * Run forever     */    public void offerService() {        try {            this.interTrackerServer.join();        } catch (InterruptedException ie) {        }    }    ///////////////////////////////////////////////////////    // Maintain lookup tables; called by JobInProgress    // and TaskInProgress    ///////////////////////////////////////////////////////    void createTaskEntry(String taskid, String taskTracker, TaskInProgress tip) {        LOG.info("Adding task '" + taskid + "' to tip " + tip.getTIPId() + ", for tracker '" + taskTracker + "'");        // taskid --> tracker        taskidToTrackerMap.put(taskid, taskTracker);        // tracker --> taskid        TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker);        if (taskset == null) {            taskset = new TreeSet();            trackerToTaskMap.put(taskTracker, taskset);        }        taskset.add(taskid);        // taskid --> TIP        taskidToTIPMap.put(taskid, tip);    }    void removeTaskEntry(String taskid) {        // taskid --> tracker        String tracker = (String) taskidToTrackerMap.remove(taskid);        // tracker --> taskid        TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);        if (trackerSet != null) {            trackerSet.remove(taskid);        }        // taskid --> TIP        taskidToTIPMap.remove(taskid);    }    ///////////////////////////////////////////////////////    // Accessors for objects that want info on jobs, tasks,    // trackers, etc.    ///////////////////////////////////////////////////////    public int getTotalSubmissions() {        return totalSubmissions;    }    public String getJobTrackerMachine() {        return localMachine;    }    public int getTrackerPort() {        return port;    }    public int getInfoPort() {        return infoPort;    }    public long getStartTime() {        return startTime;    }    public Vector runningJobs() {        Vector v = new Vector();        for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {            JobInProgress jip = (JobInProgress) it.next();            JobStatus status = jip.getStatus();            if (status.getRunState() == JobStatus.RUNNING) {                v.add(jip);            }        }        return v;    }    public Vector failedJobs() {        Vector v = new Vector();        for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {            JobInProgress jip = (JobInProgress) it.next();            JobStatus status = jip.getStatus();            if (status.getRunState() == JobStatus.FAILED) {                v.add(jip);            }        }        return v;    }    public Vector completedJobs() {        Vector v = new Vector();        for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {            JobInProgress jip = (JobInProgress) it.next();            JobStatus status = jip.getStatus();            if (status.getRunState() == JobStatus.SUCCEEDED) {                v.add(jip);            }        }        return v;    }    public Collection taskTrackers() {      synchronized (taskTrackers) {        return taskTrackers.values();      }    }    public TaskTrackerStatus getTaskTracker(String trackerID) {      synchronized (taskTrackers) {        return (TaskTrackerStatus) taskTrackers.get(trackerID);      }    }    ////////////////////////////////////////////////////    // InterTrackerProtocol    ////////////////////////////////////////////////////    public void initialize(String taskTrackerName) {      synchronized (taskTrackers) {        boolean seenBefore = updateTaskTrackerStatus(taskTrackerName, null);        if (seenBefore) {          lostTaskTracker(taskTrackerName);        }      }    }    /**     * Update the last recorded status for the given task tracker.     * It assumes that the taskTrackers are locked on entry.     * @author Owen O'Malley     * @param trackerName The name of the tracker     * @param status The new status for the task tracker     * @return Was an old status found?     */    private boolean updateTaskTrackerStatus(String trackerName,                                            TaskTrackerStatus status) {      TaskTrackerStatus oldStatus =         (TaskTrackerStatus) taskTrackers.get(trackerName);      if (oldStatus != null) {        totalMaps -= oldStatus.countMapTasks();        totalReduces -= oldStatus.countReduceTasks();        if (status == null) {          taskTrackers.remove(trackerName);        }      }      if (status != null) {        totalMaps += status.countMapTasks();        totalReduces += status.countReduceTasks();        taskTrackers.put(trackerName, status);      }      return oldStatus != null;    }        /**     * Process incoming heartbeat messages from the task trackers.     */    public synchronized int emitHeartbeat(TaskTrackerStatus trackerStatus, boolean initialContact) {        String trackerName = trackerStatus.getTrackerName();        trackerStatus.setLastSeen(System.currentTimeMillis());        synchronized (taskTrackers) {            synchronized (trackerExpiryQueue) {                boolean seenBefore = updateTaskTrackerStatus(trackerName,                                                             trackerStatus);                if (initialContact) {                    // If it's first contact, then clear out any state hanging around                    if (seenBefore) {                        lostTaskTracker(trackerName);                    }                } else {                    // If not first contact, there should be some record of the tracker                    if (!seenBefore) {                        return InterTrackerProtocol.UNKNOWN_TASKTRACKER;                    }                }                if (initialContact) {                    trackerExpiryQueue.add(trackerStatus);                }            }        }        updateTaskStatuses(trackerStatus);        //LOG.info("Got heartbeat from "+trackerName);        return InterTrackerProtocol.TRACKERS_OK;    }    /**     * A tracker wants to know if there's a Task to run.  Returns     * a task we'd like the TaskTracker to execute right now.     *     * Eventually this function should compute load on the various TaskTrackers,     * and incorporate knowledge of DFS file placement.  But for right now, it     * just grabs a single item out of the pending task list and hands it back.     */    public synchronized Task pollForNewTask(String taskTracker) {        //        // Compute average map and reduce task numbers across pool        //        int avgMaps = 0;        int avgReduces = 0;        int numTaskTrackers;        TaskTrackerStatus tts;        synchronized (taskTrackers) {          numTaskTrackers = taskTrackers.size();          tts = (TaskTrackerStatus) taskTrackers.get(taskTracker);        }        if (numTaskTrackers > 0) {          avgMaps = totalMaps / numTaskTrackers;          avgReduces = totalReduces / numTaskTrackers;        }        int totalCapacity = numTaskTrackers * maxCurrentTasks;        //        // Get map + reduce counts for the current tracker.        //        if (tts == null) {          LOG.warning("Unknown task tracker polling; ignoring: " + taskTracker);          return null;        }        int numMaps = tts.countMapTasks();        int numReduces = tts.countReduceTasks();        //        // In the below steps, we allocate first a map task (if appropriate),        // and then a reduce task if appropriate.  We go through all jobs        // in order of job arrival; jobs only get serviced if their         // predecessors are serviced, too.        //        //        // We hand a task to the current taskTracker if the given machine         // has a workload that's equal to or less than the averageMaps         // +/- TASK_ALLOC_EPSILON.  (That epsilon is in place in case        // there is an odd machine that is failing for some reason but         // has not yet been removed from the pool, making capacity seem        // larger than it really is.)        //        synchronized (jobsByArrival) {            if ((numMaps < maxCurrentTasks) &&                (numMaps <= (avgMaps + TASK_ALLOC_EPSILON))) {                int totalNeededMaps = 0;                for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {                    JobInProgress job = (JobInProgress) it.next();                    if (job.getStatus().getRunState() != JobStatus.RUNNING) {

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?