
jobhistory.java

Project: hadoop (Nutch cluster platform)
Language: Java
Page 1 of 2
package org.apache.hadoop.mapred;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Provides methods for writing to and reading from job history.
 * Job History works in an append mode; JobHistory and its inner classes provide methods
 * to log job events.
 *
 * JobHistory is split into multiple files. The format of each file is plain text where each line
 * is of the format [type (key=value)*], where type identifies the type of the record.
 * Type maps to the UID of one of the inner classes of this class.
 *
 * Job history is maintained in a master index which contains start/stop times of all jobs and
 * a few other job-level properties. Apart from this, each job's history is maintained in a
 * separate history file. The name of a job history file follows the format jobtrackerId_jobid.
 *
 * For parsing the job history it supports a listener-based interface where each line is parsed
 * and passed to a listener. The listener can create an object model of the history or look for
 * specific events and discard the rest of the history.
 */
public class JobHistory {

  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobHistory");

  private static final String DELIMITER = " ";
  private static final String KEY = "(\\w+)";
  private static final String VALUE = "[[^\"]?]+"; // anything but a " in ""

  private static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");

  public static final String JOBTRACKER_START_TIME = String.valueOf(System.currentTimeMillis());
  private static final String LOG_DIR = System.getProperty("hadoop.log.dir") + File.separator + "history";
  public static final String MASTER_INDEX_LOG_FILE = "JobHistory.log";

  private static PrintWriter masterIndex = null;
  private static Map<String, PrintWriter> openJobs = new HashMap<String, PrintWriter>();
  private static boolean disableHistory = false;

  /**
   * Record types are identifiers for each line of log in history files.
   * A record type appears as the first token in a single line of log.
   */
  public static enum RecordTypes {Jobtracker, Job, Task, MapAttempt, ReduceAttempt};

  /**
   * Job history files contain key="value" pairs, where keys belong to this enum.
   * It acts as a global namespace for all keys.
   */
  public static enum Keys {
    JOBTRACKERID,
    START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME, LAUNCH_TIME,
    TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, FINISHED_MAPS, FINISHED_REDUCES,
    JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, ERROR, TASK_ATTEMPT_ID, TASK_STATUS,
    COPY_PHASE, SORT_PHASE, REDUCE_PHASE, SHUFFLE_FINISHED, SORT_FINISHED
  };

  /**
   * This enum contains some of the values commonly used by history log events.
   * Since values in history can only be strings, Values.name() is used in
   * most places in the history file.
   */
  public static enum Values {
    SUCCESS, FAILED, KILLED, MAP, REDUCE
  };

  // temp buffer for parsed data
  private static Map<Keys, String> parseBuffer = new HashMap<Keys, String>();

  // init log files
  static { init(); }
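
  /*
   * Illustrative sketch (not part of the original source): given the format described in the
   * class comment, a single history line emitted by the log() methods further below looks like
   *
   *   Job JOBID="job_0001" JOBNAME="wordcount" USER="alice" SUBMIT_TIME="1201234567890" JOBCONF="/jobs/job_0001.xml"
   *
   * where "Job" is the RecordTypes token and each KEY="VALUE" tuple is what the `pattern`
   * regex above matches during parsing. The job id, name, user, timestamp and path here are
   * made-up example values.
   */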
  /**
   * Initialize JobHistory files.
   */
  private static void init(){
    if (!disableHistory){
      try{
        File logDir = new File(LOG_DIR);
        if (!logDir.exists()){
          logDir.mkdirs();
        }
        masterIndex =
          new PrintWriter(
            new FileOutputStream(new File(LOG_DIR + File.separator + MASTER_INDEX_LOG_FILE), true));
        // add jobtracker id = tracker start time
        log(masterIndex, RecordTypes.Jobtracker, Keys.START_TIME, JOBTRACKER_START_TIME);
      }catch(IOException e){
        LOG.error("Failed to initialize JobHistory log file", e);
        disableHistory = true;
      }
    }
  }

  /**
   * Parses a history file and invokes Listener.handle() for each line of history. It can
   * be used for looking through history files for specific items without having to keep
   * the whole history in memory.
   * @param path path to history file
   * @param l Listener for history events
   * @throws IOException
   */
  public static void parseHistory(File path, Listener l) throws IOException{
    BufferedReader reader = new BufferedReader(new FileReader(path));
    String line = null;
    StringBuffer buf = new StringBuffer();
    while ((line = reader.readLine()) != null){
      buf.append(line);
      if (!line.trim().endsWith("\"")){
        continue;
      }
      parseLine(buf.toString(), l);
      buf = new StringBuffer();
    }
  }

  /**
   * Parse a single line of history.
   * @param line
   * @param l
   * @throws IOException
   */
  private static void parseLine(String line, Listener l) throws IOException{
    // extract the record type
    int idx = line.indexOf(' ');
    String recType = line.substring(0, idx);
    String data = line.substring(idx + 1, line.length());

    Matcher matcher = pattern.matcher(data);
    while (matcher.find()){
      String tuple = matcher.group(0);
      String[] parts = tuple.split("=");

      parseBuffer.put(Keys.valueOf(parts[0]), parts[1].substring(1, parts[1].length() - 1));
    }
    l.handle(RecordTypes.valueOf(recType), parseBuffer);

    parseBuffer.clear();
  }

  /**
   * Log a raw record type with keys and values. This method is generally not used directly.
   * @param recordType type of log event
   * @param key key
   * @param value value
   */
  static void log(PrintWriter out, RecordTypes recordType, Enum key, String value){
    out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\"");
    out.flush();
  }

  /**
   * Log a number of keys and values with a record. The array lengths of keys and values
   * should be the same.
   * @param recordType type of log event
   * @param keys keys to log
   * @param values values to log
   */
  static void log(PrintWriter out, RecordTypes recordType, Enum[] keys, String[] values){
    StringBuffer buf = new StringBuffer(recordType.name());
    buf.append(DELIMITER);
    for (int i = 0; i < keys.length; i++){
      buf.append(keys[i]);
      buf.append("=\"");
      buf.append(values[i]);
      buf.append("\"");
      buf.append(DELIMITER);
    }

    out.println(buf.toString());
    out.flush();
  }

  /**
   * Returns history disable status. By default history is enabled, so this
   * method returns false.
   * @return true if history logging is disabled, false otherwise.
   */
  public static boolean isDisableHistory() {
    return disableHistory;
  }
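
  /*
   * Illustrative sketch (not in the original source): reading a history file with the
   * listener-based API above. It assumes the Listener type, which is not shown on this
   * page of the listing, declares handle(RecordTypes recType, Map<Keys, String> values)
   * and may throw IOException, matching the call made in parseLine(). The directory and
   * file name ("1201234567890_job_0001") are made-up; real files follow the
   * jobtrackerId_jobid pattern described in the class comment.
   *
   *   JobHistory.parseHistory(new File(historyDir, "1201234567890_job_0001"),
   *       new JobHistory.Listener(){
   *         public void handle(JobHistory.RecordTypes recType,
   *                            Map<JobHistory.Keys, String> values) throws IOException {
   *           if (recType == JobHistory.RecordTypes.Job
   *               && values.containsKey(JobHistory.Keys.FINISH_TIME)){
   *             System.out.println("job finished at " + values.get(JobHistory.Keys.FINISH_TIME));
   *           }
   *         }
   *       });
   */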
  /**
   * Enable/disable history logging. Default value is false, so history
   * is enabled by default.
   * @param disableHistory true if history should be disabled, false otherwise.
   */
  public static void setDisableHistory(boolean disableHistory) {
    JobHistory.disableHistory = disableHistory;
  }

  /**
   * Base class contains utility stuff to manage typed key-value pairs with enums.
   */
  static class KeyValuePair{
    private Map<Keys, String> values = new HashMap<Keys, String>();

    /**
     * Get 'String' value for a given key. Most places use Strings as
     * values, so the default 'get' method returns 'String'. This method never returns
     * null, to ease use in GUIs; if no value is found it returns the empty string "".
     * @param k
     * @return the value, or the empty string "" if no value is found
     */
    public String get(Keys k){
      String s = values.get(k);
      return s == null ? "" : s;
    }

    /**
     * Convert value from history to int and return.
     * If no value is found it returns 0.
     * @param k key
     * @return
     */
    public int getInt(Keys k){
      String s = values.get(k);
      if (null != s){
        return Integer.parseInt(s);
      }
      return 0;
    }

    /**
     * Convert value from history to long and return.
     * If no value is found it returns 0.
     * @param k
     * @return
     */
    public long getLong(Keys k){
      String s = values.get(k);
      if (null != s){
        return Long.parseLong(s);
      }
      return 0;
    }

    /**
     * Set value for the key.
     * @param k
     * @param s
     */
    public void set(Keys k, String s){
      values.put(k, s);
    }

    /**
     * Adds all values in the Map argument to its own values.
     * @param m
     */
    public void set(Map<Keys, String> m){
      values.putAll(m);
    }

    /**
     * Reads values back from the history; input is the same Map as passed to Listener by parseHistory().
     * @param values
     */
    public void handle(Map<Keys, String> values){
      set(values);
    }

    /**
     * Returns Map containing all key-values.
     * @return
     */
    public Map<Keys, String> getValues(){
      return values;
    }
  }

  /**
   * Helper class for logging or reading back events related to job start, finish or failure.
   */
  public static class JobInfo extends KeyValuePair{

    private Map<String, Task> allTasks = new TreeMap<String, Task>();

    /** Create new JobInfo */
    public JobInfo(String jobId){
      set(Keys.JOBID, jobId);
    }

    /**
     * Returns all map and reduce tasks <taskid-Task>.
     * @return
     */
    public Map<String, Task> getAllTasks() { return allTasks; }
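
    /*
     * Illustrative sketch (not in the original source): once a parsed Job record has been
     * fed to a JobInfo via handle(), its fields can be read back through the typed getters
     * inherited from KeyValuePair. The job id is made-up, and parsedValues stands for a
     * Map<Keys, String> obtained from parseHistory():
     *
     *   JobHistory.JobInfo job = new JobHistory.JobInfo("job_0001");
     *   job.handle(parsedValues);
     *   String user   = job.get(JobHistory.Keys.USER);          // "" if absent
     *   int totalMaps = job.getInt(JobHistory.Keys.TOTAL_MAPS); // 0 if absent
     *   long launched = job.getLong(JobHistory.Keys.LAUNCH_TIME);
     */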
    /**
     * Log job submitted event to history. Creates a new file in history
     * for the job. If history file creation fails, it disables history
     * for all other events.
     * @param jobId job id assigned by job tracker.
     * @param jobName job name as given by user in job conf
     * @param user user name
     * @param submitTime time when job tracker received the job
     * @param jobConf path to job conf xml file in HDFS.
     */
    public static void logSubmitted(String jobId, String jobName, String user,
        long submitTime, String jobConf){

      if (!disableHistory){
        synchronized(MASTER_INDEX_LOG_FILE){
          JobHistory.log(masterIndex, RecordTypes.Job,
              new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF},
              new String[]{jobId, jobName, user, String.valueOf(submitTime), jobConf});
        }
        // setup the history log file for this job
        String logFileName = JOBTRACKER_START_TIME + "_" + jobId;
        File logFile = new File(LOG_DIR + File.separator + logFileName);

        try{
          PrintWriter writer = new PrintWriter(logFile);
          openJobs.put(logFileName, writer);
          // add to writer as well
          JobHistory.log(writer, RecordTypes.Job,
              new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF},
              new String[]{jobId, jobName, user, String.valueOf(submitTime), jobConf});
        }catch(IOException e){
          LOG.error("Failed creating job history log file, disabling history", e);
          disableHistory = true;
        }
      }
    }

    /**
     * Logs launch time of job.
     * @param jobId job id, assigned by jobtracker.
     * @param startTime start time of job.
     * @param totalMaps total maps assigned by jobtracker.
     * @param totalReduces total reduces.
     */
    public static void logStarted(String jobId, long startTime, int totalMaps, int totalReduces){
      if (!disableHistory){
        synchronized(MASTER_INDEX_LOG_FILE){
          JobHistory.log(masterIndex, RecordTypes.Job,
              new Enum[]{Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES},
              new String[]{jobId, String.valueOf(startTime),
                String.valueOf(totalMaps), String.valueOf(totalReduces)});
        }

        String logFileName = JOBTRACKER_START_TIME + "_" + jobId;
        PrintWriter writer = (PrintWriter)openJobs.get(logFileName);

        if (null != writer){
          JobHistory.log(writer, RecordTypes.Job,
              new Enum[]{Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES},
              new String[]{jobId, String.valueOf(startTime), String.valueOf(totalMaps), String.valueOf(totalReduces)});
        }
      }
    }

    /**
     * Log job finished. Closes the job file in history.
     * @param jobId job id, assigned by jobtracker.
     * @param finishTime finish time of job in ms.
     * @param finishedMaps no of maps successfully finished.
     * @param finishedReduces no of reduces finished successfully.
     * @param failedMaps no of failed map tasks.
     * @param failedReduces no of failed reduce tasks.
     */
    public static void logFinished(String jobId, long finishTime, int finishedMaps, int finishedReduces,
        int failedMaps, int failedReduces){
      if (!disableHistory){
        synchronized(MASTER_INDEX_LOG_FILE){
          // ... (listing truncated here; continued on page 2 of 2)
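
A minimal sketch (not from the original source) of how a job tracker would drive the logging helpers shown above over a job's lifetime. It assumes the hadoop.log.dir system property is set before JobHistory's static initializer runs, and that the remainder of logFinished (continued on page 2) closes the per-job writer as its javadoc states; the class name, job id, counts and times below are made-up example values.

  import org.apache.hadoop.mapred.JobHistory;

  // Hypothetical driver showing the call order; not part of JobHistory itself.
  public class JobHistorySketch {
    public static void main(String[] args) {
      String jobId = "job_0001";                                 // made-up job id
      JobHistory.JobInfo.logSubmitted(jobId, "wordcount", "alice",
          System.currentTimeMillis(), "/jobs/job_0001.xml");     // writes the Job submitted record
      JobHistory.JobInfo.logStarted(jobId, System.currentTimeMillis(), 10, 2);
      // ... map and reduce tasks run ...
      JobHistory.JobInfo.logFinished(jobId, System.currentTimeMillis(),
          10, 2, 0, 0);                                          // finished maps/reduces, failed maps/reduces
    }
  }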
