📄 logalyzer.java
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.tools;import java.io.*;import java.util.Random;import org.apache.commons.logging.*;import org.apache.hadoop.mapred.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.CopyFiles;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.lib.LongSumReducer;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.Writable;import java.util.regex.Pattern;import java.util.regex.Matcher;/** * Logalyzer: A utility tool for archiving and analyzing hadoop logs. * <p> * This tool supports archiving and anaylzing (sort/grep) of log-files. * It takes as input * a) Input uri which will serve uris of the logs to be archived. * b) Output directory (not mandatory). * b) Directory on dfs to archive the logs. * c) The sort/grep patterns for analyzing the files and separator for boundaries. * Usage: * Logalyzer -archive -archiveDir <directory to archive logs> -analysis <directory> -logs <log-list uri> -grep <pattern> -sort <col1, col2> -separator <separator> * <p> * * @author Arun C Murthy */public class Logalyzer { // Constants private static Configuration fsConfig = new Configuration(); /** A {@link Mapper} that extracts text matching a regular expression. */ public static class LogRegexMapper extends MapReduceBase implements Mapper { private Pattern pattern; public void configure(JobConf job) { pattern = Pattern.compile(job.get("mapred.mapper.regex")); } public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException { String text = ((Text)value).toString(); Matcher matcher = pattern.matcher(text); while (matcher.find()) { output.collect((Text)value, new LongWritable(1)); } } } /** A WritableComparator optimized for UTF8 keys of the logs. */ public static class LogComparator extends Text.Comparator implements Configurable { private static Log LOG = LogFactory.getLog("org.apache.hadoop.tools.Logalyzer"); private JobConf conf = null; private String[] sortSpec = null; private String columnSeparator = null; public void setConf(Configuration conf) { if (conf instanceof JobConf) { this.conf = (JobConf) conf; } else { this.conf = new JobConf(conf); } //Initialize the specification for *comparision* String sortColumns = this.conf.get("mapred.reducer.sort", null); if(sortColumns != null) { sortSpec = sortColumns.split(","); } //Column-separator columnSeparator = this.conf.get("mapred.reducer.separator", ""); } public Configuration getConf() { return conf; } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { if(sortSpec == null) { return super.compare(b1, s1, l1, b2, s2, l2); } try { Text logline1 = new Text(); logline1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1))); String line1 = logline1.toString(); String[] logColumns1 = line1.split(columnSeparator); Text logline2 = new Text(); logline2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2))); String line2 = logline2.toString(); String[] logColumns2 = line2.split(columnSeparator); if(logColumns1 == null || logColumns2 == null) { return super.compare(b1, s1, l1, b2, s2, l2); } //Compare column-wise according to *sortSpec* for(int i=0; i < sortSpec.length; ++i) { int column = (Integer.valueOf(sortSpec[i]).intValue()); String c1 = logColumns1[column]; String c2 = logColumns2[column]; //Compare columns int comparision = super.compareBytes( c1.getBytes(), 0, c1.length(), c2.getBytes(), 0, c2.length() ); //They differ! if(comparision != 0) { return comparision; } } } catch (IOException ioe) { LOG.fatal("Caught " + ioe); return 0; } return 0; } static { // register this comparator WritableComparator.define(Text.class, new LogComparator()); } } /** * doArchive: Workhorse function to archive log-files. * @param logListURI : The uri which will serve list of log-files to archive. * @param archiveDirectory : The directory to store archived logfiles. * @throws IOException */ public void doArchive(String logListURI, String archiveDirectory) throws IOException { String destURL = new String("dfs://" + fsConfig.get("fs.default.name", "local") + archiveDirectory); CopyFiles.copy(fsConfig, logListURI, destURL, true, false); } /** * doAnalyze: * @param inputFilesDirectory : Directory containing the files to be analyzed. * @param outputDirectory : Directory to store analysis (output). * @param grepPattern : Pattern to *grep* for. * @param sortColumns : Sort specification for output. * @param columnSeparator : Column separator. * @throws IOException */ public void doAnalyze(String inputFilesDirectory, String outputDirectory, String grepPattern, String sortColumns, String columnSeparator) throws IOException { Path grepInput = new Path(inputFilesDirectory); Path analysisOutput = null; if(outputDirectory.equals("")) { analysisOutput = new Path(inputFilesDirectory, "logalyzer_" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); } else { analysisOutput = new Path(outputDirectory); } JobConf grepJob = new JobConf(fsConfig); grepJob.setJobName("logalyzer-grep-sort"); grepJob.setInputPath(grepInput); grepJob.setInputFormat(TextInputFormat.class); grepJob.setMapperClass(LogRegexMapper.class); grepJob.set("mapred.mapper.regex", grepPattern); grepJob.set("mapred.reducer.sort", sortColumns); grepJob.set("mapred.reducer.separator", columnSeparator); grepJob.setCombinerClass(LongSumReducer.class); grepJob.setReducerClass(LongSumReducer.class); grepJob.setOutputPath(analysisOutput); grepJob.setOutputFormat(TextOutputFormat.class); grepJob.setOutputKeyClass(Text.class); grepJob.setOutputValueClass(LongWritable.class); grepJob.setOutputKeyComparatorClass(LogComparator.class); grepJob.setNumReduceTasks(1); // write a single file JobClient.runJob(grepJob); } public static void main(String[] args) { Log LOG = LogFactory.getLog("org.apache.hadoop.tools.Logalyzer"); String version = "Logalyzer.0.0.1"; String usage = "Usage: Logalyzer [-archive -logs <urlsFile>] " + "-archiveDir <archiveDirectory> " + "-grep <pattern> -sort <column1,column2,...> -separator <separator> " + "-analysis <outputDirectory>"; System.out.println(version); if (args.length == 0) { System.err.println(usage); System.exit(-1); } //Command line arguments boolean archive = false; boolean grep = false; boolean sort = false; String archiveDir = ""; String logListURI = ""; String grepPattern = ".*"; String sortColumns = ""; String columnSeparator = " "; String outputDirectory = ""; for (int i = 0; i < args.length; i++) { // parse command line if (args[i].equals("-archive")) { archive = true; } else if (args[i].equals("-archiveDir")) { archiveDir = args[++i]; } else if (args[i].equals("-grep")) { grep = true; grepPattern = args[++i]; } else if (args[i].equals("-logs")) { logListURI = args[++i]; } else if (args[i].equals("-sort")) { sort = true; sortColumns = args[++i]; } else if (args[i].equals("-separator")) { columnSeparator = args[++i]; } else if (args[i].equals("-analysis")) { outputDirectory = args[++i]; } } LOG.info("analysisDir = " + outputDirectory); LOG.info("archiveDir = " + archiveDir); LOG.info("logListURI = " + logListURI); LOG.info("grepPattern = " + grepPattern); LOG.info("sortColumns = " + sortColumns); LOG.info("separator = " + columnSeparator); try { Logalyzer logalyzer = new Logalyzer(); // Archive? if (archive) { logalyzer.doArchive(logListURI, archiveDir); } // Analyze? if (grep || sort) { logalyzer.doAnalyze(archiveDir, outputDirectory, grepPattern, sortColumns, columnSeparator); } } catch (IOException ioe) { ioe.printStackTrace(); System.exit(-1); } } //main } //class Logalyzer
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -