datanode.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 788 行 · 第 1/3 页
JAVA
788 行
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.dfs;import org.apache.hadoop.ipc.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.*;import java.io.*;import java.net.*;import java.util.*;import java.util.logging.*;/********************************************************** * DataNode is a class (and program) that stores a set of * blocks for a DFS deployment. A single deployment can * have one or many DataNodes. Each DataNode communicates * regularly with a single NameNode. It also communicates * with client code and other DataNodes from time to time. * * DataNodes store a series of named blocks. The DataNode * allows client code to read these blocks, or to write new * block data. The DataNode may also, in response to instructions * from its NameNode, delete blocks or copy blocks to/from other * DataNodes. * * The DataNode maintains just one critical table: * block-> stream of bytes (of BLOCK_SIZE or less) * * This info is stored on a local disk. The DataNode * reports the table's contents to the NameNode upon startup * and every so often afterwards. * * DataNodes spend their lives in an endless loop of asking * the NameNode for something to do. A NameNode cannot connect * to a DataNode directly; a NameNode simply returns values from * functions invoked by a DataNode. * * DataNodes maintain an open server socket so that client code * or other DataNodes can read/write data. The host/port for * this server is reported to the NameNode, which then sends that * information to clients or other DataNodes that might be interested. * * @author Mike Cafarella **********************************************************/public class DataNode implements FSConstants, Runnable { public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.dfs.DataNode"); // // REMIND - mjc - I might bring "maxgigs" back so user can place // artificial limit on space //private static final long GIGABYTE = 1024 * 1024 * 1024; //private static long numGigs = Configuration.get().getLong("dfs.datanode.maxgigs", 100); // /** * Util method to build socket addr from string */ public static InetSocketAddress createSocketAddr(String s) throws IOException { String target = s; int colonIndex = target.indexOf(':'); if (colonIndex < 0) { throw new RuntimeException("Not a host:port pair: " + s); } String host = target.substring(0, colonIndex); int port = Integer.parseInt(target.substring(colonIndex + 1)); return new InetSocketAddress(host, port); } private static Vector subThreadList = null; DatanodeProtocol namenode; FSDataset data; String localName; boolean shouldRun = true; Vector receivedBlockList = new Vector(); int xmitsInProgress = 0; Daemon dataXceiveServer = null; long blockReportInterval; private long datanodeStartupPeriod; private Configuration fConf; /** * Create the DataNode given a configuration and a dataDir. * 'dataDir' is where the blocks are stored. */ public DataNode(Configuration conf, String datadir) throws IOException { this(InetAddress.getLocalHost().getHostName(), new File(datadir), createSocketAddr(conf.get("fs.default.name", "local")), conf); } /** * A DataNode can also be created with configuration information * explicitly given. */ public DataNode(String machineName, File datadir, InetSocketAddress nameNodeAddr, Configuration conf) throws IOException { this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class, nameNodeAddr, conf); this.data = new FSDataset(datadir, conf); ServerSocket ss = null; int tmpPort = conf.getInt("dfs.datanode.port", 50010); while (ss == null) { try { ss = new ServerSocket(tmpPort); LOG.info("Opened server at " + tmpPort); } catch (IOException ie) { LOG.info("Could not open server at " + tmpPort + ", trying new port"); tmpPort++; } } this.localName = machineName + ":" + tmpPort; this.dataXceiveServer = new Daemon(new DataXceiveServer(ss)); this.dataXceiveServer.start(); long blockReportIntervalBasis = conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL); this.blockReportInterval = blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10)); this.datanodeStartupPeriod = conf.getLong("dfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD); } /** * Return the namenode's identifier */ public String getNamenode() { //return namenode.toString(); return "<namenode>"; } /** * Shut down this instance of the datanode. * Returns only after shutdown is complete. */ void shutdown() { this.shouldRun = false; ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill(); try { this.dataXceiveServer.join(); } catch (InterruptedException ie) { } } /** * Main loop for the DataNode. Runs until shutdown, * forever calling remote NameNode functions. */ public void offerService() throws Exception { long wakeups = 0; long lastHeartbeat = 0, lastBlockReport = 0; long sendStart = System.currentTimeMillis(); int heartbeatsSent = 0; LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec"); // // Now loop for a long time.... // while (shouldRun) { long now = System.currentTimeMillis(); // // Every so often, send heartbeat or block-report // synchronized (receivedBlockList) { if (now - lastHeartbeat > HEARTBEAT_INTERVAL) { // // All heartbeat messages include following info: // -- Datanode name // -- data transfer port // -- Total capacity // -- Bytes remaining // namenode.sendHeartbeat(localName, data.getCapacity(), data.getRemaining()); //LOG.info("Just sent heartbeat, with name " + localName); lastHeartbeat = now; } if (now - lastBlockReport > blockReportInterval) { // // Send latest blockinfo report if timer has expired. // Get back a list of local block(s) that are obsolete // and can be safely GC'ed. // Block toDelete[] = namenode.blockReport(localName, data.getBlockReport()); data.invalidate(toDelete); lastBlockReport = now; continue; } if (receivedBlockList.size() > 0) { // // Send newly-received blockids to namenode // Block blockArray[] = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]); receivedBlockList.removeAllElements(); namenode.blockReceived(localName, blockArray); } // // Only perform block operations (transfer, delete) after // a startup quiet period. The assumption is that all the // datanodes will be started together, but the namenode may // have been started some time before. (This is esp. true in // the case of network interruptions.) So, wait for some time // to pass from the time of connection to the first block-transfer. // Otherwise we transfer a lot of blocks unnecessarily. // if (now - sendStart > datanodeStartupPeriod) { // // Check to see if there are any block-instructions from the // namenode that this datanode should perform. // BlockCommand cmd = namenode.getBlockwork(localName, xmitsInProgress); if (cmd != null && cmd.transferBlocks()) { // // Send a copy of a block to another datanode // Block blocks[] = cmd.getBlocks(); DatanodeInfo xferTargets[][] = cmd.getTargets(); for (int i = 0; i < blocks.length; i++) { if (!data.isValidBlock(blocks[i])) { String errStr = "Can't send invalid block " + blocks[i]; LOG.info(errStr); namenode.errorReport(localName, errStr); break; } else { if (xferTargets[i].length > 0) { LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]); new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start(); } } } } else if (cmd != null && cmd.invalidateBlocks()) { // // Some local block(s) are obsolete and can be // safely garbage-collected. // data.invalidate(cmd.getBlocks()); } } // // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. // long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat); if (waitTime > 0 && receivedBlockList.size() == 0) { try { receivedBlockList.wait(waitTime); } catch (InterruptedException ie) {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?