📄 JobHistory.java
package org.apache.hadoop.mapred;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Provides methods for writing to and reading from job history.
 * Job history works in append mode; JobHistory and its inner classes provide methods
 * to log job events.
 *
 * JobHistory is split into multiple files. The format of each file is plain text, where each
 * line is of the form [type (key="value")*], and type identifies the type of the record.
 * Type maps to the UID of one of the inner classes of this class.
 *
 * Job history is maintained in a master index which contains start/stop times of all jobs
 * along with a few other job-level properties. Apart from this, each job's history is kept
 * in a separate history file whose name follows the format jobtrackerId_jobid.
 *
 * For parsing the job history it supports a listener-based interface where each line is
 * parsed and passed to a listener. The listener can create an object model of the history
 * or look for specific events and discard the rest of the history.
 */
public class JobHistory {

  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobHistory");

  private static final String DELIMITER = " ";
  private static final String KEY = "(\\w+)";
  private static final String VALUE = "[[^\"]?]+"; // anything but a " inside "..."
  private static final Pattern pattern =
      Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");

  public static final String JOBTRACKER_START_TIME =
      String.valueOf(System.currentTimeMillis());
  private static final String LOG_DIR =
      System.getProperty("hadoop.log.dir") + File.separator + "history";
  public static final String MASTER_INDEX_LOG_FILE = "JobHistory.log";

  private static PrintWriter masterIndex = null;
  private static Map<String, PrintWriter> openJobs = new HashMap<String, PrintWriter>();
  private static boolean disableHistory = false;

  /**
   * Record types are identifiers for each line of log in history files.
   * A record type appears as the first token in a single line of log.
   */
  public static enum RecordTypes {
    Jobtracker, Job, Task, MapAttempt, ReduceAttempt
  }

  /**
   * Job history files contain key="value" pairs, where keys belong to this enum.
   * It acts as a global namespace for all keys.
   */
  public static enum Keys {
    JOBTRACKERID, START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
    LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
    FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
    ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
    SHUFFLE_FINISHED, SORT_FINISHED
  }

  /**
   * This enum contains some of the values commonly used by history log events.
   * Since values in history can only be strings, Values.name() is used in
   * most places in the history file.
   */
  public static enum Values {
    SUCCESS, FAILED, KILLED, MAP, REDUCE
  }

  // temp buffer for parsed data
  private static Map<Keys, String> parseBuffer = new HashMap<Keys, String>();

  // init log files
  static {
    init();
  }

  /**
   * Initialize JobHistory files.
   */
  private static void init() {
    if (!disableHistory) {
      try {
        File logDir = new File(LOG_DIR);
        if (!logDir.exists()) {
          logDir.mkdirs();
        }
        masterIndex = new PrintWriter(
            new FileOutputStream(
                new File(LOG_DIR + File.separator + MASTER_INDEX_LOG_FILE), true));
        // add jobtracker id = tracker start time
        log(masterIndex, RecordTypes.Jobtracker, Keys.START_TIME, JOBTRACKER_START_TIME);
      } catch (IOException e) {
        LOG.error("Failed to initialize JobHistory log file", e);
        disableHistory = true;
      }
    }
  }
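
  // Illustration (editorial, not part of the original source): a record written by the
  // log(...) helpers below is a single plain-text line such as
  //
  //   Job JOBID="job_0001" JOBNAME="wordcount" USER="alice" SUBMIT_TIME="1199999999" JOBCONF="/jobs/job_0001.xml"
  //
  // The first token names a RecordTypes value; parseLine() then runs `pattern` over the
  // rest of the line and turns every KEY="value" pair into a Keys -> String entry before
  // handing the map to the Listener. The job id, user and paths above are made-up examples.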
  /**
   * Parses a history file and invokes Listener.handle() for each line of history. It can
   * be used for looking through history files for specific items without having to keep
   * the whole history in memory.
   * @param path path to history file
   * @param l Listener for history events
   * @throws IOException
   */
  public static void parseHistory(File path, Listener l) throws IOException {
    BufferedReader reader = new BufferedReader(new FileReader(path));
    String line = null;
    StringBuffer buf = new StringBuffer();
    while ((line = reader.readLine()) != null) {
      buf.append(line);
      // a record may span several physical lines; keep buffering until the line ends with a quote
      if (!line.trim().endsWith("\"")) {
        continue;
      }
      parseLine(buf.toString(), l);
      buf = new StringBuffer();
    }
  }

  /**
   * Parse a single line of history.
   * @param line
   * @param l
   * @throws IOException
   */
  private static void parseLine(String line, Listener l) throws IOException {
    // extract the record type
    int idx = line.indexOf(' ');
    String recType = line.substring(0, idx);
    String data = line.substring(idx + 1, line.length());

    Matcher matcher = pattern.matcher(data);
    while (matcher.find()) {
      String tuple = matcher.group(0);
      String[] parts = tuple.split("=");
      // strip the surrounding quotes from the value
      parseBuffer.put(Keys.valueOf(parts[0]), parts[1].substring(1, parts[1].length() - 1));
    }

    l.handle(RecordTypes.valueOf(recType), parseBuffer);
    parseBuffer.clear();
  }

  /**
   * Log a raw record type with a key and a value. This method is generally not used directly.
   * @param recordType type of log event
   * @param key key
   * @param value value
   */
  static void log(PrintWriter out, RecordTypes recordType, Enum key, String value) {
    out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\"");
    out.flush();
  }

  /**
   * Log a number of keys and values with a record. The keys and values arrays
   * should have the same length.
   * @param recordType type of log event
   * @param keys keys to log
   * @param values values to log
   */
  static void log(PrintWriter out, RecordTypes recordType, Enum[] keys, String[] values) {
    StringBuffer buf = new StringBuffer(recordType.name());
    buf.append(DELIMITER);
    for (int i = 0; i < keys.length; i++) {
      buf.append(keys[i]);
      buf.append("=\"");
      buf.append(values[i]);
      buf.append("\"");
      buf.append(DELIMITER);
    }
    out.println(buf.toString());
    out.flush();
  }

  /**
   * Returns history disable status. By default history is enabled, so this
   * method returns false.
   * @return true if history logging is disabled, false otherwise.
   */
  public static boolean isDisableHistory() {
    return disableHistory;
  }

  /**
   * Enable/disable history logging. The default value is false, so history
   * is enabled by default.
   * @param disableHistory true if history should be disabled, false otherwise.
   */
  public static void setDisableHistory(boolean disableHistory) {
    JobHistory.disableHistory = disableHistory;
  }
  /**
   * Base class containing utility stuff to manage typed key-value pairs with enums.
   */
  static class KeyValuePair {
    private Map<Keys, String> values = new HashMap<Keys, String>();

    /**
     * Get the 'String' value for a given key. Most places use Strings as
     * values, so the default get() method returns 'String'. This method never returns
     * null, to ease use in GUIs: if no value is found it returns the empty string "".
     * @param k
     * @return the value, or the empty string "" if none is set
     */
    public String get(Keys k) {
      String s = values.get(k);
      return s == null ? "" : s;
    }

    /**
     * Convert a value from history to int and return it.
     * If no value is found it returns 0.
     * @param k key
     * @return
     */
    public int getInt(Keys k) {
      String s = values.get(k);
      if (null != s) {
        return Integer.parseInt(s);
      }
      return 0;
    }

    /**
     * Convert a value from history to long and return it.
     * If no value is found it returns 0.
     * @param k
     * @return
     */
    public long getLong(Keys k) {
      String s = values.get(k);
      if (null != s) {
        return Long.parseLong(s);
      }
      return 0;
    }

    /**
     * Set the value for a key.
     * @param k
     * @param s
     */
    public void set(Keys k, String s) {
      values.put(k, s);
    }

    /**
     * Adds all values in the Map argument to its own values.
     * @param m
     */
    public void set(Map<Keys, String> m) {
      values.putAll(m);
    }

    /**
     * Reads values back from the history; the input is the same Map as passed to the
     * Listener by parseHistory().
     * @param values
     */
    public void handle(Map<Keys, String> values) {
      set(values);
    }

    /**
     * Returns the Map containing all key-values.
     * @return
     */
    public Map<Keys, String> getValues() {
      return values;
    }
  }

  /**
   * Helper class for logging or reading back events related to job start, finish or failure.
   */
  public static class JobInfo extends KeyValuePair {

    private Map<String, Task> allTasks = new TreeMap<String, Task>();

    /** Create a new JobInfo. */
    public JobInfo(String jobId) {
      set(Keys.JOBID, jobId);
    }

    /**
     * Returns all map and reduce tasks <taskid-Task>.
     * @return
     */
    public Map<String, Task> getAllTasks() {
      return allTasks;
    }

    /**
     * Log a job-submitted event to history. Creates a new file in history
     * for the job. If history file creation fails, it disables history
     * for all other events.
     * @param jobId job id assigned by the job tracker.
     * @param jobName job name as given by the user in the job conf
     * @param user user name
     * @param submitTime time when the job tracker received the job
     * @param jobConf path to the job conf xml file in HDFS.
     */
    public static void logSubmitted(String jobId, String jobName, String user,
                                    long submitTime, String jobConf) {
      if (!disableHistory) {
        synchronized (MASTER_INDEX_LOG_FILE) {
          JobHistory.log(masterIndex, RecordTypes.Job,
              new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF},
              new String[]{jobId, jobName, user, String.valueOf(submitTime), jobConf});
        }
        // set up the history log file for this job
        String logFileName = JOBTRACKER_START_TIME + "_" + jobId;
        File logFile = new File(LOG_DIR + File.separator + logFileName);
        try {
          PrintWriter writer = new PrintWriter(logFile);
          openJobs.put(logFileName, writer);
          // add to the per-job writer as well
          JobHistory.log(writer, RecordTypes.Job,
              new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF},
              new String[]{jobId, jobName, user, String.valueOf(submitTime), jobConf});
        } catch (IOException e) {
          LOG.error("Failed creating job history log file, disabling history", e);
          disableHistory = true;
        }
      }
    }
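
    // Illustration (editorial): for a job with the made-up id job_0001, logSubmitted() above
    // appends one Job record to <hadoop.log.dir>/history/JobHistory.log (the master index)
    // and creates <hadoop.log.dir>/history/<JOBTRACKER_START_TIME>_job_0001, to which the
    // same record and later per-job records (e.g. from logStarted() below) are appended via
    // the writer cached in openJobs.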
    /**
     * Logs the launch time of a job.
     * @param jobId job id, assigned by the jobtracker.
     * @param startTime start time of the job.
     * @param totalMaps total maps assigned by the jobtracker.
     * @param totalReduces total reduces.
     */
    public static void logStarted(String jobId, long startTime, int totalMaps, int totalReduces) {
      if (!disableHistory) {
        synchronized (MASTER_INDEX_LOG_FILE) {
          JobHistory.log(masterIndex, RecordTypes.Job,
              new Enum[]{Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES},
              new String[]{jobId, String.valueOf(startTime),
                           String.valueOf(totalMaps), String.valueOf(totalReduces)});
        }
        String logFileName = JOBTRACKER_START_TIME + "_" + jobId;
        PrintWriter writer = (PrintWriter) openJobs.get(logFileName);
        if (null != writer) {
          JobHistory.log(writer, RecordTypes.Job,
              new Enum[]{Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES},
              new String[]{jobId, String.valueOf(startTime),
                           String.valueOf(totalMaps), String.valueOf(totalReduces)});
        }
      }
    }

    /**
     * Log job finished. Closes the job file in history.
     * @param jobId job id, assigned by the jobtracker.
     * @param finishTime finish time of the job in ms.
     * @param finishedMaps number of maps successfully finished.
     * @param finishedReduces number of reduces successfully finished.
     * @param failedMaps number of failed map tasks.
     * @param failedReduces number of failed reduce tasks.
     */
    public static void logFinished(String jobId, long finishTime, int finishedMaps,
                                   int finishedReduces, int failedMaps, int failedReduces) {
      if (!disableHistory) {
        synchronized (MASTER_INDEX_LOG_FILE) {
          // ... (listing truncated)
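
A rough usage sketch (not part of JobHistory.java above): it assumes the Listener type referenced by parseHistory() is a nested JobHistory.Listener interface exposing a single handle(RecordTypes, Map<Keys, String>) callback, as implied by the call in parseLine(). The class name JobHistoryScan and the counting logic are made up for illustration; the point is that the listener receives one already-parsed record per line and can discard whatever it does not need, so the whole history file never has to be held in memory.

package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.util.Map;

public class JobHistoryScan {

  public static void main(String[] args) throws IOException {
    // mutable counter usable from the anonymous listener
    final int[] taskRecords = {0};

    // scan the history file named on the command line, record by record
    JobHistory.parseHistory(new File(args[0]), new JobHistory.Listener() {
      public void handle(JobHistory.RecordTypes recType,
                         Map<JobHistory.Keys, String> values) {
        if (recType == JobHistory.RecordTypes.Task) {
          // count Task records
          taskRecords[0]++;
        } else if (recType == JobHistory.RecordTypes.Job
            && values.containsKey(JobHistory.Keys.FINISH_TIME)) {
          // report Job records that carry a finish time
          System.out.println("job " + values.get(JobHistory.Keys.JOBID)
              + " finished at " + values.get(JobHistory.Keys.FINISH_TIME));
        }
        // all other record types are ignored and effectively discarded
      }
    });

    System.out.println("task records seen: " + taskRecords[0]);
  }
}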