DataNode.java
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import org.apache.commons.logging.*;

import org.apache.hadoop.ipc.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.metrics.Metrics;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.mapred.StatusHttpServer;

import java.io.*;
import java.net.*;
import java.util.*;

import org.apache.hadoop.metrics.MetricsRecord;

/**********************************************************
 * DataNode is a class (and program) that stores a set of
 * blocks for a DFS deployment.  A single deployment can
 * have one or many DataNodes.  Each DataNode communicates
 * regularly with a single NameNode.  It also communicates
 * with client code and other DataNodes from time to time.
 *
 * DataNodes store a series of named blocks.  The DataNode
 * allows client code to read these blocks, or to write new
 * block data.  The DataNode may also, in response to instructions
 * from its NameNode, delete blocks or copy blocks to/from other
 * DataNodes.
 *
 * The DataNode maintains just one critical table:
 *     block -> stream of bytes (of BLOCK_SIZE or less)
 *
 * This info is stored on a local disk.  The DataNode
 * reports the table's contents to the NameNode upon startup
 * and every so often afterwards.
 *
 * DataNodes spend their lives in an endless loop of asking
 * the NameNode for something to do.  A NameNode cannot connect
 * to a DataNode directly; a NameNode simply returns values from
 * functions invoked by a DataNode.
 *
 * DataNodes maintain an open server socket so that client code
 * or other DataNodes can read/write data.  The host/port for
 * this server is reported to the NameNode, which then sends that
 * information to clients or other DataNodes that might be interested.
 *
 * @author Mike Cafarella
 **********************************************************/
public class DataNode implements FSConstants, Runnable {
    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");

    //
    // REMIND - mjc - I might bring "maxgigs" back so user can place
    // artificial limit on space
    //private static final long GIGABYTE = 1024 * 1024 * 1024;
    //private static long numGigs = Configuration.get().getLong("dfs.datanode.maxgigs", 100);
    //

    /**
     * Util method to build socket addr from string
     */
    public static InetSocketAddress createSocketAddr(String s) throws IOException {
        String target = s;
        int colonIndex = target.indexOf(':');
        if (colonIndex < 0) {
            throw new RuntimeException("Not a host:port pair: " + s);
        }
        String host = target.substring(0, colonIndex);
        int port = Integer.parseInt(target.substring(colonIndex + 1));
        return new InetSocketAddress(host, port);
    }
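    // Illustrative usage (comment added for exposition; not in the original
    // file): the helper above splits at the first ':' only, so
    //
    //   InetSocketAddress addr = DataNode.createSocketAddr("localhost:50010");
    //
    // yields host "localhost" and port 50010, while a string with no colon
    // throws a RuntimeException rather than an IOException, despite the
    // method's "throws IOException" clause.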
    DatanodeProtocol namenode;
    FSDataset data;
    DatanodeRegistration dnRegistration;
    boolean shouldRun = true;
    Vector receivedBlockList = new Vector();
    int xmitsInProgress = 0;
    Daemon dataXceiveServer = null;
    long blockReportInterval;
    private DataStorage storage = null;
    private StatusHttpServer infoServer;
    private static InetSocketAddress nameNodeAddr;
    private static DataNode datanodeObject = null;

    private class DataNodeMetrics {
        private MetricsRecord metricsRecord = null;

        private long bytesWritten = 0L;
        private long bytesRead = 0L;
        private long blocksWritten = 0L;
        private long blocksRead = 0L;
        private long blocksReplicated = 0L;
        private long blocksRemoved = 0L;

        DataNodeMetrics() {
            metricsRecord = Metrics.createRecord("dfs", "datanode");
        }

        synchronized void readBytes(int nbytes) {
            bytesRead += nbytes;
            Metrics.report(metricsRecord, "bytes-read", bytesRead);
        }

        synchronized void wroteBytes(int nbytes) {
            bytesWritten += nbytes;
            Metrics.report(metricsRecord, "bytes-written", bytesWritten);
        }

        synchronized void readBlocks(int nblocks) {
            blocksRead += nblocks;
            Metrics.report(metricsRecord, "blocks-read", blocksRead);
        }

        synchronized void wroteBlocks(int nblocks) {
            blocksWritten += nblocks;
            Metrics.report(metricsRecord, "blocks-written", blocksWritten);
        }

        synchronized void replicatedBlocks(int nblocks) {
            blocksReplicated += nblocks;
            Metrics.report(metricsRecord, "blocks-replicated", blocksReplicated);
        }

        synchronized void removedBlocks(int nblocks) {
            blocksRemoved += nblocks;
            Metrics.report(metricsRecord, "blocks-removed", blocksRemoved);
        }
    }

    DataNodeMetrics myMetrics = new DataNodeMetrics();

    /**
     * Create the DataNode given a configuration and an array of dataDirs.
     * 'dataDirs' is where the blocks are stored.
     */
    DataNode(Configuration conf, String[] dataDirs) throws IOException {
        this(InetAddress.getLocalHost().getHostName(),
             dataDirs,
             createSocketAddr(conf.get("fs.default.name", "local")), conf);

        // register datanode
        register();

        int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
        String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
        this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
        //create a servlet to serve full-file content
        this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
        this.infoServer.start();
        this.dnRegistration.infoPort = this.infoServer.getPort();
        datanodeObject = this;
    }
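    // Illustrative usage (not in the original file): the constructor above is
    // package-private, so this sketch assumes calling code inside
    // org.apache.hadoop.dfs and an fs.default.name of the form "host:port":
    //
    //   Configuration conf = new Configuration();
    //   DataNode dn = new DataNode(conf, new String[] { "/tmp/dfs/data" });
    //   new Thread(dn).start();   // DataNode implements Runnable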
    /**
     * A DataNode can also be created with configuration information
     * explicitly given.
     *
     * @see DataStorage
     */
    private DataNode(String machineName,
                     String[] dataDirs,
                     InetSocketAddress nameNodeAddr,
                     Configuration conf) throws IOException {
        File[] volumes = new File[dataDirs.length];
        for (int idx = 0; idx < dataDirs.length; idx++) {
            volumes[idx] = new File(dataDirs[idx]);
        }

        // use configured nameserver & interface to get local hostname
        machineName = DNS.getDefaultHost(
                conf.get("dfs.datanode.dns.interface", "default"),
                conf.get("dfs.datanode.dns.nameserver", "default"));

        // get storage info and lock the data dirs
        storage = new DataStorage(volumes);
        int numDirs = storage.getNumLocked();
        if (numDirs == 0) { // all data dirs are in use
            throw new IOException("Cannot start multiple Datanode instances "
                    + "sharing the same data directories.\n"
                    + StringUtils.arrayToString(dataDirs) + " are locked. ");
        }
        volumes = storage.getLockedDirs();

        // connect to name node
        this.namenode = (DatanodeProtocol)
            RPC.waitForProxy(DatanodeProtocol.class,
                             DatanodeProtocol.versionID,
                             nameNodeAddr,
                             conf);

        // find free port
        ServerSocket ss = null;
        int tmpPort = conf.getInt("dfs.datanode.port", 50010);
        String bindAddress = conf.get("dfs.datanode.bindAddress", "0.0.0.0");
        while (ss == null) {
            try {
                ss = new ServerSocket(tmpPort, 0, InetAddress.getByName(bindAddress));
                LOG.info("Opened server at " + tmpPort);
            } catch (IOException ie) {
                LOG.info("Could not open server at " + tmpPort + ", trying new port");
                tmpPort++;
            }
        }

        // construct registration
        this.dnRegistration = new DatanodeRegistration(
                DFS_CURRENT_VERSION,
                machineName + ":" + tmpPort,
                storage.getStorageID(),
                -1,
                "");

        // initialize data node internal structure
        this.data = new FSDataset(volumes, conf);
        this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));

        long blockReportIntervalBasis =
            conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
        this.blockReportInterval =
            blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis / 10));
        this.nameNodeAddr = nameNodeAddr;
    }

    /** Return the DataNode object */
    public static DataNode getDataNode() {
        return datanodeObject;
    }

    public InetSocketAddress getNameNodeAddr() {
        return nameNodeAddr;
    }

    /**
     * Return the namenode's identifier
     */
    public String getNamenode() {
        //return namenode.toString();
        return "<namenode>";
    }

    /**
     * Register datanode
     * <p>
     * The datanode needs to register with the namenode on startup in order
     * 1) to report which storage it is serving now and
     * 2) to receive a registrationID
     * issued by the namenode to recognize registered datanodes.
     *
     * @see FSNamesystem#registerDatanode(DatanodeRegistration)
     * @throws IOException
     */
    private void register() throws IOException {
        dnRegistration = namenode.register(dnRegistration);
        if (storage.getStorageID().equals("")) {
            storage.setStorageID(dnRegistration.getStorageID());
            storage.writeAll();
        }
    }
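    // Note added for exposition: on a first-ever startup storage.getStorageID()
    // is still the empty string, so register() persists the namenode-issued
    // storage ID via storage.writeAll(); subsequent restarts reuse that stored
    // ID, which is how the namenode recognizes a returning datanode.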
    /**
     * Shut down this instance of the datanode.
     * Returns only after shutdown is complete.
     */
    public void shutdown() {
        try {
            infoServer.stop();
        } catch (Exception e) {
        }
        this.shouldRun = false;
        ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
        try {
            this.storage.closeAll();
        } catch (IOException ie) {
        }
    }

    void handleDiskError(String errMsgr) {
        LOG.warn("DataNode is shutting down.\n" + errMsgr);
        try {
            namenode.errorReport(dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
        } catch (IOException ignored) {
        }
        shutdown();
    }

    private static class Count {
        int value = 0;

        Count(int init) {
            value = init;
        }

        synchronized void incr() {
            value++;
        }

        synchronized void decr() {
            value--;
        }

        public String toString() {
            return Integer.toString(value);
        }

        public int getValue() {
            return value;
        }
    }

    Count xceiverCount = new Count(0);

    /**
     * Main loop for the DataNode.  Runs until shutdown,
     * forever calling remote NameNode functions.
     */
    public void offerService() throws Exception {
        long lastHeartbeat = 0, lastBlockReport = 0;
        LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");

        //
        // Now loop for a long time....
        //
        while (shouldRun) {
            try {
                long now = System.currentTimeMillis();

                //
                // Every so often, send heartbeat or block-report
                //
                if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
                    //
                    // All heartbeat messages include following info:
                    // -- Datanode name
                    // -- data transfer port
                    // -- Total capacity
                    // -- Bytes remaining
                    //
                    BlockCommand cmd = namenode.sendHeartbeat(dnRegistration,
                                                              data.getCapacity(),
                                                              data.getRemaining(),
                                                              xmitsInProgress,
                                                              xceiverCount.getValue());
                    //LOG.info("Just sent heartbeat, with name " + localName);
                    lastHeartbeat = now;

                    if (cmd != null) {
                        data.checkDataDir();
                        if (cmd.transferBlocks()) {
                            //
                            // Send a copy of a block to another datanode
                            //
                            Block blocks[] = cmd.getBlocks();
                            DatanodeInfo xferTargets[][] = cmd.getTargets();

                            for (int i = 0; i < blocks.length; i++) {
                                if (!data.isValidBlock(blocks[i])) {
                                    String errStr = "Can't send invalid block " + blocks[i];
                                    LOG.info(errStr);
                                    namenode.errorReport(dnRegistration,
                                                         DatanodeProtocol.INVALID_BLOCK,
                                                         errStr);
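The listing breaks off here, inside offerService()'s transfer-block handling. One detail from the private constructor is worth pulling out: the block-report interval is not used exactly as configured, but reduced by a random offset of up to one tenth, so that datanodes started at the same time do not all send their block reports to the NameNode at once. The following self-contained sketch reproduces that jitter arithmetic; it is illustrative only, and the one-hour default stands in for FSConstants.BLOCKREPORT_INTERVAL, whose actual value is not shown in this listing.

import java.util.Random;

// Illustrative sketch, not part of DataNode.java: reproduces the jitter
// computation from the private DataNode constructor above.
public class BlockReportJitter {
    // Stand-in for FSConstants.BLOCKREPORT_INTERVAL; the real constant is
    // defined elsewhere and assumed here to be one hour.
    static final long BLOCKREPORT_INTERVAL = 60 * 60 * 1000L;

    public static void main(String[] args) {
        long basis = BLOCKREPORT_INTERVAL;
        // Same expression as in the constructor: subtract a random offset
        // in [0, basis/10) so reports from different nodes spread out.
        long blockReportInterval = basis - new Random().nextInt((int) (basis / 10));
        System.out.println("block report every " + blockReportInterval + " msec");
    }
}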