⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streamlinerecordreader.java

📁 hadoop:Nutch集群平台
💻 JAVA
字号:
/** * Copyright 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.streaming;import java.io.*;import java.nio.charset.MalformedInputException;import java.util.Arrays;import java.util.zip.GZIPInputStream;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.Text;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.RecordReader;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.util.StringUtils;/** * Similar to org.apache.hadoop.mapred.TextRecordReader,  * but delimits key and value with a TAB. * @author Michel Tourn */public class StreamLineRecordReader extends StreamBaseRecordReader {  public StreamLineRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,      JobConf job, FileSystem fs) throws IOException {    super(in, split, reporter, job, fs);    gzipped_ = StreamInputFormat.isGzippedInput(job);    if (gzipped_) {      din_ = new DataInputStream(new GZIPInputStream(in_));    } else {      din_ = in_;    }  }  public void seekNextRecordBoundary() throws IOException {    if (gzipped_) {      // no skipping: use din_ as-is       // assumes splitter created only one split per file      return;    } else {      int bytesSkipped = 0;      if (start_ != 0) {        in_.seek(start_ - 1);        // scan to the next newline in the file        while (in_.getPos() < end_) {          char c = (char) in_.read();          bytesSkipped++;          if (c == '\r' || c == '\n') {            break;          }        }      }      //System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);    }  }  public synchronized boolean next(Writable key, Writable value) throws IOException {    if (!(key instanceof Text)) {      throw new IllegalArgumentException("Key should be of type Text but: "          + key.getClass().getName());    }    if (!(value instanceof Text)) {      throw new IllegalArgumentException("Value should be of type Text but: "          + value.getClass().getName());    }    Text tKey = (Text) key;    Text tValue = (Text) value;    byte[] line;    while (true) {      if (gzipped_) {        // figure EOS from readLine      } else {        long pos = in_.getPos();        if (pos >= end_) return false;      }      line = UTF8ByteArrayUtils.readLine((InputStream) in_);      try {        Text.validateUTF8(line);      } catch (MalformedInputException m) {        System.err.println("line=" + line + "|" + new Text(line));        System.out.flush();      }      if (line == null) return false;      try {        int tab = UTF8ByteArrayUtils.findTab(line);        if (tab == -1) {          tKey.set(line);          tValue.set("");        } else {          UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);        }        break;      } catch (MalformedInputException e) {        LOG.warn(StringUtils.stringifyException(e));      }    }    numRecStats(line, 0, line.length);    return true;  }  boolean gzipped_;  GZIPInputStream zin_;  DataInputStream din_; // GZIP or plain  }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -