⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 localjobrunner.java

📁 hadoop:Nutch集群平台
💻 JAVA
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import java.io.*;import java.util.*;import org.apache.commons.logging.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;/** Implements MapReduce locally, in-process, for debugging. */ class LocalJobRunner implements JobSubmissionProtocol {  public static final Log LOG =    LogFactory.getLog("org.apache.hadoop.mapred.LocalJobRunner");  private FileSystem fs;  private HashMap jobs = new HashMap();  private Configuration conf;  private int map_tasks = 0;  private int reduce_tasks = 0;  private JobTrackerMetrics myMetrics = null;  public long getProtocolVersion(String protocol, long clientVersion) {    return JobSubmissionProtocol.versionID;  }    private class Job extends Thread    implements TaskUmbilicalProtocol {    private String file;    private String id;    private JobConf job;    private Random random = new Random();    private JobStatus status;    private ArrayList mapIds = new ArrayList();    private MapOutputFile mapoutputFile;    private JobProfile profile;    private Path localFile;    private FileSystem localFs;    public long getProtocolVersion(String protocol, long clientVersion) {      return TaskUmbilicalProtocol.versionID;    }        public Job(String file, Configuration conf) throws IOException {      this.file = file;      this.id = "job_" + newId();      this.mapoutputFile = new MapOutputFile();      this.mapoutputFile.setConf(conf);      this.localFile = new JobConf(conf).getLocalPath("localRunner/"+id+".xml");      this.localFs = FileSystem.getNamed("local", conf);      fs.copyToLocalFile(new Path(file), localFile);      this.job = new JobConf(localFile);      profile = new JobProfile(job.getUser(), id, file,                                "http://localhost:8080/", job.getJobName());      status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING);      jobs.put(id, this);      this.start();    }    JobProfile getProfile() {      return profile;    }        public void run() {      try {        // split input into minimum number of splits        FileSplit[] splits;        splits = job.getInputFormat().getSplits(fs, job, 1);        String jobId = profile.getJobId();                // run a map task for each split        job.setNumReduceTasks(1);                 // force a single reduce task        for (int i = 0; i < splits.length; i++) {          mapIds.add("map_" + newId());          MapTask map = new MapTask(jobId, file, (String)mapIds.get(i), i,                                    splits[i]);          JobConf localConf = new JobConf(job);          map.localizeConfiguration(localConf);          map.setConf(localConf);          map_tasks += 1;          myMetrics.launchMap();          map.run(localConf, this);          myMetrics.completeMap();          map_tasks -= 1;        }        // move map output to reduce input        String reduceId = "reduce_" + newId();        for (int i = 0; i < mapIds.size(); i++) {          String mapId = (String)mapIds.get(i);          Path mapOut = this.mapoutputFile.getOutputFile(mapId, 0);          Path reduceIn = this.mapoutputFile.getInputFile(i, reduceId);          localFs.mkdirs(reduceIn.getParent());          if (!localFs.rename(mapOut, reduceIn))            throw new IOException("Couldn't rename " + mapOut);          this.mapoutputFile.removeAll(mapId);        }        {          ReduceTask reduce = new ReduceTask(jobId, file,                                              reduceId, 0, mapIds.size());          JobConf localConf = new JobConf(job);          reduce.localizeConfiguration(localConf);          reduce.setConf(localConf);          reduce_tasks += 1;          myMetrics.launchReduce();          reduce.run(localConf, this);          myMetrics.completeReduce();          reduce_tasks -= 1;        }        this.mapoutputFile.removeAll(reduceId);                this.status.setRunState(JobStatus.SUCCEEDED);      } catch (Throwable t) {        this.status.setRunState(JobStatus.FAILED);        LOG.warn(id, t);      } finally {        try {          fs.delete(new Path(file).getParent());  // delete submit dir          localFs.delete(localFile);              // delete local copy        } catch (IOException e) {          LOG.warn("Error cleaning up "+id+": "+e);        }      }    }    private String newId() {      return Integer.toString(Math.abs(random.nextInt()),36);    }    // TaskUmbilicalProtocol methods    public Task getTask(String taskid) { return null; }    public void progress(String taskId, float progress, String state, Phase phase) {      LOG.info(state);      float taskIndex = mapIds.indexOf(taskId);      if (taskIndex >= 0) {                       // mapping        float numTasks = mapIds.size();        status.setMapProgress(taskIndex/numTasks + progress/numTasks);      } else {        status.setReduceProgress(progress);      }            // ignore phase    }    public void reportDiagnosticInfo(String taskid, String trace) {      // Ignore for now    }    public boolean ping(String taskid) throws IOException {      return true;    }    public void done(String taskId) throws IOException {      int taskIndex = mapIds.indexOf(taskId);      if (taskIndex >= 0) {                       // mapping        status.setMapProgress(1.0f);      } else {        status.setReduceProgress(1.0f);      }    }    public synchronized void fsError(String message) throws IOException {      LOG.fatal("FSError: "+ message);    }  }  public LocalJobRunner(Configuration conf) throws IOException {    this.fs = FileSystem.get(conf);    this.conf = conf;    myMetrics = new JobTrackerMetrics();  }  // JobSubmissionProtocol methods  public JobStatus submitJob(String jobFile) throws IOException {    return new Job(jobFile, this.conf).status;  }  public void killJob(String id) {    ((Thread)jobs.get(id)).stop();  }  public JobProfile getJobProfile(String id) {    Job job = (Job)jobs.get(id);    return job.getProfile();  }  public TaskReport[] getMapTaskReports(String id) {    return new TaskReport[0];  }  public TaskReport[] getReduceTaskReports(String id) {    return new TaskReport[0];  }  public JobStatus getJobStatus(String id) {    Job job = (Job)jobs.get(id);    return job.status;  }  public String getFilesystemName() throws IOException {    return fs.getName();  }    public ClusterStatus getClusterStatus() {    return new ClusterStatus(1, map_tasks, reduce_tasks, 1);  }  public JobStatus[] jobsToComplete() {return null;}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -