datanode.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 788 行 · 第 1/3 页

JAVA
788
字号
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.dfs;import org.apache.hadoop.ipc.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.*;import java.io.*;import java.net.*;import java.util.*;import java.util.logging.*;/********************************************************** * DataNode is a class (and program) that stores a set of * blocks for a DFS deployment.  A single deployment can * have one or many DataNodes.  Each DataNode communicates * regularly with a single NameNode.  It also communicates * with client code and other DataNodes from time to time. * * DataNodes store a series of named blocks.  The DataNode * allows client code to read these blocks, or to write new * block data.  The DataNode may also, in response to instructions * from its NameNode, delete blocks or copy blocks to/from other * DataNodes. * * The DataNode maintains just one critical table: *   block-> stream of bytes (of BLOCK_SIZE or less) * * This info is stored on a local disk.  The DataNode * reports the table's contents to the NameNode upon startup * and every so often afterwards. * * DataNodes spend their lives in an endless loop of asking * the NameNode for something to do.  A NameNode cannot connect * to a DataNode directly; a NameNode simply returns values from * functions invoked by a DataNode. * * DataNodes maintain an open server socket so that client code  * or other DataNodes can read/write data.  The host/port for * this server is reported to the NameNode, which then sends that * information to clients or other DataNodes that might be interested. * * @author Mike Cafarella **********************************************************/public class DataNode implements FSConstants, Runnable {    public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.dfs.DataNode");  //    // REMIND - mjc - I might bring "maxgigs" back so user can place     // artificial  limit on space    //private static final long GIGABYTE = 1024 * 1024 * 1024;    //private static long numGigs = Configuration.get().getLong("dfs.datanode.maxgigs", 100);    //    /**     * Util method to build socket addr from string     */    public static InetSocketAddress createSocketAddr(String s) throws IOException {        String target = s;        int colonIndex = target.indexOf(':');        if (colonIndex < 0) {            throw new RuntimeException("Not a host:port pair: " + s);        }        String host = target.substring(0, colonIndex);        int port = Integer.parseInt(target.substring(colonIndex + 1));        return new InetSocketAddress(host, port);    }    private static Vector subThreadList = null;    DatanodeProtocol namenode;    FSDataset data;    String localName;    boolean shouldRun = true;    Vector receivedBlockList = new Vector();    int xmitsInProgress = 0;    Daemon dataXceiveServer = null;    long blockReportInterval;    private long datanodeStartupPeriod;    private Configuration fConf;    /**     * Create the DataNode given a configuration and a dataDir.     * 'dataDir' is where the blocks are stored.     */    public DataNode(Configuration conf, String datadir) throws IOException {        this(InetAddress.getLocalHost().getHostName(),              new File(datadir),             createSocketAddr(conf.get("fs.default.name", "local")), conf);    }    /**     * A DataNode can also be created with configuration information     * explicitly given.     */    public DataNode(String machineName, File datadir, InetSocketAddress nameNodeAddr, Configuration conf) throws IOException {        this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class, nameNodeAddr, conf);        this.data = new FSDataset(datadir, conf);        ServerSocket ss = null;        int tmpPort = conf.getInt("dfs.datanode.port", 50010);        while (ss == null) {            try {                ss = new ServerSocket(tmpPort);                LOG.info("Opened server at " + tmpPort);            } catch (IOException ie) {                LOG.info("Could not open server at " + tmpPort + ", trying new port");                tmpPort++;            }        }        this.localName = machineName + ":" + tmpPort;        this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));        this.dataXceiveServer.start();        long blockReportIntervalBasis =          conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);        this.blockReportInterval =          blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));        this.datanodeStartupPeriod =          conf.getLong("dfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD);    }    /**     * Return the namenode's identifier     */    public String getNamenode() {        //return namenode.toString();	return "<namenode>";    }    /**     * Shut down this instance of the datanode.     * Returns only after shutdown is complete.     */    void shutdown() {        this.shouldRun = false;        ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();        try {            this.dataXceiveServer.join();        } catch (InterruptedException ie) {        }    }    /**     * Main loop for the DataNode.  Runs until shutdown,     * forever calling remote NameNode functions.     */    public void offerService() throws Exception {        long wakeups = 0;        long lastHeartbeat = 0, lastBlockReport = 0;        long sendStart = System.currentTimeMillis();        int heartbeatsSent = 0;        LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");        //        // Now loop for a long time....        //        while (shouldRun) {            long now = System.currentTimeMillis();            //            // Every so often, send heartbeat or block-report            //            synchronized (receivedBlockList) {                if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {                    //                    // All heartbeat messages include following info:                    // -- Datanode name                    // -- data transfer port                    // -- Total capacity                    // -- Bytes remaining                    //                    namenode.sendHeartbeat(localName, data.getCapacity(), data.getRemaining());                    //LOG.info("Just sent heartbeat, with name " + localName);                    lastHeartbeat = now;		}		if (now - lastBlockReport > blockReportInterval) {                    //                    // Send latest blockinfo report if timer has expired.                    // Get back a list of local block(s) that are obsolete                    // and can be safely GC'ed.                    //                    Block toDelete[] = namenode.blockReport(localName, data.getBlockReport());                    data.invalidate(toDelete);                    lastBlockReport = now;                    continue;		}		if (receivedBlockList.size() > 0) {                    //                    // Send newly-received blockids to namenode                    //                    Block blockArray[] = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);                    receivedBlockList.removeAllElements();                    namenode.blockReceived(localName, blockArray);                }		//		// Only perform block operations (transfer, delete) after 		// a startup quiet period.  The assumption is that all the		// datanodes will be started together, but the namenode may		// have been started some time before.  (This is esp. true in		// the case of network interruptions.)  So, wait for some time		// to pass from the time of connection to the first block-transfer.		// Otherwise we transfer a lot of blocks unnecessarily.		//		if (now - sendStart > datanodeStartupPeriod) {		    //		    // Check to see if there are any block-instructions from the		    // namenode that this datanode should perform.		    //		    BlockCommand cmd = namenode.getBlockwork(localName, xmitsInProgress);		    if (cmd != null && cmd.transferBlocks()) {			//			// Send a copy of a block to another datanode			//			Block blocks[] = cmd.getBlocks();			DatanodeInfo xferTargets[][] = cmd.getTargets();						for (int i = 0; i < blocks.length; i++) {			    if (!data.isValidBlock(blocks[i])) {				String errStr = "Can't send invalid block " + blocks[i];				LOG.info(errStr);				namenode.errorReport(localName, errStr);				break;			    } else {				if (xferTargets[i].length > 0) {				    LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);				    new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();				}			    }			}                    } else if (cmd != null && cmd.invalidateBlocks()) {                        //                        // Some local block(s) are obsolete and can be                         // safely garbage-collected.                        //                        data.invalidate(cmd.getBlocks());                    }                }                //                // There is no work to do;  sleep until hearbeat timer elapses,                 // or work arrives, and then iterate again.                //                long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);                if (waitTime > 0 && receivedBlockList.size() == 0) {                    try {                        receivedBlockList.wait(waitTime);                    } catch (InterruptedException ie) {

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?