⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pipemapper.java

📁 hadoop:Nutch集群平台
💻 JAVA
字号:
/** * Copyright 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.streaming;import java.io.*;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.Writable;/** A generic Mapper bridge. *  It delegates operations to an external program via stdin and stdout. *  @author Michel Tourn */public class PipeMapper extends PipeMapRed implements Mapper {  String getPipeCommand(JobConf job) {    return job.get("stream.map.streamprocessor");  }  String getKeyColPropName() {    return "mapKeyCols";  }  boolean getUseSideEffect() {    return StreamUtil.getUseMapSideEffect(job_);  }  boolean getDoPipe() {    return true;  }  // Do NOT declare default constructor  // (MapRed creates it reflectively)  public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {    // init    if (outThread_ == null) {      startOutputThreads(output, reporter);    }    try {      // 1/4 Hadoop in      numRecRead_++;      maybeLogRecord();      if (debugFailDuring_ && numRecRead_ == 3) {        throw new IOException("debugFailDuring_");      }      // 2/4 Hadoop to Tool      if (numExceptions_ == 0) {        if (optUseKey_) {          write(key);          clientOut_.write('\t');        }        write(value);        clientOut_.write('\n');        clientOut_.flush();      } else {        numRecSkipped_++;      }    } catch (IOException io) {      numExceptions_++;      if (numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {        // terminate with failure        String msg = logFailure(io);        appendLogToJobLog("failure");        mapRedFinished();        throw new IOException(msg);      } else {        // terminate with success:        // swallow input records although the stream processor failed/closed      }    }  }  public void close() {    appendLogToJobLog("success");    mapRedFinished();  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -