⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jobtracker.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
                        job = (JobInProgress) jobInitQueue.elementAt(0);                        jobInitQueue.remove(job);                    } else {                        try {                            jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);                        } catch (InterruptedException iex) {                        }                    }                }                try {                    if (job != null) {                        job.initTasks();                    }                } catch (Exception e) {                    LOG.warn("job init failed", e);                    job.kill();                }            }        }        public void stopIniter() {            shouldRun = false;        }    }    static class JobTrackerMetrics {      private MetricsRecord metricsRecord = null;            private long numMapTasksLaunched = 0L;      private long numMapTasksCompleted = 0L;      private long numReduceTasksLaunched = 0L;      private long numReduceTasksCompleted = 0L;      private long numJobsSubmitted = 0L;      private long numJobsCompleted = 0L;            JobTrackerMetrics() {        metricsRecord = Metrics.createRecord("mapred", "jobtracker");      }            synchronized void launchMap() {        Metrics.report(metricsRecord, "maps-launched",            ++numMapTasksLaunched);      }            synchronized void completeMap() {        Metrics.report(metricsRecord, "maps-completed",            ++numMapTasksCompleted);      }            synchronized void launchReduce() {        Metrics.report(metricsRecord, "reduces-launched",            ++numReduceTasksLaunched);      }            synchronized void completeReduce() {        Metrics.report(metricsRecord, "reduces-completed",            ++numReduceTasksCompleted);      }            synchronized void submitJob() {        Metrics.report(metricsRecord, "jobs-submitted",            ++numJobsSubmitted);      }            synchronized void completeJob() {        Metrics.report(metricsRecord, 
"jobs-completed",            ++numJobsCompleted);      }    }    private JobTrackerMetrics myMetrics = null;        /////////////////////////////////////////////////////////////////    // The real JobTracker    ////////////////////////////////////////////////////////////////    int port;    String localMachine;    long startTime;    int totalSubmissions = 0;    Random r = new Random();    private int maxCurrentTasks;    //    // Properties to maintain while running Jobs and Tasks:    //    // 1.  Each Task is always contained in a single Job.  A Job succeeds when all its     //     Tasks are complete.    //    // 2.  Every running or successful Task is assigned to a Tracker.  Idle Tasks are not.    //    // 3.  When a Tracker fails, all of its assigned Tasks are marked as failures.    //    // 4.  A Task might need to be reexecuted if it (or the machine it's hosted on) fails    //     before the Job is 100% complete.  Sometimes an upstream Task can fail without    //     reexecution if all downstream Tasks that require its output have already obtained    //     the necessary files.    //    // All the known jobs.  (jobid->JobInProgress)    TreeMap jobs = new TreeMap();    Vector jobsByArrival = new Vector();    // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)    Map<String, TaskInProgress> taskidToTIPMap = new TreeMap();    // (taskid --> trackerID)     TreeMap taskidToTrackerMap = new TreeMap();    // (trackerID->TreeSet of taskids running at that tracker)    TreeMap trackerToTaskMap = new TreeMap();    //    // Watch and expire TaskTracker objects using these structures.    // We can map from Name->TaskTrackerStatus, or we can expire by time.    
//    int totalMaps = 0;    int totalReduces = 0;    private TreeMap taskTrackers = new TreeMap();    Vector jobInitQueue = new Vector();    ExpireTrackers expireTrackers = new ExpireTrackers();    Thread expireTrackersThread = null;    RetireJobs retireJobs = new RetireJobs();    Thread retireJobsThread = null;    JobInitThread initJobs = new JobInitThread();    Thread initJobsThread = null;    ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();    Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks);        /**     * It might seem like a bug to maintain a TreeSet of status objects,     * which can be updated at any time.  But that's not what happens!  We     * only update status objects in the taskTrackers table.  Status objects     * are never updated once they enter the expiry queue.  Instead, we wait     * for them to expire and remove them from the expiry queue.  If a status     * object has been updated in the taskTracker table, the latest status is      * reinserted.  Otherwise, we assume the tracker has expired.     */    TreeSet trackerExpiryQueue = new TreeSet(new Comparator() {        public int compare(Object o1, Object o2) {            TaskTrackerStatus p1 = (TaskTrackerStatus) o1;            TaskTrackerStatus p2 = (TaskTrackerStatus) o2;            if (p1.getLastSeen() < p2.getLastSeen()) {                return -1;            } else if (p1.getLastSeen() > p2.getLastSeen()) {                return 1;            } else {                return (p1.getTrackerName().compareTo(p2.getTrackerName()));            }        }    });    // Used to provide an HTML view on Job, Task, and TaskTracker structures    StatusHttpServer infoServer;    String infoBindAddress;    int infoPort;    Server interTrackerServer;    // Some jobs are stored in a local system directory.  We can delete    // the files when we're done with the job.    
static final String SUBDIR = "jobTracker";
    FileSystem fs;
    Path systemDir;
    private Configuration conf;

    /**
     * Start the JobTracker process, listen on the indicated port.
     *
     * In order, the constructor: reads its tuning values from the
     * configuration, deletes and recreates the system directory, deletes the
     * local {@link #SUBDIR} files, starts the inter-tracker RPC server on the
     * address returned by {@link #getAddress(Configuration)}, logs every
     * system property, starts the status HTTP server, and finally starts the
     * four background threads.
     *
     * @param conf configuration to read settings from
     * @throws IOException declared by the constructor; which of the calls
     *         below raise it cannot be told from this block alone
     */
    JobTracker(Configuration conf) throws IOException {
        //
        // Grab some static constants
        //
        maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
        RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
        RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
        TASK_ALLOC_EPSILON = conf.getFloat("mapred.jobtracker.taskalloc.loadbalance.epsilon", 0.2f);
        PAD_FRACTION = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad",
                                     0.01f);

        // This is a directory of temporary submission files.  We delete it
        // on startup, and can delete any files that we're done with
        this.conf = conf;
        JobConf jobConf = new JobConf(conf);
        this.systemDir = jobConf.getSystemDir();
        this.fs = FileSystem.get(conf);
        fs.delete(systemDir);
        fs.mkdirs(systemDir);

        // Same with 'localDir' except it's always on the local disk.
        jobConf.deleteLocalFiles(SUBDIR);

        // Set ports, start RPC servers, etc.
        InetSocketAddress addr = getAddress(conf);
        this.localMachine = addr.getHostName();
        this.port = addr.getPort();
        this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), 10, false, conf);
        this.interTrackerServer.start();

        // Dump every system property to the log.
        Properties p = System.getProperties();
        for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {
            String key = (String) it.next();
            String val = p.getProperty(key);
            LOG.info("Property '" + key + "' is " + val);
        }

        this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
        this.infoBindAddress = conf.get("mapred.job.tracker.info.bindAddress", "0.0.0.0");
        this.infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false);
        this.infoServer.start();

        this.startTime = System.currentTimeMillis();
        myMetrics = new JobTrackerMetrics();

        this.expireTrackersThread = new Thread(this.expireTrackers);
        this.expireTrackersThread.start();
        this.retireJobsThread = new Thread(this.retireJobs);
        this.retireJobsThread.start();
        this.initJobsThread = new Thread(this.initJobs);
        this.initJobsThread.start();
        expireLaunchingTaskThread.start();
    }

    /**
     * Parses the "mapred.job.tracker" setting (default "localhost:8012")
     * into a socket address.  The text before the first colon is the host
     * name, the text after it the port.
     *
     * @param conf configuration to read the setting from
     * @return the address built from the host name and port
     * @throws RuntimeException if the setting contains no colon
     * @throws NumberFormatException if the part after the colon is not an
     *         integer
     */
    public static InetSocketAddress getAddress(Configuration conf) {
        String jobTrackerStr =
            conf.get("mapred.job.tracker", "localhost:8012");
        int colon = jobTrackerStr.indexOf(":");
        if (colon < 0) {
            throw new RuntimeException("Bad mapred.job.tracker: " + jobTrackerStr);
        }
        String jobTrackerName = jobTrackerStr.substring(0, colon);
        int jobTrackerPort = Integer.parseInt(jobTrackerStr.substring(colon + 1));
        return new InetSocketAddress(jobTrackerName, jobTrackerPort);
    }

    /**
     * Run forever: blocks until the inter-tracker server's join() returns
     * or the calling thread is interrupted.
     */
    public void offerService() {
        try {
            this.interTrackerServer.join();
        } catch (InterruptedException ie) {
            // Do not lose the interrupt: put the flag back so the caller
            // can still observe it after this method returns.
            Thread.currentThread().interrupt();
        }
        LOG.info("Stopped interTrackerServer");
    }

    /**
     * Stops the info server, the inter-tracker server and the four
     * background workers, in that order, waiting for each worker thread to
     * finish.
     *
     * An interrupt that arrives while waiting is logged and remembered
     * rather than acted on immediately, so every remaining service still
     * receives its stop request; the calling thread's interrupt status is
     * restored just before returning.
     *
     * @throws IOException declared for callers; not raised by any statement
     *         visible in this method unless a stopped service raises it
     */
    void close() throws IOException {
        boolean interrupted = false;

        if (this.infoServer != null) {
            LOG.info("Stopping infoServer");
            try {
                this.infoServer.stop();
            } catch (InterruptedException ex) {
                LOG.warn("Interrupted while stopping infoServer", ex);
                interrupted = true;
            }
        }
        if (this.interTrackerServer != null) {
            LOG.info("Stopping interTrackerServer");
            this.interTrackerServer.stop();
        }
        if (this.expireTrackers != null) {
            LOG.info("Stopping expireTrackers");
            this.expireTrackers.stopTracker();
            interrupted |= interruptAndJoin(this.expireTrackersThread, "expireTrackers");
        }
        if (this.retireJobs != null) {
            LOG.info("Stopping retirer");
            this.retireJobs.stopRetirer();
            interrupted |= interruptAndJoin(this.retireJobsThread, "retirer");
        }
        if (this.initJobs != null) {
            LOG.info("Stopping initer");
            this.initJobs.stopIniter();
            interrupted |= interruptAndJoin(this.initJobsThread, "initer");
        }
        if (this.expireLaunchingTaskThread != null) {
            LOG.info("Stopping expireLaunchingTasks");
            this.expireLaunchingTasks.stop();
            interrupted |= interruptAndJoin(this.expireLaunchingTaskThread, "expireLaunchingTasks");
        }
        LOG.info("stopped all jobtracker services");

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Interrupts a worker thread and waits for it to terminate.
     *
     * @param worker thread to stop; a null thread is ignored
     * @param name   label used in the log message
     * @return true if the calling thread was interrupted while waiting
     */
    private boolean interruptAndJoin(Thread worker, String name) {
        if (worker == null) {
            return false;
        }
        worker.interrupt();
        try {
            worker.join();
            return false;
        } catch (InterruptedException ex) {
            LOG.warn("Interrupted while waiting for " + name + " to stop", ex);
            return true;
        }
    }
///////////////////////////////////////////////////////    // Maintain lookup tables; called by JobInProgress    // and TaskInProgress    ///////////////////////////////////////////////////////    void createTaskEntry(String taskid, String taskTracker, TaskInProgress tip) {        LOG.info("Adding task '" + taskid + "' to tip " + tip.getTIPId() + ", for tracker '" + taskTracker + "'");        // taskid --> tracker

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -