⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jobhistory.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
          JobHistory.log(masterIndex, RecordTypes.Job,                        new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },              new String[] {jobId,  "" + finishTime, Values.SUCCESS.name(),                 String.valueOf(finishedMaps), String.valueOf(finishedReduces) } ) ;        }                // close job file for this job        String logFileName = JOBTRACKER_START_TIME + "_" + jobId ;         PrintWriter writer = openJobs.get(logFileName);         if( null != writer){          JobHistory.log(writer, RecordTypes.Job,                        new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES,              Keys.FAILED_MAPS, Keys.FAILED_REDUCES},              new String[] {jobId,  "" + finishTime, Values.SUCCESS.name(),                 String.valueOf(finishedMaps), String.valueOf(finishedReduces),                String.valueOf(failedMaps), String.valueOf(failedReduces)} ) ;          writer.close();          openJobs.remove(logFileName);         }        Thread historyCleaner  = new Thread( new HistoryCleaner() );        historyCleaner.start();       }    }    /**     * Logs job failed event. Closes the job history log file.      * @param jobid job id     * @param timestamp time when job failure was detected in ms.       * @param finishedMaps no finished map tasks.      * @param finishedReduces no of finished reduce tasks.      */    public static void logFailed(String jobid, long timestamp, int finishedMaps,int finishedReduces){      if( ! disableHistory ){        synchronized(MASTER_INDEX_LOG_FILE){          JobHistory.log(masterIndex, RecordTypes.Job,              new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },              new String[] {jobid,  String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),                 String.valueOf(finishedReduces)} ) ;         }          String logFileName =  JOBTRACKER_START_TIME + "_" + jobid ;           PrintWriter writer = (PrintWriter)openJobs.get(logFileName);           if( null != writer){            JobHistory.log(writer, RecordTypes.Job,                new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS,Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },                new String[] {jobid,  String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),                   String.valueOf(finishedReduces)} ) ;             writer.close();            openJobs.remove(logFileName);           }      }    }  }  /**   * Helper class for logging or reading back events related to Task's start, finish or failure.    * All events logged by this class are logged in a separate file per job in    * job tracker history. These events map to TIPs in jobtracker.    */  public static class Task extends KeyValuePair{    private Map <String, TaskAttempt> taskAttempts = new TreeMap<String, TaskAttempt>();     /**     * Log start time of task (TIP).     * @param jobId job id     * @param taskId task id     * @param taskType MAP or REDUCE     * @param startTime startTime of tip.      */    public static void logStarted(String jobId, String taskId, String taskType,          long startTime){      if( ! disableHistory ){        PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);         if( null != writer ){          JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME},               new String[]{taskId, taskType, String.valueOf(startTime)}) ;        }      }    }    /**     * Log finish time of task.      * @param jobId job id     * @param taskId task id     * @param taskType MAP or REDUCE     * @param finishTime finish timeof task in ms     */    public static void logFinished(String jobId, String taskId, String taskType,         long finishTime){      if( ! disableHistory ){        PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);         if( null != writer ){          JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE,               Keys.TASK_STATUS, Keys.FINISH_TIME},               new String[]{ taskId,taskType, Values.SUCCESS.name(), String.valueOf(finishTime)}) ;        }      }    }    /**     * Log job failed event.     * @param jobId jobid     * @param taskId task id     * @param taskType MAP or REDUCE.     * @param time timestamp when job failed detected.      * @param error error message for failure.      */    public static void logFailed(String jobId, String taskId, String taskType, long time, String error){      if( ! disableHistory ){        PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);         if( null != writer ){          JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE,               Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.ERROR},               new String[]{ taskId,  taskType, Values.FAILED.name(), String.valueOf(time) , error}) ;        }      }    }    /**     * Returns all task attempts for this task. <task attempt id - TaskAttempt>     * @return     */    public Map<String, TaskAttempt> getTaskAttempts(){      return this.taskAttempts;    }  }  /**   * Base class for Map and Reduce TaskAttempts.    */  public static class TaskAttempt extends Task{}   /**   * Helper class for logging or reading back events related to start, finish or failure of    * a Map Attempt on a node.   */  public static class MapAttempt extends TaskAttempt{   /**    * Log start time of this map task attempt.     * @param jobId job id    * @param taskId task id    * @param taskAttemptId task attempt id    * @param startTime start time of task attempt as reported by task tracker.     * @param hostName host name of the task attempt.     */   public static void logStarted(String jobId, String taskId,String taskAttemptId, long startTime, String hostName){     if( ! disableHistory ){       PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);       if( null != writer ){         JobHistory.log( writer, RecordTypes.MapAttempt,              new Enum[]{ Keys.TASK_TYPE, Keys.TASKID,                Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},             new String[]{Values.MAP.name(),  taskId,                 taskAttemptId, String.valueOf(startTime), hostName} ) ;        }      }    }   /**    * Log finish time of map task attempt.     * @param jobId job id    * @param taskId task id    * @param taskAttemptId task attempt id     * @param finishTime finish time    * @param hostName host name     */    public static void logFinished(String jobId, String taskId, String taskAttemptId, long finishTime, String hostName){      if( ! disableHistory ){        PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);        if( null != writer ){          JobHistory.log(writer, RecordTypes.MapAttempt,               new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,               Keys.FINISH_TIME, Keys.HOSTNAME},              new String[]{Values.MAP.name(), taskId, taskAttemptId, Values.SUCCESS.name(),                String.valueOf(finishTime), hostName} ) ;         }      }    }    /**     * Log task attempt failed event.       * @param jobId jobid     * @param taskId taskid     * @param taskAttemptId task attempt id     * @param timestamp timestamp     * @param hostName hostname of this task attempt.     * @param error error message if any for this task attempt.      */    public static void logFailed(String jobId, String taskId, String taskAttemptId,         long timestamp, String hostName, String error){      if( ! disableHistory ){        PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);        if( null != writer ){          JobHistory.log( writer, RecordTypes.MapAttempt,               new Enum[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,                 Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},              new String[]{ Values.MAP.name(), taskId, taskAttemptId, Values.FAILED.name(),                String.valueOf(timestamp), hostName, error} ) ;         }      }    }   }  /**   * Helper class for logging or reading back events related to start, finish or failure of    * a Map Attempt on a node.   */  public static class ReduceAttempt extends TaskAttempt{    /**     * Log start time of  Reduce task attempt.      * @param jobId job id     * @param taskId task id (tip)     * @param taskAttemptId task attempt id     * @param startTime start time     * @param hostName host name      */    public static void logStarted(String jobId, String taskId, String taskAttemptId,         long startTime, String hostName){      if( ! disableHistory ){        PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);        if( null != writer ){          JobHistory.log( writer, RecordTypes.ReduceAttempt,               new Enum[]{  Keys.TASK_TYPE, Keys.TASKID,                 Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},              new String[]{Values.REDUCE.name(),  taskId,                 taskAttemptId, String.valueOf(startTime), hostName} ) ;         }      }     }    /**     * Log finished event of this task.      * @param jobId job id     * @param taskId task id     * @param taskAttemptId task attempt id     * @param shuffleFinished shuffle finish time     * @param sortFinished sort finish time     * @param finishTime finish time of task     * @param hostName host name where task attempt executed     */     public static void logFinished(String jobId, String taskId, String taskAttemptId,         long shuffleFinished, long sortFinished, long finishTime, String hostName){       if( ! disableHistory ){         PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);         if( null != writer ){           JobHistory.log( writer, RecordTypes.ReduceAttempt,                new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,                Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED, Keys.FINISH_TIME, Keys.HOSTNAME},               new String[]{Values.REDUCE.name(),  taskId, taskAttemptId, Values.SUCCESS.name(),                String.valueOf(shuffleFinished), String.valueOf(sortFinished),               String.valueOf(finishTime), hostName} ) ;          }       }     }     /**      * Log failed reduce task attempt.       * @param jobId job id       * @param taskId task id      * @param taskAttemptId task attempt id      * @param timestamp time stamp when task failed      * @param hostName host name of the task attempt.        * @param error error message of the task.       */     public static void logFailed(String jobId, String taskId,String taskAttemptId, long timestamp,           String hostName, String error){       if( ! disableHistory ){         PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);         if( null != writer ){           JobHistory.log( writer, RecordTypes.ReduceAttempt,                new Enum[]{  Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,Keys.TASK_STATUS,                  Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },               new String[]{ Values.REDUCE.name(), taskId, taskAttemptId, Values.FAILED.name(),                String.valueOf(timestamp), hostName, error } ) ;          }       }     }  }  /**   * Callback interface for reading back log events from JobHistory. This interface    * should be implemented and passed to JobHistory.parseHistory()    *   */  public static interface Listener{    /**     * Callback method for history parser.      * @param recType type of record, which is the first entry in the line.      * @param values a map of key-value pairs as thry appear in history.     * @throws IOException     */    public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException;   }    /**   * Delete history files older than one month. Update master index and remove all    * jobs older than one month. Also if a job tracker has no jobs in last one month   * remove reference to the job tracker.    * @author sanjaydahiya   *   */  public static class HistoryCleaner implements Runnable{    static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;    static final long THIRTY_DAYS_IN_MS = 30 * ONE_DAY_IN_MS;    private long now ;     private static boolean isRunning = false;     private static long lastRan ;     /**     * Cleans up history data.      */    public void run(){      if( isRunning ){        return ;       }      now = System.currentTimeMillis() ;      // clean history only once a day at max      if( lastRan ==0 || (now - lastRan) < ONE_DAY_IN_MS ){        return ;       }       lastRan = now;         isRunning = true ;         // update master Index first        try{        File logFile = new File(            LOG_DIR + File.separator + MASTER_INDEX_LOG_FILE);                 synchronized(MASTER_INDEX_LOG_FILE){          Map<String, Map<String, JobHistory.JobInfo>> jobTrackersToJobs =             DefaultJobHistoryParser.parseMasterIndex(logFile);                    // find job that started more than one month back and remove them          // for jobtracker instances which dont have a job in past one month           // remove the jobtracker start timestamp as well.           for (String jobTrackerId : jobTrackersToJobs.keySet()){            Map<String, JobHistory.JobInfo> jobs = jobTrackersToJobs.get(jobTrackerId);            for(Iterator iter = jobs.keySet().iterator(); iter.hasNext() ; iter.next()){              JobHistory.JobInfo job = jobs.get(iter.next());              if( now - job.getLong(Keys.SUBMIT_TIME) > THIRTY_DAYS_IN_MS ) {                iter.remove();               }              if( jobs.size() == 0 ){                iter.remove();               }            }          }          masterIndex.close();           masterIndex = new PrintWriter(logFile);          // delete old history and write back to a new file           for (String jobTrackerId : jobTrackersToJobs.keySet()){            Map<String, JobHistory.JobInfo> jobs = jobTrackersToJobs.get(jobTrackerId);                        log(masterIndex, RecordTypes.Jobtracker, Keys.START_TIME, jobTrackerId);            for(String jobId : jobs.keySet() ){              JobHistory.JobInfo job = jobs.get(jobId);              Map<Keys, String> values = job.getValues();                            log(masterIndex, RecordTypes.Job,                   values.keySet().toArray(new Keys[0]),                   values.values().toArray(new String[0]));             }            masterIndex.flush();          }        }      }catch(IOException e){        LOG.error("Failed loading history log for cleanup", e);      }            File[] oldFiles = new File(LOG_DIR).listFiles(new FileFilter(){        public boolean accept(File file){          // delete if older than 30 days          if( now - file.lastModified() > THIRTY_DAYS_IN_MS ){            return true ;           }            return false;         }      });      for( File f : oldFiles){        f.delete();         LOG.info("Deleting old history file : " + f.getName());      }      isRunning = false ;     }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -