⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jobtracker.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
        taskidToTrackerMap.put(taskid, taskTracker);        // tracker --> taskid        TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker);        if (taskset == null) {            taskset = new TreeSet();            trackerToTaskMap.put(taskTracker, taskset);        }        taskset.add(taskid);        // taskid --> TIP        taskidToTIPMap.put(taskid, tip);    }    void removeTaskEntry(String taskid) {        // taskid --> tracker        String tracker = (String) taskidToTrackerMap.remove(taskid);        // tracker --> taskid        TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);        if (trackerSet != null) {            trackerSet.remove(taskid);        }        // taskid --> TIP        taskidToTIPMap.remove(taskid);    }    ///////////////////////////////////////////////////////    // Accessors for objects that want info on jobs, tasks,    // trackers, etc.    ///////////////////////////////////////////////////////    public int getTotalSubmissions() {        return totalSubmissions;    }    public String getJobTrackerMachine() {        return localMachine;    }    public int getTrackerPort() {        return port;    }    public int getInfoPort() {        return infoPort;    }    public long getStartTime() {        return startTime;    }    public Vector runningJobs() {        Vector v = new Vector();        for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {            JobInProgress jip = (JobInProgress) it.next();            JobStatus status = jip.getStatus();            if (status.getRunState() == JobStatus.RUNNING) {                v.add(jip);            }        }        return v;    }    public Vector failedJobs() {        Vector v = new Vector();        for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {            JobInProgress jip = (JobInProgress) it.next();            JobStatus status = jip.getStatus();            if (status.getRunState() == JobStatus.FAILED) {                v.add(jip);            }        }        return v;    }    public Vector completedJobs() {        Vector v = new Vector();        for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {            JobInProgress jip = (JobInProgress) it.next();            JobStatus status = jip.getStatus();            if (status.getRunState() == JobStatus.SUCCEEDED) {                v.add(jip);            }        }        return v;    }    public Collection taskTrackers() {      synchronized (taskTrackers) {        return taskTrackers.values();      }    }    public TaskTrackerStatus getTaskTracker(String trackerID) {      synchronized (taskTrackers) {        return (TaskTrackerStatus) taskTrackers.get(trackerID);      }    }    ////////////////////////////////////////////////////    // InterTrackerProtocol    ////////////////////////////////////////////////////    /**     * Update the last recorded status for the given task tracker.     * It assumes that the taskTrackers are locked on entry.     * @author Owen O'Malley     * @param trackerName The name of the tracker     * @param status The new status for the task tracker     * @return Was an old status found?     */    private boolean updateTaskTrackerStatus(String trackerName,                                            TaskTrackerStatus status) {      TaskTrackerStatus oldStatus =         (TaskTrackerStatus) taskTrackers.get(trackerName);      if (oldStatus != null) {        totalMaps -= oldStatus.countMapTasks();        totalReduces -= oldStatus.countReduceTasks();        if (status == null) {          taskTrackers.remove(trackerName);        }      }      if (status != null) {        totalMaps += status.countMapTasks();        totalReduces += status.countReduceTasks();        taskTrackers.put(trackerName, status);      }      return oldStatus != null;    }        /**     * Process incoming heartbeat messages from the task trackers.     */    public synchronized int emitHeartbeat(TaskTrackerStatus trackerStatus, boolean initialContact) {        String trackerName = trackerStatus.getTrackerName();        trackerStatus.setLastSeen(System.currentTimeMillis());        synchronized (taskTrackers) {            synchronized (trackerExpiryQueue) {                boolean seenBefore = updateTaskTrackerStatus(trackerName,                                                             trackerStatus);                if (initialContact) {                    // If it's first contact, then clear out any state hanging around                    if (seenBefore) {                        lostTaskTracker(trackerName, trackerStatus.getHost());                    }                } else {                    // If not first contact, there should be some record of the tracker                    if (!seenBefore) {                        LOG.warn("Status from unknown Tracker : " + trackerName);                        taskTrackers.remove(trackerName);                         return InterTrackerProtocol.UNKNOWN_TASKTRACKER;                    }                }                if (initialContact) {                    trackerExpiryQueue.add(trackerStatus);                }            }        }        updateTaskStatuses(trackerStatus);        //LOG.info("Got heartbeat from "+trackerName);        return InterTrackerProtocol.TRACKERS_OK;    }    /**     * A tracker wants to know if there's a Task to run.  Returns     * a task we'd like the TaskTracker to execute right now.     *     * Eventually this function should compute load on the various TaskTrackers,     * and incorporate knowledge of DFS file placement.  But for right now, it     * just grabs a single item out of the pending task list and hands it back.     */    public synchronized Task pollForNewTask(String taskTracker) {        //        // Compute average map and reduce task numbers across pool        //        int remainingReduceLoad = 0;        int remainingMapLoad = 0;        int numTaskTrackers;        TaskTrackerStatus tts;	        synchronized (taskTrackers) {          numTaskTrackers = taskTrackers.size();          tts = (TaskTrackerStatus) taskTrackers.get(taskTracker);        }        if (tts == null) {          LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker);          return null;        }        int totalCapacity = numTaskTrackers * maxCurrentTasks;        synchronized(jobsByArrival){            for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {                    JobInProgress job = (JobInProgress) it.next();                    if (job.getStatus().getRunState() == JobStatus.RUNNING) {                         int totalMapTasks = job.desiredMaps();                         int totalReduceTasks = job.desiredReduces();                         remainingMapLoad += (totalMapTasks - job.finishedMaps());                         remainingReduceLoad += (totalReduceTasks - job.finishedReduces());                    }            }           }        // find out the maximum number of maps or reduces that we are willing        // to run on any node.        int maxMapLoad = 0;        int maxReduceLoad = 0;        if (numTaskTrackers > 0) {          maxMapLoad = Math.min(maxCurrentTasks,                                (int) Math.ceil((double) remainingMapLoad /                                                 numTaskTrackers));          maxReduceLoad = Math.min(maxCurrentTasks,                                   (int) Math.ceil((double) remainingReduceLoad                                                   / numTaskTrackers));        }                //        // Get map + reduce counts for the current tracker.        //        int numMaps = tts.countMapTasks();        int numReduces = tts.countReduceTasks();        //        // In the below steps, we allocate first a map task (if appropriate),        // and then a reduce task if appropriate.  We go through all jobs        // in order of job arrival; jobs only get serviced if their         // predecessors are serviced, too.        //        //        // We hand a task to the current taskTracker if the given machine         // has a workload that's less than the maximum load of that kind of        // task.        //               synchronized (jobsByArrival) {            if (numMaps < maxMapLoad) {                int totalNeededMaps = 0;                for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {                    JobInProgress job = (JobInProgress) it.next();                    if (job.getStatus().getRunState() != JobStatus.RUNNING) {                        continue;                    }                    Task t = job.obtainNewMapTask(tts, numTaskTrackers);                    if (t != null) {                      expireLaunchingTasks.addNewTask(t.getTaskId());                      myMetrics.launchMap();                      return t;                    }                    //                    // Beyond the highest-priority task, reserve a little                     // room for failures and speculative executions; don't                     // schedule tasks to the hilt.                    //                    totalNeededMaps += job.desiredMaps();                    int padding = 0;                    if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {                      padding = Math.min(maxCurrentTasks,                                         (int)(totalNeededMaps * PAD_FRACTION));                    }                    if (totalMaps + padding >= totalCapacity) {                        break;                    }                }            }            //            // Same thing, but for reduce tasks            //            if (numReduces < maxReduceLoad) {                int totalNeededReduces = 0;                for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {                    JobInProgress job = (JobInProgress) it.next();                    if (job.getStatus().getRunState() != JobStatus.RUNNING ||                        job.numReduceTasks == 0) {                        continue;                    }                    Task t = job.obtainNewReduceTask(tts, numTaskTrackers);                    if (t != null) {                      expireLaunchingTasks.addNewTask(t.getTaskId());                      myMetrics.launchReduce();                      return t;                    }                    //                    // Beyond the highest-priority task, reserve a little                     // room for failures and speculative executions; don't                     // schedule tasks to the hilt.                    //                    totalNeededReduces += job.desiredReduces();                    int padding = 0;                    if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {                        padding =                           Math.min(maxCurrentTasks,                                   (int) (totalNeededReduces * PAD_FRACTION));                    }                    if (totalReduces + padding >= totalCapacity) {                        break;                    }                }            }        }        return null;    }    /**     * A tracker wants to know if any of its Tasks have been     * closed (because the job completed, whether successfully or not)     */    public synchronized String[] pollForTaskWithClosedJob(String taskTracker) {        TreeSet taskIds = (TreeSet) trackerToTaskMap.get(taskTracker);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -