⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mapoutputlocation.java

📁 hadoop:Nutch集群平台
💻 JAVA
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import java.io.IOException;import java.io.*;import java.net.*;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.io.*;import org.apache.hadoop.util.Progressable;/** The location of a map output file, as passed to a reduce task via the * {@link InterTrackerProtocol}. */ class MapOutputLocation implements Writable {    static {                                      // register a ctor      WritableFactories.setFactory        (MapOutputLocation.class,         new WritableFactory() {           public Writable newInstance() { return new MapOutputLocation(); }         });    }  private String mapTaskId;  private int mapId;  private String host;  private int port;  /** RPC constructor **/  public MapOutputLocation() {  }  /** Construct a location. */  public MapOutputLocation(String mapTaskId, int mapId,                            String host, int port) {    this.mapTaskId = mapTaskId;    this.mapId = mapId;    this.host = host;    this.port = port;  }  /** The map task id. */  public String getMapTaskId() { return mapTaskId; }    /**   * Get the map's id number.   * @return The numeric id for this map   */  public int getMapId() {    return mapId;  }  /** The host the task completed on. */  public String getHost() { return host; }  /** The port listening for {@link MapOutputProtocol} connections. */  public int getPort() { return port; }  public void write(DataOutput out) throws IOException {    UTF8.writeString(out, mapTaskId);    out.writeInt(mapId);    UTF8.writeString(out, host);    out.writeInt(port);  }  public void readFields(DataInput in) throws IOException {    this.mapTaskId = UTF8.readString(in);    this.mapId = in.readInt();    this.host = UTF8.readString(in);    this.port = in.readInt();  }  public String toString() {    return "http://" + host + ":" + port + "/mapOutput?map=" +             mapTaskId;  }    /**   * Get the map output into a local file from the remote server.   * We use the file system so that we generate checksum files on the data.   * @param fileSys the filesystem to write the file to   * @param localFilename the filename to write the data into   * @param reduce the reduce id to get for   * @param pingee a status object that wants to know when we make progress   * @throws IOException when something goes wrong   */  public long getFile(FileSystem fileSys,                       Path localFilename,                       int reduce,                      Progressable pingee) throws IOException {    boolean good = false;    long totalBytes = 0;    URL path = new URL(toString() + "&reduce=" + reduce);    try {      URLConnection connection = path.openConnection();      InputStream input = connection.getInputStream();      try {        OutputStream output = fileSys.create(localFilename);        try {          byte[] buffer = new byte[64 * 1024];          int len = input.read(buffer);          while (len > 0) {            totalBytes += len;            output.write(buffer, 0 ,len);            if (pingee != null) {              pingee.progress();            }            len = input.read(buffer);          }        } finally {          output.close();        }      } finally {        input.close();      }      good = ((int) totalBytes) == connection.getContentLength();      if (!good) {        throw new IOException("Incomplete map output received for " + path +                              " (" + totalBytes + " instead of " +                               connection.getContentLength() + ")");      }    } finally {      if (!good) {        try {          fileSys.delete(localFilename);          totalBytes = 0;        } catch (Throwable th) {          // IGNORED because we are cleaning up        }      }    }    return totalBytes;  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -