⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 datanode.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.dfs;import org.apache.commons.logging.*;import org.apache.hadoop.ipc.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.metrics.Metrics;import org.apache.hadoop.net.DNS;import org.apache.hadoop.util.*;import org.apache.hadoop.util.DiskChecker.DiskErrorException;import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;import org.apache.hadoop.mapred.StatusHttpServer;import java.io.*;import java.net.*;import java.util.*;import org.apache.hadoop.metrics.MetricsRecord;/********************************************************** * DataNode is a class (and program) that stores a set of * blocks for a DFS deployment.  A single deployment can * have one or many DataNodes.  Each DataNode communicates * regularly with a single NameNode.  It also communicates * with client code and other DataNodes from time to time. * * DataNodes store a series of named blocks.  The DataNode * allows client code to read these blocks, or to write new * block data.  The DataNode may also, in response to instructions * from its NameNode, delete blocks or copy blocks to/from other * DataNodes. * * The DataNode maintains just one critical table: *   block-> stream of bytes (of BLOCK_SIZE or less) * * This info is stored on a local disk.  The DataNode * reports the table's contents to the NameNode upon startup * and every so often afterwards. * * DataNodes spend their lives in an endless loop of asking * the NameNode for something to do.  A NameNode cannot connect * to a DataNode directly; a NameNode simply returns values from * functions invoked by a DataNode. * * DataNodes maintain an open server socket so that client code  * or other DataNodes can read/write data.  The host/port for * this server is reported to the NameNode, which then sends that * information to clients or other DataNodes that might be interested. * * @author Mike Cafarella **********************************************************/public class DataNode implements FSConstants, Runnable {    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");    //    // REMIND - mjc - I might bring "maxgigs" back so user can place     // artificial  limit on space    //private static final long GIGABYTE = 1024 * 1024 * 1024;    //private static long numGigs = Configuration.get().getLong("dfs.datanode.maxgigs", 100);    //    /**     * Util method to build socket addr from string     */    public static InetSocketAddress createSocketAddr(String s) throws IOException {        String target = s;        int colonIndex = target.indexOf(':');        if (colonIndex < 0) {            throw new RuntimeException("Not a host:port pair: " + s);        }        String host = target.substring(0, colonIndex);        int port = Integer.parseInt(target.substring(colonIndex + 1));        return new InetSocketAddress(host, port);    }    DatanodeProtocol namenode;    FSDataset data;    DatanodeRegistration dnRegistration;    boolean shouldRun = true;    Vector receivedBlockList = new Vector();    int xmitsInProgress = 0;    Daemon dataXceiveServer = null;    long blockReportInterval;    private DataStorage storage = null;    private StatusHttpServer infoServer;    private static InetSocketAddress nameNodeAddr;    private static DataNode datanodeObject = null;    private class DataNodeMetrics {      private MetricsRecord metricsRecord = null;                  private long bytesWritten = 0L;      private long bytesRead = 0L;      private long blocksWritten = 0L;      private long blocksRead = 0L;      private long blocksReplicated = 0L;      private long blocksRemoved = 0L;            DataNodeMetrics() {        metricsRecord = Metrics.createRecord("dfs", "datanode");      }            synchronized void readBytes(int nbytes) {        bytesRead += nbytes;        Metrics.report(metricsRecord, "bytes-read", bytesRead);      }            synchronized void wroteBytes(int nbytes) {        bytesWritten += nbytes;        Metrics.report(metricsRecord, "bytes-written", bytesWritten);      }            synchronized void readBlocks(int nblocks) {        blocksRead += nblocks;        Metrics.report(metricsRecord, "blocks-read", blocksRead);      }            synchronized void wroteBlocks(int nblocks) {        blocksWritten += nblocks;        Metrics.report(metricsRecord, "blocks-written", blocksWritten);      }            synchronized void replicatedBlocks(int nblocks) {        blocksReplicated += nblocks;        Metrics.report(metricsRecord, "blocks-replicated", blocksReplicated);      }            synchronized void removedBlocks(int nblocks) {        blocksRemoved += nblocks;        Metrics.report(metricsRecord, "blocks-removed", blocksRemoved);      }    }        DataNodeMetrics myMetrics = new DataNodeMetrics();    /**     * Create the DataNode given a configuration and an array of dataDirs.     * 'dataDirs' is where the blocks are stored.     */    DataNode(Configuration conf, String[] dataDirs) throws IOException {        this(InetAddress.getLocalHost().getHostName(),              dataDirs,             createSocketAddr(conf.get("fs.default.name", "local")), conf);        // register datanode        register();        int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);        String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");        this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);        //create a servlet to serve full-file content        this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);        this.infoServer.start();        this.dnRegistration.infoPort = this.infoServer.getPort();        datanodeObject = this;    }        /**     * A DataNode can also be created with configuration information     * explicitly given.     *      * @see DataStorage     */    private DataNode(String machineName,                     String[] dataDirs,                     InetSocketAddress nameNodeAddr,                     Configuration conf ) throws IOException {      File[] volumes = new File[dataDirs.length];      for (int idx = 0; idx < dataDirs.length; idx++) {        volumes[idx] = new File(dataDirs[idx]);      }      // use configured nameserver & interface to get local hostname      machineName =        DNS.getDefaultHost        (conf.get("dfs.datanode.dns.interface","default"),         conf.get("dfs.datanode.dns.nameserver","default"));       // get storage info and lock the data dirs      storage = new DataStorage( volumes );      int numDirs = storage.getNumLocked();      if (numDirs == 0) { // all data dirs are in use        throw new IOException("Cannot start multiple Datanode instances "                              + "sharing the same data directories.\n"                              + StringUtils.arrayToString(dataDirs) + " are locked. ");      }      volumes = storage.getLockedDirs();      // connect to name node      this.namenode = (DatanodeProtocol)           RPC.waitForProxy(DatanodeProtocol.class,                           DatanodeProtocol.versionID,                           nameNodeAddr,                            conf);      // find free port      ServerSocket ss = null;      int tmpPort = conf.getInt("dfs.datanode.port", 50010);      String bindAddress = conf.get("dfs.datanode.bindAddress", "0.0.0.0");      while (ss == null) {        try {          ss = new ServerSocket(tmpPort,0,InetAddress.getByName(bindAddress));          LOG.info("Opened server at " + tmpPort);        } catch (IOException ie) {          LOG.info("Could not open server at " + tmpPort + ", trying new port");          tmpPort++;        }      }      // construct registration      this.dnRegistration = new DatanodeRegistration(                                        DFS_CURRENT_VERSION,                                         machineName + ":" + tmpPort,                                         storage.getStorageID(),                                        -1,                                        "" );      // initialize data node internal structure      this.data = new FSDataset(volumes, conf);      this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));      long blockReportIntervalBasis =        conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);      this.blockReportInterval =        blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));      this.nameNodeAddr = nameNodeAddr;    }    /** Return the DataNode object     *      */    public static DataNode getDataNode() {        return datanodeObject;    }     public InetSocketAddress getNameNodeAddr() {      return nameNodeAddr;    }        /**     * Return the namenode's identifier     */    public String getNamenode() {      //return namenode.toString();      return "<namenode>";    }    /**     * Register datanode     * <p>     * The datanode needs to register with the namenode on startup in order     * 1) to report which storage it is serving now and      * 2) to receive a registrationID      * issued by the namenode to recognize registered datanodes.     *      * @see FSNamesystem#registerDatanode(DatanodeRegistration)     * @throws IOException     */    private void register() throws IOException {      dnRegistration = namenode.register( dnRegistration );      if( storage.getStorageID().equals("") ) {        storage.setStorageID( dnRegistration.getStorageID());        storage.writeAll();      }    }    /**     * Shut down this instance of the datanode.     * Returns only after shutdown is complete.     */    public void shutdown() {        try {          infoServer.stop();        } catch (Exception e) {        }        this.shouldRun = false;        ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();        try {          this.storage.closeAll();        } catch (IOException ie) {        }    }    void handleDiskError( String errMsgr ) {        LOG.warn( "DataNode is shutting down.\n" + errMsgr );        try {            namenode.errorReport(                    dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);        } catch( IOException ignored) {                      }        shutdown();    }        private static class Count {        int value = 0;        Count(int init) { value = init; }        synchronized void incr() { value++; }        synchronized void decr() { value--; }        public String toString() { return Integer.toString(value); }        public int getValue() { return value; }    }        Count xceiverCount = new Count(0);        /**     * Main loop for the DataNode.  Runs until shutdown,     * forever calling remote NameNode functions.     */    public void offerService() throws Exception {           long lastHeartbeat = 0, lastBlockReport = 0;      LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");      //      // Now loop for a long time....      //      while (shouldRun) {        try {          long now = System.currentTimeMillis();          //          // Every so often, send heartbeat or block-report          //          if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {            //            // All heartbeat messages include following info:            // -- Datanode name            // -- data transfer port            // -- Total capacity            // -- Bytes remaining            //            BlockCommand cmd = namenode.sendHeartbeat(dnRegistration,                                                       data.getCapacity(),                                                       data.getRemaining(),                                                       xmitsInProgress,                                                      xceiverCount.getValue());            //LOG.info("Just sent heartbeat, with name " + localName);            lastHeartbeat = now;            if( cmd != null ) {              data.checkDataDir();              if (cmd.transferBlocks()) {                //                // Send a copy of a block to another datanode                //                Block blocks[] = cmd.getBlocks();                DatanodeInfo xferTargets[][] = cmd.getTargets();                                    for (int i = 0; i < blocks.length; i++) {                  if (!data.isValidBlock(blocks[i])) {                    String errStr = "Can't send invalid block " + blocks[i];                    LOG.info(errStr);                    namenode.errorReport( dnRegistration,                                 DatanodeProtocol.INVALID_BLOCK,                                 errStr);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -