📄 jobtracker.java
字号:
job = (JobInProgress) jobInitQueue.elementAt(0);
            jobInitQueue.remove(job);
          } else {
            try {
              jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);
            } catch (InterruptedException iex) {
              // NOTE(review): the interrupt is swallowed. close() calls
              // stopIniter() before interrupting this thread, so presumably
              // the enclosing loop (outside this view) re-checks shouldRun.
            }
          }
        }
        // Initialize the dequeued job, if any. A failure is logged and the
        // job is killed instead of letting the exception escape run().
        try {
          if (job != null) {
            job.initTasks();
          }
        } catch (Exception e) {
          // Only initTasks() can throw inside the try, so job is non-null here.
          LOG.warn("job init failed", e);
          job.kill();
        }
      }
    }

    /** Clears the run flag; close() follows this call with an interrupt. */
    public void stopIniter() {
      shouldRun = false;
    }
  }

  /**
   * Running counters of job and task lifecycle events. Each method
   * pre-increments its counter and reports the new total to the
   * "mapred"/"jobtracker" metrics record. All mutators are synchronized,
   * so concurrent callers do not lose increments.
   */
  static class JobTrackerMetrics {
    private MetricsRecord metricsRecord = null;
    private long numMapTasksLaunched = 0L;
    private long numMapTasksCompleted = 0L;
    private long numReduceTasksLaunched = 0L;
    private long numReduceTasksCompleted = 0L;
    private long numJobsSubmitted = 0L;
    private long numJobsCompleted = 0L;

    JobTrackerMetrics() {
      metricsRecord = Metrics.createRecord("mapred", "jobtracker");
    }

    synchronized void launchMap() {
      Metrics.report(metricsRecord, "maps-launched", ++numMapTasksLaunched);
    }

    synchronized void completeMap() {
      Metrics.report(metricsRecord, "maps-completed", ++numMapTasksCompleted);
    }

    synchronized void launchReduce() {
      Metrics.report(metricsRecord, "reduces-launched", ++numReduceTasksLaunched);
    }

    synchronized void completeReduce() {
      Metrics.report(metricsRecord, "reduces-completed", ++numReduceTasksCompleted);
    }

    synchronized void submitJob() {
      Metrics.report(metricsRecord, "jobs-submitted", ++numJobsSubmitted);
    }

    synchronized void completeJob() {
      Metrics.report(metricsRecord, "jobs-completed", ++numJobsCompleted);
    }
  }

  // Assigned in the JobTracker constructor.
  private JobTrackerMetrics myMetrics = null;

  /////////////////////////////////////////////////////////////////
  // The real JobTracker
  ////////////////////////////////////////////////////////////////

  // RPC port and host name, taken from getAddress(conf) in the constructor.
  int port;
  String localMachine;
  // Wall-clock time (System.currentTimeMillis()) recorded at construction.
  long startTime;
  int totalSubmissions = 0;
  Random r = new Random();
  // Read from "mapred.tasktracker.tasks.maximum" (default 2) in the constructor.
  private int maxCurrentTasks;

  //
  // Properties to maintain while running Jobs and Tasks:
  //
  // 1. Each Task is always contained in a single Job. A Job succeeds when all its
  //    Tasks are complete.
  //
  // 2. Every running or successful Task is assigned to a Tracker. Idle Tasks are not.
  //
  // 3.
When a Tracker fails, all of its assigned Tasks are marked as failures. // // 4. A Task might need to be reexecuted if it (or the machine it's hosted on) fails // before the Job is 100% complete. Sometimes an upstream Task can fail without // reexecution if all downstream Tasks that require its output have already obtained // the necessary files. // // All the known jobs. (jobid->JobInProgress) TreeMap jobs = new TreeMap(); Vector jobsByArrival = new Vector(); // All the known TaskInProgress items, mapped to by taskids (taskid->TIP) Map<String, TaskInProgress> taskidToTIPMap = new TreeMap(); // (taskid --> trackerID) TreeMap taskidToTrackerMap = new TreeMap(); // (trackerID->TreeSet of taskids running at that tracker) TreeMap trackerToTaskMap = new TreeMap(); // // Watch and expire TaskTracker objects using these structures. // We can map from Name->TaskTrackerStatus, or we can expire by time. // int totalMaps = 0; int totalReduces = 0; private TreeMap taskTrackers = new TreeMap(); Vector jobInitQueue = new Vector(); ExpireTrackers expireTrackers = new ExpireTrackers(); Thread expireTrackersThread = null; RetireJobs retireJobs = new RetireJobs(); Thread retireJobsThread = null; JobInitThread initJobs = new JobInitThread(); Thread initJobsThread = null; ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks(); Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks); /** * It might seem like a bug to maintain a TreeSet of status objects, * which can be updated at any time. But that's not what happens! We * only update status objects in the taskTrackers table. Status objects * are never updated once they enter the expiry queue. Instead, we wait * for them to expire and remove them from the expiry queue. If a status * object has been updated in the taskTracker table, the latest status is * reinserted. Otherwise, we assume the tracker has expired. 
*/ TreeSet trackerExpiryQueue = new TreeSet(new Comparator() { public int compare(Object o1, Object o2) { TaskTrackerStatus p1 = (TaskTrackerStatus) o1; TaskTrackerStatus p2 = (TaskTrackerStatus) o2; if (p1.getLastSeen() < p2.getLastSeen()) { return -1; } else if (p1.getLastSeen() > p2.getLastSeen()) { return 1; } else { return (p1.getTrackerName().compareTo(p2.getTrackerName())); } } }); // Used to provide an HTML view on Job, Task, and TaskTracker structures StatusHttpServer infoServer; String infoBindAddress; int infoPort; Server interTrackerServer; // Some jobs are stored in a local system directory. We can delete // the files when we're done with the job. static final String SUBDIR = "jobTracker"; FileSystem fs; Path systemDir; private Configuration conf; /** * Start the JobTracker process, listen on the indicated port */ JobTracker(Configuration conf) throws IOException { // // Grab some static constants // maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2); RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000); RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000); TASK_ALLOC_EPSILON = conf.getFloat("mapred.jobtracker.taskalloc.loadbalance.epsilon", 0.2f); PAD_FRACTION = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 0.01f); // This is a directory of temporary submission files. We delete it // on startup, and can delete any files that we're done with this.conf = conf; JobConf jobConf = new JobConf(conf); this.systemDir = jobConf.getSystemDir(); this.fs = FileSystem.get(conf); fs.delete(systemDir); fs.mkdirs(systemDir); // Same with 'localDir' except it's always on the local disk. jobConf.deleteLocalFiles(SUBDIR); // Set ports, start RPC servers, etc. 
InetSocketAddress addr = getAddress(conf); this.localMachine = addr.getHostName(); this.port = addr.getPort(); this.interTrackerServer = RPC.getServer(this,addr.getHostName(), addr.getPort(), 10, false, conf); this.interTrackerServer.start(); Properties p = System.getProperties(); for (Iterator it = p.keySet().iterator(); it.hasNext(); ) { String key = (String) it.next(); String val = (String) p.getProperty(key); LOG.info("Property '" + key + "' is " + val); } this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030); this.infoBindAddress = conf.get("mapred.job.tracker.info.bindAddress","0.0.0.0"); this.infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false); this.infoServer.start(); this.startTime = System.currentTimeMillis(); myMetrics = new JobTrackerMetrics(); this.expireTrackersThread = new Thread(this.expireTrackers); this.expireTrackersThread.start(); this.retireJobsThread = new Thread(this.retireJobs); this.retireJobsThread.start(); this.initJobsThread = new Thread(this.initJobs); this.initJobsThread.start(); expireLaunchingTaskThread.start(); } public static InetSocketAddress getAddress(Configuration conf) { String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012"); int colon = jobTrackerStr.indexOf(":"); if (colon < 0) { throw new RuntimeException("Bad mapred.job.tracker: "+jobTrackerStr); } String jobTrackerName = jobTrackerStr.substring(0, colon); int jobTrackerPort = Integer.parseInt(jobTrackerStr.substring(colon+1)); return new InetSocketAddress(jobTrackerName, jobTrackerPort); } /** * Run forever */ public void offerService() { try { this.interTrackerServer.join(); } catch (InterruptedException ie) { } LOG.info("Stopped interTrackerServer"); } void close() throws IOException { if (this.infoServer != null) { LOG.info("Stopping infoServer"); try { this.infoServer.stop(); } catch (InterruptedException ex) { ex.printStackTrace(); } } if (this.interTrackerServer != null) { LOG.info("Stopping interTrackerServer"); 
this.interTrackerServer.stop(); } if (this.expireTrackers != null) { LOG.info("Stopping expireTrackers"); this.expireTrackers.stopTracker(); try { this.expireTrackersThread.interrupt(); this.expireTrackersThread.join(); } catch (InterruptedException ex) { ex.printStackTrace(); } } if (this.retireJobs != null) { LOG.info("Stopping retirer"); this.retireJobs.stopRetirer(); try { this.retireJobsThread.interrupt(); this.retireJobsThread.join(); } catch (InterruptedException ex) { ex.printStackTrace(); } } if (this.initJobs != null) { LOG.info("Stopping initer"); this.initJobs.stopIniter(); try { this.initJobsThread.interrupt(); this.initJobsThread.join(); } catch (InterruptedException ex) { ex.printStackTrace(); } } if (this.expireLaunchingTaskThread != null) { LOG.info("Stopping expireLaunchingTasks"); this.expireLaunchingTasks.stop(); try { this.expireLaunchingTaskThread.interrupt(); this.expireLaunchingTaskThread.join(); } catch (InterruptedException ex) { ex.printStackTrace(); } } LOG.info("stopped all jobtracker services"); return; } /////////////////////////////////////////////////////// // Maintain lookup tables; called by JobInProgress // and TaskInProgress /////////////////////////////////////////////////////// void createTaskEntry(String taskid, String taskTracker, TaskInProgress tip) { LOG.info("Adding task '" + taskid + "' to tip " + tip.getTIPId() + ", for tracker '" + taskTracker + "'"); // taskid --> tracker
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -