📄 jobhistory.java
字号:
JobHistory.log(masterIndex, RecordTypes.Job,
              new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS,
                          Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
              new String[] {jobId, "" + finishTime, Values.SUCCESS.name(),
                            String.valueOf(finishedMaps), String.valueOf(finishedReduces) });
        }
        // close job file for this job
        String logFileName = JOBTRACKER_START_TIME + "_" + jobId;
        PrintWriter writer = openJobs.get(logFileName);
        if (null != writer) {
          JobHistory.log(writer, RecordTypes.Job,
              new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS,
                          Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES,
                          Keys.FAILED_MAPS, Keys.FAILED_REDUCES},
              new String[] {jobId, "" + finishTime, Values.SUCCESS.name(),
                            String.valueOf(finishedMaps), String.valueOf(finishedReduces),
                            String.valueOf(failedMaps), String.valueOf(failedReduces)});
          writer.close();
          openJobs.remove(logFileName);
        }
        // HistoryCleaner.run() returns immediately if another pass is in
        // progress or if a pass already ran within the last day.
        Thread historyCleaner = new Thread(new HistoryCleaner());
        historyCleaner.start();
      }
    }

    /**
     * Logs job failed event. Closes the job history log file.
     * @param jobid job id
     * @param timestamp time when job failure was detected in ms.
     * @param finishedMaps number of finished map tasks.
     * @param finishedReduces number of finished reduce tasks.
     */
    public static void logFailed(String jobid, long timestamp, int finishedMaps, int finishedReduces) {
      if (!disableHistory) {
        synchronized (MASTER_INDEX_LOG_FILE) {
          JobHistory.log(masterIndex, RecordTypes.Job,
              new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS,
                          Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
              new String[] {jobid, String.valueOf(timestamp), Values.FAILED.name(),
                            String.valueOf(finishedMaps), String.valueOf(finishedReduces)});
        }
        // close job file for this job
        String logFileName = JOBTRACKER_START_TIME + "_" + jobid;
        PrintWriter writer = openJobs.get(logFileName);
        if (null != writer) {
          JobHistory.log(writer, RecordTypes.Job,
              new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS,
                          Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
              new String[] {jobid, String.valueOf(timestamp), Values.FAILED.name(),
                            String.valueOf(finishedMaps), String.valueOf(finishedReduces)});
          writer.close();
          openJobs.remove(logFileName);
        }
      }
    }
  }

  /**
   * Helper class for logging or reading back events related to Task's start, finish or failure.
   * All events logged by this class are logged in a separate file per job in
   * job tracker history. These events map to TIPs in jobtracker.
   */
  public static class Task extends KeyValuePair {
    private Map<String, TaskAttempt> taskAttempts = new TreeMap<String, TaskAttempt>();

    /**
     * Log start time of task (TIP).
     * @param jobId job id
     * @param taskId task id
     * @param taskType MAP or REDUCE
     * @param startTime startTime of tip.
     */
    public static void logStarted(String jobId, String taskId, String taskType,
                                  long startTime) {
      if (!disableHistory) {
        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
        if (null != writer) {
          JobHistory.log(writer, RecordTypes.Task,
              new Enum[] {Keys.TASKID, Keys.TASK_TYPE, Keys.START_TIME},
              new String[] {taskId, taskType, String.valueOf(startTime)});
        }
      }
    }

    /**
     * Log finish time of task.
     * @param jobId job id
     * @param taskId task id
     * @param taskType MAP or REDUCE
     * @param finishTime finish time of task in ms
     */
    public static void logFinished(String jobId, String taskId, String taskType,
                                   long finishTime) {
      if (!disableHistory) {
        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
        if (null != writer) {
          JobHistory.log(writer, RecordTypes.Task,
              new Enum[] {Keys.TASKID, Keys.TASK_TYPE, Keys.TASK_STATUS, Keys.FINISH_TIME},
              new String[] {taskId, taskType, Values.SUCCESS.name(),
                            String.valueOf(finishTime)});
        }
      }
    }

    /**
     * Log task failed event.
     * @param jobId jobid
     * @param taskId task id
     * @param taskType MAP or REDUCE.
     * @param time timestamp when the task failure was detected.
     * @param error error message for failure.
     */
    public static void logFailed(String jobId, String taskId, String taskType, long time,
                                 String error) {
      if (!disableHistory) {
        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
        if (null != writer) {
          JobHistory.log(writer, RecordTypes.Task,
              new Enum[] {Keys.TASKID, Keys.TASK_TYPE, Keys.TASK_STATUS, Keys.FINISH_TIME,
                          Keys.ERROR},
              new String[] {taskId, taskType, Values.FAILED.name(), String.valueOf(time),
                            error});
        }
      }
    }

    /**
     * Returns all task attempts for this task, keyed by task attempt id.
     * @return the internal (mutable, not copied) map of task attempt id to
     *         {@link TaskAttempt}
     */
    public Map<String, TaskAttempt> getTaskAttempts() {
      return this.taskAttempts;
    }
  }

  /**
   * Base class for Map and Reduce TaskAttempts.
   */
  public static class TaskAttempt extends Task {}

  /**
   * Helper class for logging or reading back events related to start, finish or failure of
   * a Map Attempt on a node.
   */
  public static class MapAttempt extends TaskAttempt {
    /**
     * Log start time of this map task attempt.
     * @param jobId job id
     * @param taskId task id
     * @param taskAttemptId task attempt id
     * @param startTime start time of task attempt as reported by task tracker.
     * @param hostName host name of the task attempt.
     */
    public static void logStarted(String jobId, String taskId, String taskAttemptId,
                                  long startTime, String hostName) {
      if (!disableHistory) {
        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
        if (null != writer) {
          JobHistory.log(writer, RecordTypes.MapAttempt,
              new Enum[] {Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,
                          Keys.START_TIME, Keys.HOSTNAME},
              new String[] {Values.MAP.name(), taskId, taskAttemptId,
                            String.valueOf(startTime), hostName});
        }
      }
    }

    /**
     * Log finish time of map task attempt.
     * @param jobId job id
     * @param taskId task id
     * @param taskAttemptId task attempt id
     * @param finishTime finish time
     * @param hostName host name
     */
    public static void logFinished(String jobId, String taskId, String taskAttemptId,
                                   long finishTime, String hostName) {
      if (!disableHistory) {
        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
        if (null != writer) {
          JobHistory.log(writer, RecordTypes.MapAttempt,
              new Enum[] {Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,
                          Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.HOSTNAME},
              new String[] {Values.MAP.name(), taskId, taskAttemptId,
                            Values.SUCCESS.name(), String.valueOf(finishTime), hostName});
        }
      }
    }

    /**
     * Log task attempt failed event.
     * @param jobId jobid
     * @param taskId taskid
     * @param taskAttemptId task attempt id
     * @param timestamp timestamp
     * @param hostName hostname of this task attempt.
     * @param error error message if any for this task attempt.
     */
    public static void logFailed(String jobId, String taskId, String taskAttemptId,
                                 long timestamp, String hostName, String error) {
      if (!disableHistory) {
        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
        if (null != writer) {
          JobHistory.log(writer, RecordTypes.MapAttempt,
              new Enum[] {Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,
                          Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
              new String[] {Values.MAP.name(), taskId, taskAttemptId,
                            Values.FAILED.name(), String.valueOf(timestamp), hostName, error});
        }
      }
    }
  }

  /**
   * Helper class for logging or reading back events related to start, finish or failure of
   * a Reduce Attempt on a node.
   */
  public static class ReduceAttempt extends TaskAttempt {
    /**
     * Log start time of Reduce task attempt.
     * @param jobId job id
     * @param taskId task id (tip)
     * @param taskAttemptId task attempt id
     * @param startTime start time
     * @param hostName host name
     */
    public static void logStarted(String jobId, String taskId, String taskAttemptId,
                                  long startTime, String hostName) {
      if (!disableHistory) {
        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
        if (null != writer) {
          JobHistory.log(writer, RecordTypes.ReduceAttempt,
              new Enum[] {Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,
                          Keys.START_TIME, Keys.HOSTNAME},
              new String[] {Values.REDUCE.name(), taskId, taskAttemptId,
                            String.valueOf(startTime), hostName});
        }
      }
    }

    /**
     * Log finished event of this task.
     * @param jobId job id
     * @param taskId task id
     * @param taskAttemptId task attempt id
     * @param shuffleFinished shuffle finish time
     * @param sortFinished sort finish time
     * @param finishTime finish time of task
     * @param hostName host name where task attempt executed
     */
    public static void logFinished(String jobId, String taskId, String taskAttemptId,
                                   long shuffleFinished, long sortFinished,
                                   long finishTime, String hostName) {
      if (!disableHistory) {
        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
        if (null != writer) {
          JobHistory.log(writer, RecordTypes.ReduceAttempt,
              new Enum[] {Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,
                          Keys.TASK_STATUS, Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
                          Keys.FINISH_TIME, Keys.HOSTNAME},
              new String[] {Values.REDUCE.name(), taskId, taskAttemptId,
                            Values.SUCCESS.name(), String.valueOf(shuffleFinished),
                            String.valueOf(sortFinished), String.valueOf(finishTime),
                            hostName});
        }
      }
    }

    /**
     * Log failed reduce task attempt.
     * @param jobId job id
     * @param taskId task id
     * @param taskAttemptId task attempt id
     * @param timestamp time stamp when task failed
     * @param hostName host name of the task attempt.
     * @param error error message of the task.
     */
    public static void logFailed(String jobId, String taskId, String taskAttemptId,
                                 long timestamp, String hostName, String error) {
      if (!disableHistory) {
        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
        if (null != writer) {
          JobHistory.log(writer, RecordTypes.ReduceAttempt,
              new Enum[] {Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,
                          Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
              new String[] {Values.REDUCE.name(), taskId, taskAttemptId,
                            Values.FAILED.name(), String.valueOf(timestamp), hostName, error});
        }
      }
    }
  }

  /**
   * Callback interface for reading back log events from JobHistory. This interface
   * should be implemented and passed to JobHistory.parseHistory()
   */
  public static interface Listener {
    /**
     * Callback method for history parser.
     * @param recType type of record, which is the first entry in the line.
     * @param values a map of key-value pairs as they appear in history.
     * @throws IOException if the implementation fails while handling the record
     */
    public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException;
  }

  /**
   * Delete history files older than one month. Update master index and remove all
   * jobs older than one month. Also if a job tracker has no jobs in last one month
   * remove reference to the job tracker.
   * <p>
   * A pass is skipped if another pass is already running, or if a pass
   * started less than a day ago.
   * @author sanjaydahiya
   */
  public static class HistoryCleaner implements Runnable {
    static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
    static final long THIRTY_DAYS_IN_MS = 30 * ONE_DAY_IN_MS;
    // Reference time for this pass; set once in run() before any cleanup.
    private long now;
    // Shared across all cleaner instances. Both fields are only read or
    // written while holding the HistoryCleaner.class monitor, because a new
    // cleaner thread is started for every finished job.
    private static boolean isRunning = false;
    private static long lastRan;

    /**
     * Cleans up history data.
     */
    public void run() {
      synchronized (HistoryCleaner.class) {
        if (isRunning) {
          return;
        }
        now = System.currentTimeMillis();
        // clean history only once a day at max; lastRan == 0 means no pass has
        // run yet, so the first invocation must go ahead.
        if (lastRan != 0 && (now - lastRan) < ONE_DAY_IN_MS) {
          return;
        }
        lastRan = now;
        isRunning = true;
      }
      try {
        cleanMasterIndex();
        deleteOldHistoryFiles();
      } finally {
        // Reset even if a RuntimeException escapes, or no later pass could run.
        synchronized (HistoryCleaner.class) {
          isRunning = false;
        }
      }
    }

    /**
     * Rewrites the master index, dropping jobs submitted more than thirty days
     * before {@code now} and job trackers left with no jobs. IOExceptions are
     * logged, not propagated.
     */
    private void cleanMasterIndex() {
      try {
        File logFile = new File(LOG_DIR + File.separator + MASTER_INDEX_LOG_FILE);
        synchronized (MASTER_INDEX_LOG_FILE) {
          Map<String, Map<String, JobHistory.JobInfo>> jobTrackersToJobs =
              DefaultJobHistoryParser.parseMasterIndex(logFile);

          // find jobs that were submitted more than one month back and remove them;
          // for jobtracker instances which don't have a job in the past one month
          // remove the jobtracker start timestamp as well.
          Iterator<Map.Entry<String, Map<String, JobHistory.JobInfo>>> trackers =
              jobTrackersToJobs.entrySet().iterator();
          while (trackers.hasNext()) {
            Map<String, JobHistory.JobInfo> jobs = trackers.next().getValue();
            Iterator<JobHistory.JobInfo> jobIter = jobs.values().iterator();
            while (jobIter.hasNext()) {
              JobHistory.JobInfo job = jobIter.next();
              if (now - job.getLong(Keys.SUBMIT_TIME) > THIRTY_DAYS_IN_MS) {
                jobIter.remove();
              }
            }
            if (jobs.isEmpty()) {
              trackers.remove();
            }
          }

          masterIndex.close();
          masterIndex = new PrintWriter(logFile);
          // write the surviving history back to a fresh master index
          for (Map.Entry<String, Map<String, JobHistory.JobInfo>> tracker
                 : jobTrackersToJobs.entrySet()) {
            log(masterIndex, RecordTypes.Jobtracker, Keys.START_TIME, tracker.getKey());
            for (JobHistory.JobInfo job : tracker.getValue().values()) {
              Map<Keys, String> values = job.getValues();
              log(masterIndex, RecordTypes.Job,
                  values.keySet().toArray(new Keys[0]),
                  values.values().toArray(new String[0]));
            }
            masterIndex.flush();
          }
        }
      } catch (IOException e) {
        LOG.error("Failed loading history log for cleanup", e);
      }
    }

    /**
     * Deletes every file in LOG_DIR last modified more than thirty days
     * before {@code now}.
     */
    private void deleteOldHistoryFiles() {
      File[] oldFiles = new File(LOG_DIR).listFiles(new FileFilter() {
        public boolean accept(File file) {
          // delete if older than 30 days
          return now - file.lastModified() > THIRTY_DAYS_IN_MS;
        }
      });
      // listFiles returns null when LOG_DIR is missing or cannot be read.
      if (oldFiles == null) {
        return;
      }
      for (File f : oldFiles) {
        LOG.info("Deleting old history file : " + f.getName());
        if (!f.delete()) {
          LOG.warn("Could not delete old history file : " + f.getName());
        }
      }
    }
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -