⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 network.java

📁 分布式全文搜索工具包 可以支持集群 主要使用java開發 比較方便使用
💻 JAVA
字号:
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.contrib.dlucene.network;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.InetSocketAddress;import java.util.ArrayList;import java.util.Collection;import java.util.HashSet;import java.util.List;import java.util.Set;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.contrib.dlucene.DataNodeStatus;import org.apache.hadoop.contrib.dlucene.Utils;import org.apache.hadoop.net.NetUtils;import org.apache.hadoop.net.Node;import org.apache.hadoop.net.NodeBase;import org.apache.hadoop.util.StringUtils;/** * Class that extends NetworkTopology */public class Network extends NetworkTopology {  /**   * Constructor   */  public Network() {    super();  }  static Node toNode(DataNodeStatus status) {    return new NodeBase(toLocation(status));  }  static String toLocation(DataNodeStatus status) {    String name = Network.convertInetSocketAddress(status.getAddress());    return status.getRack() + NodeBase.PATH_SEPARATOR_STR + name;  }  public void add(DataNodeStatus status) {    super.add(toNode(status));  }  public void remove(DataNodeStatus status) {    super.remove(get(status));  }  public boolean isOnSameRack(DataNodeStatus status1, DataNodeStatus status2) {    return super.isOnSameRack(get(status1), get(status2));  }  public Node get(DataNodeStatus status) {    return super.getNode(toLocation(status));  }  public boolean contains(DataNodeStatus status) {    return super.contains(get(status));  }  /**   * Get a node at random   *    * @param scope the node should be within this scope   * @param excludedScope the node should not be within this scope   * @return   */  private Collection<InetSocketAddress> getRandomNode(String scope,      String excludedScope) {    List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();    if (excludedScope != null) {      if (scope.startsWith(excludedScope)) {        return null;      }      if (!excludedScope.startsWith(scope)) {        excludedScope = null;      }    }    Node scopeNode = getNode(scope);    if (!(scopeNode instanceof InnerNode)) {      result.add(NetUtils.createSocketAddr(scopeNode.getName()));      return result;    }    InnerNode innerNode = (InnerNode) scopeNode;    int numOfDatanodes = innerNode.getNumOfLeaves();    Node excludedScopeNode = excludedScope != null ? getNode(excludedScope) : null;    if (numOfDatanodes > 0) {      Set<String> seen = new HashSet<String>();      int leafIndex = r.nextInt(numOfDatanodes);      for (int i = 0; i <= numOfDatanodes ; i++) {        Node resultNode = innerNode.getLeaf((leafIndex + i) % numOfDatanodes,            excludedScopeNode);        if (resultNode != null) {          if (!seen.contains(resultNode.getName())) {            LOG.debug("Using index " + i + " and getting back " + resultNode.getName());            result.add(NetUtils.createSocketAddr(resultNode.getName()));            seen.add(resultNode.getName());          }        }      }    }    return result;  }  /**   * Get a node at random   *    * @param scope The scope the node must be within   * @return   */  public Collection<InetSocketAddress> getRandomNode(String scope) {    netlock.readLock().lock();    try {      if (scope.startsWith("~")) {        return getRandomNode(NodeBase.ROOT, scope.substring(1));      }      return getRandomNode(scope, null);    } finally {      netlock.readLock().unlock();    }  }  /**   * Convert an InetSocketAddress to a name   *    * @param addr   * @return   */  public static String convertInetSocketAddress(InetSocketAddress addr) {    Utils.checkArgs(addr);    return addr.getHostName() + ":" + addr.getPort();  }  // FIXME - copied from DataNode.java  /* Get the network location by running a script configured in conf */  public static String getNetworkLoc(Configuration conf) throws IOException {    String locScript = conf.get("dfs.network.script");    if (locScript == null)      return NetworkTopology.DEFAULT_RACK;    LOG.info("Starting to run script to get datanode network location");    Process p = Runtime.getRuntime().exec(locScript);    StringBuffer networkLoc = new StringBuffer();    final BufferedReader inR = new BufferedReader(new InputStreamReader(p        .getInputStream()));    final BufferedReader errR = new BufferedReader(new InputStreamReader(p        .getErrorStream()));    // read & log any error messages from the running script    Thread errThread = new Thread() {      @Override      public void start() {        try {          String errLine = errR.readLine();          while (errLine != null) {            LOG.warn("Network script error: " + errLine);            errLine = errR.readLine();          }        } catch (IOException e) {          //        }      }    };    try {      errThread.start();      // fetch output from the process      String line = inR.readLine();      while (line != null) {        networkLoc.append(line);        line = inR.readLine();      }      try {        // wait for the process to finish        int returnVal = p.waitFor();        // check the exit code        if (returnVal != 0) {          throw new IOException("Process exits with nonzero status: "              + locScript);        }      } catch (InterruptedException e) {        throw new IOException(e.getMessage());      } finally {        try {          // make sure that the error thread exits          errThread.join();        } catch (InterruptedException je) {          LOG.warn(StringUtils.stringifyException(je));        }      }    } finally {      // close in & error streams      try {        inR.close();      } catch (IOException ine) {        throw ine;      } finally {        errR.close();      }    }    return networkLoc.toString();  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -