⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 minimrcluster.java

📁 hadoop:Nutch集群平台
💻 JAVA
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import java.io.*;import java.util.*;/** * This class creates a single-process Map-Reduce cluster for junit testing. * One thread is created for each server. * @author Milind Bhandarkar */public class MiniMRCluster {        private Thread jobTrackerThread;    private JobTrackerRunner jobTracker;        private int jobTrackerPort = 0;    private int taskTrackerPort = 0;        private int numTaskTrackers;        private List taskTrackerList = new ArrayList();    private List taskTrackerThreadList = new ArrayList();        private String namenode;        /**     * An inner class that runs a job tracker.     */    class JobTrackerRunner implements Runnable {        /**         * Create the job tracker and run it.         */        public void run() {            try {                JobConf jc = new JobConf();                jc.set("fs.name.node", namenode);                jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort);                // this timeout seems to control the minimum time for the test, so                // set it down at 2 seconds.                jc.setInt("ipc.client.timeout", 1000);                jc.set("mapred.local.dir","build/test/mapred/local");                JobTracker.startTracker(jc);            } catch (Throwable e) {                System.err.println("Job tracker crashed:");                e.printStackTrace();            }        }                /**         * Shutdown the job tracker and wait for it to finish.         */        public void shutdown() {            try {                JobTracker.stopTracker();            } catch (Throwable e) {                System.err.println("Unable to shut down job tracker:");                e.printStackTrace();            }        }    }        /**     * An inner class to run the task tracker.     */    class TaskTrackerRunner implements Runnable {        TaskTracker tt;        String localDir;        boolean isInitialized = false;        boolean isDead = false;                /**         * Create and run the task tracker.         */        public void run() {            try {                JobConf jc = new JobConf();                jc.set("fs.name.node", namenode);                jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort);                // this timeout seems to control the minimum time for the test, so                // set it down at 2 seconds.                jc.setInt("ipc.client.timeout", 1000);                jc.setInt("mapred.task.tracker.info.port", taskTrackerPort++);                jc.setInt("mapred.task.tracker.report.port", taskTrackerPort++);                File localDir = new File(jc.get("mapred.local.dir"));                File ttDir = new File(localDir, Integer.toString(taskTrackerPort));                ttDir.mkdirs();                this.localDir = ttDir.getAbsolutePath();                jc.set("mapred.local.dir", ttDir.getAbsolutePath());                tt = new TaskTracker(jc);                isInitialized = true;                tt.run();            } catch (Throwable e) {                isDead = true;                tt = null;                System.err.println("Task tracker crashed:");                e.printStackTrace();            }        }                /**         * Get the local dir for this TaskTracker.         * @return the absolute pathname         */        public String getLocalDir() {          return localDir;        }                /**         * Shut down the server and wait for it to finish.         */        public void shutdown() {            if (tt != null) {                try {                    tt.shutdown();                } catch (Throwable e) {                    System.err.println("Unable to shut down task tracker:");                    e.printStackTrace();                }            }        }    }        /**     * Get the local directory for the Nth task tracker     * @param taskTracker the index of the task tracker to check     * @return the absolute pathname of the local dir     */    public String getTaskTrackerLocalDir(int taskTracker) {      return ((TaskTrackerRunner)               taskTrackerList.get(taskTracker)).getLocalDir();    }    /**     * Get the number of task trackers in the cluster     */    public int getNumTaskTrackers() {      return taskTrackerList.size();    }        /**     * Wait until the system is idle.     */    public void waitUntilIdle() {      for(Iterator itr= taskTrackerList.iterator(); itr.hasNext(); ) {        TaskTrackerRunner runner = (TaskTrackerRunner) itr.next();        while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) {          if (!runner.isInitialized) {            System.out.println("Waiting for task tracker to start.");          } else {            System.out.println("Waiting for task tracker " + runner.tt.getName() +                               " to be idle.");          }          try {            Thread.sleep(1000);          } catch (InterruptedException ie) {}        }      }    }        /**     * Create the config and start up the servers.     */    public MiniMRCluster(int jobTrackerPort,            int taskTrackerPort,            int numTaskTrackers,            String namenode,            boolean taskTrackerFirst) throws IOException {        this.jobTrackerPort = jobTrackerPort;        this.taskTrackerPort = taskTrackerPort;        this.numTaskTrackers = numTaskTrackers;        this.namenode = namenode;                File configDir = new File("build", "minimr");        configDir.mkdirs();        File siteFile = new File(configDir, "hadoop-site.xml");        PrintWriter pw = new PrintWriter(siteFile);        pw.print("<?xml version=\"1.0\"?>\n"+                "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"+                "<configuration>\n"+                " <property>\n"+                "   <name>mapred.system.dir</name>\n"+                "   <value>build/test/mapred/system</value>\n"+                " </property>\n"+                "</configuration>\n");        pw.close();        jobTracker = new JobTrackerRunner();        jobTrackerThread = new Thread(jobTracker);        if (!taskTrackerFirst) {          jobTrackerThread.start();        }        for (int idx = 0; idx < numTaskTrackers; idx++) {            TaskTrackerRunner taskTracker = new TaskTrackerRunner();            Thread taskTrackerThread = new Thread(taskTracker);            taskTrackerThread.start();            taskTrackerList.add(taskTracker);            taskTrackerThreadList.add(taskTrackerThread);        }        if (taskTrackerFirst) {          jobTrackerThread.start();        }        waitUntilIdle();    }        /**     * Shut down the servers.     */    public void shutdown() {      try {        waitUntilIdle();        for (int idx = 0; idx < numTaskTrackers; idx++) {            TaskTrackerRunner taskTracker = (TaskTrackerRunner) taskTrackerList.get(idx);            Thread taskTrackerThread = (Thread) taskTrackerThreadList.get(idx);            taskTracker.shutdown();            taskTrackerThread.interrupt();            try {                taskTrackerThread.join();            } catch (InterruptedException ex) {                ex.printStackTrace();            }        }        jobTracker.shutdown();        jobTrackerThread.interrupt();        try {            jobTrackerThread.join();        } catch (InterruptedException ex) {            ex.printStackTrace();        }        } finally {            File configDir = new File("build", "minimr");            File siteFile = new File(configDir, "hadoop-site.xml");            siteFile.delete();        }    }        public static void main(String[] args) throws IOException {        System.out.println("Bringing up Jobtracker and tasktrackers.");        MiniMRCluster mr = new MiniMRCluster(50000, 50002, 4, "local", false);        System.out.println("JobTracker and TaskTrackers are up.");        mr.shutdown();        System.out.println("JobTracker and TaskTrackers brought down.");    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -