⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jobtracker.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
        if (taskIds != null) {            ArrayList list = new ArrayList();            for (Iterator it = taskIds.iterator(); it.hasNext(); ) {                String taskId = (String) it.next();                TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);                if (tip.shouldCloseForClosedJob(taskId)) {                    //                     // This is how the JobTracker ends a task at the TaskTracker.                    // It may be successfully completed, or may be killed in                    // mid-execution.                    //                   list.add(taskId);                }            }            return (String[]) list.toArray(new String[list.size()]);        }        return null;    }    /**     * A TaskTracker wants to know the physical locations of completed, but not     * yet closed, tasks.  This exists so the reduce task thread can locate     * map task outputs.     */    public synchronized MapOutputLocation[]              locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) {        ArrayList result = new ArrayList(mapTasksNeeded.length);        JobInProgress job = getJob(jobId);        for (int i = 0; i < mapTasksNeeded.length; i++) {          TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);          if (status != null) {             String trackerId =                (String) taskidToTrackerMap.get(status.getTaskId());             TaskTrackerStatus tracker;             synchronized (taskTrackers) {               tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);             }             result.add(new MapOutputLocation(status.getTaskId(),                                               mapTasksNeeded[i],                                              tracker.getHost(),                                               tracker.getHttpPort()));          }        }        return (MapOutputLocation[])                result.toArray(new MapOutputLocation[result.size()]);    }    /**     * Grab the local fs name     */    public synchronized String getFilesystemName() throws IOException {        return fs.getName();    }    public void reportTaskTrackerError(String taskTracker,            String errorClass,            String errorMessage) throws IOException {        LOG.warn("Report from " + taskTracker + ": " + errorMessage);            }    ////////////////////////////////////////////////////    // JobSubmissionProtocol    ////////////////////////////////////////////////////    /**     * JobTracker.submitJob() kicks off a new job.       *     * Create a 'JobInProgress' object, which contains both JobProfile     * and JobStatus.  Those two sub-objects are sometimes shipped outside     * of the JobTracker.  But JobInProgress adds info that's useful for     * the JobTracker alone.     *     * We add the JIP to the jobInitQueue, which is processed      * asynchronously to handle split-computation and build up     * the right TaskTracker/Block mapping.     */    public synchronized JobStatus submitJob(String jobFile) throws IOException {        totalSubmissions++;        JobInProgress job = new JobInProgress(jobFile, this, this.conf);        synchronized (jobs) {            synchronized (jobsByArrival) {                synchronized (jobInitQueue) {                    jobs.put(job.getProfile().getJobId(), job);                    jobsByArrival.add(job);                    jobInitQueue.add(job);                    jobInitQueue.notifyAll();                }            }        }        myMetrics.submitJob();        return job.getStatus();    }    public synchronized ClusterStatus getClusterStatus() {        synchronized (taskTrackers) {          return new ClusterStatus(taskTrackers.size(),                                   totalMaps,                                   totalReduces,                                   maxCurrentTasks);                  }    }        public synchronized void killJob(String jobid) {        JobInProgress job = (JobInProgress) jobs.get(jobid);        job.kill();    }    public synchronized JobProfile getJobProfile(String jobid) {        JobInProgress job = (JobInProgress) jobs.get(jobid);        if (job != null) {            return job.getProfile();        } else {            return null;        }    }    public synchronized JobStatus getJobStatus(String jobid) {        JobInProgress job = (JobInProgress) jobs.get(jobid);        if (job != null) {            return job.getStatus();        } else {            return null;        }    }    public synchronized TaskReport[] getMapTaskReports(String jobid) {        JobInProgress job = (JobInProgress) jobs.get(jobid);        if (job == null) {            return new TaskReport[0];        } else {            Vector reports = new Vector();            Vector completeMapTasks = job.reportTasksInProgress(true, true);            for (Iterator it = completeMapTasks.iterator(); it.hasNext(); ) {                TaskInProgress tip = (TaskInProgress) it.next();                reports.add(tip.generateSingleReport());            }            Vector incompleteMapTasks = job.reportTasksInProgress(true, false);            for (Iterator it = incompleteMapTasks.iterator(); it.hasNext(); ) {                TaskInProgress tip = (TaskInProgress) it.next();                reports.add(tip.generateSingleReport());            }            return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]);        }    }    public synchronized TaskReport[] getReduceTaskReports(String jobid) {        JobInProgress job = (JobInProgress) jobs.get(jobid);        if (job == null) {            return new TaskReport[0];        } else {            Vector reports = new Vector();            Vector completeReduceTasks = job.reportTasksInProgress(false, true);            for (Iterator it = completeReduceTasks.iterator(); it.hasNext(); ) {                TaskInProgress tip = (TaskInProgress) it.next();                reports.add(tip.generateSingleReport());            }            Vector incompleteReduceTasks = job.reportTasksInProgress(false, false);            for (Iterator it = incompleteReduceTasks.iterator(); it.hasNext(); ) {                TaskInProgress tip = (TaskInProgress) it.next();                reports.add(tip.generateSingleReport());            }            return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]);        }    }    /**     * Get the diagnostics for a given task     * @param jobId the id of the job     * @param tipId the id of the tip      * @param taskId the id of the task     * @return a list of the diagnostic messages     */    public synchronized List<String> getTaskDiagnostics(String jobId,                                                        String tipId,                                                        String taskId) {      JobInProgress job = (JobInProgress) jobs.get(jobId);      if (job == null) {        throw new IllegalArgumentException("Job " + jobId + " not found.");      }      TaskInProgress tip = job.getTaskInProgress(tipId);      if (tip == null) {        throw new IllegalArgumentException("TIP " + tipId + " not found.");      }      return tip.getDiagnosticInfo(taskId);    }        /** Get all the TaskStatuses from the tipid. */    TaskStatus[] getTaskStatuses(String jobid, String tipid){	JobInProgress job = (JobInProgress) jobs.get(jobid);	if (job == null){	    return new TaskStatus[0];	}	TaskInProgress tip = (TaskInProgress) job.getTaskInProgress(tipid);	if (tip == null){	    return new TaskStatus[0];	}	return tip.getTaskStatuses();    }    /**     * Get tracker name for a given task id.     * @param taskId the name of the task     * @return The name of the task tracker     */    public synchronized String getAssignedTracker(String taskId) {      return (String) taskidToTrackerMap.get(taskId);    }        public JobStatus[] jobsToComplete() {        Vector v = new Vector();        for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {            JobInProgress jip = (JobInProgress) it.next();            JobStatus status = jip.getStatus();            if (status.getRunState() == JobStatus.RUNNING 		|| status.getRunState() == JobStatus.PREP) {		status.setStartTime(jip.getStartTime());                status.setUsername(jip.getProfile().getUser());                v.add(status);            }        }        return (JobStatus[]) v.toArray(new JobStatus[v.size()]);    }         ///////////////////////////////////////////////////////////////    // JobTracker methods    ///////////////////////////////////////////////////////////////    public JobInProgress getJob(String jobid) {        return (JobInProgress) jobs.get(jobid);    }    /**     * Grab random num for job id     */    String createUniqueId() {        return idFormat.format(nextJobId++);    }    ////////////////////////////////////////////////////    // Methods to track all the TaskTrackers    ////////////////////////////////////////////////////    /**     * Accept and process a new TaskTracker profile.  We might     * have known about the TaskTracker previously, or it might     * be brand-new.  All task-tracker structures have already     * been updated.  Just process the contained tasks and any     * jobs that might be affected.     */    void updateTaskStatuses(TaskTrackerStatus status) {        for (Iterator it = status.taskReports(); it.hasNext(); ) {            TaskStatus report = (TaskStatus) it.next();            report.setTaskTracker(status.getTrackerName());            String taskId = report.getTaskId();            TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);            if (tip == null) {                LOG.info("Serious problem.  While updating status, cannot find taskid " + report.getTaskId());            } else {                expireLaunchingTasks.removeTask(taskId);                tip.getJob().updateTaskStatus(tip, report, myMetrics);            }        }    }    /**     * We lost the task tracker!  All task-tracker structures have      * already been updated.  Just process the contained tasks and any     * jobs that might be affected.     */    void lostTaskTracker(String trackerName, String hostname) {        LOG.info("Lost tracker '" + trackerName + "'");        TreeSet lostTasks = (TreeSet) trackerToTaskMap.get(trackerName);        trackerToTaskMap.remove(trackerName);        if (lostTasks != null) {            for (Iterator it = lostTasks.iterator(); it.hasNext(); ) {                String taskId = (String) it.next();                TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);                // Completed reduce tasks never need to be failed, because                 // their outputs go to dfs                if (tip.isMapTask() || !tip.isComplete()) {                  JobInProgress job = tip.getJob();                  // if the job is done, we don't want to change anything                  if (job.getStatus().getRunState() == JobStatus.RUNNING) {                    job.failedTask(tip, taskId, "Lost task tracker",                                    Phase.MAP, hostname, trackerName, myMetrics);                  }                }            }        }    }    ////////////////////////////////////////////////////////////    // main()    ////////////////////////////////////////////////////////////    /**     * Start the JobTracker process.  This is used only for debugging.  As a rule,     * JobTracker should be run as part of the DFS Namenode process.     */    public static void main(String argv[]) throws IOException, InterruptedException {        if (argv.length != 0) {          System.out.println("usage: JobTracker");          System.exit(-1);        }        Configuration conf=new Configuration();        startTracker(conf);    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -