datanode.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 788 行 · 第 1/3 页
JAVA
788 行
// Open reply stream // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); try { // // Write filelen of -1 if error // if (! data.isValidBlock(b)) { out.writeLong(-1); } else { // // Get blockdata from disk // long len = data.getLength(b); DataInputStream in2 = new DataInputStream(data.getBlockData(b)); out.writeLong(len); if (op == OP_READSKIP_BLOCK) { if (toSkip > len) { toSkip = len; } long amtSkipped = 0; try { amtSkipped = in2.skip(toSkip); } catch (IOException iex) { shutdown(); throw iex; } out.writeLong(amtSkipped); } byte buf[] = new byte[BUFFER_SIZE]; try { int bytesRead = 0; try { bytesRead = in2.read(buf); } catch (IOException iex) { shutdown(); throw iex; } while (bytesRead >= 0) { out.write(buf, 0, bytesRead); len -= bytesRead; try { bytesRead = in2.read(buf); } catch (IOException iex) { shutdown(); throw iex; } } } catch (SocketException se) { // This might be because the reader // closed the stream early } finally { try { in2.close(); } catch (IOException iex) { shutdown(); throw iex; } } } LOG.info("Served block " + b + " to " + s.getInetAddress()); } finally { out.close(); } } else { while (op >= 0) { System.out.println("Faulty op: " + op); op = (byte) in.read(); } throw new IOException("Unknown opcode for incoming data stream"); } } finally { in.close(); } } catch (IOException ie) { LOG.log(Level.WARNING, "DataXCeiver", ie); } finally { try { s.close(); } catch (IOException ie2) { } } } } /** * Used for transferring a block of data. This class * sends a piece of data to another DataNode. */ class DataTransfer implements Runnable { InetSocketAddress curTarget; DatanodeInfo targets[]; Block b; byte buf[]; /** * Connect to the first item in the target list. Pass along the * entire target list, the block, and the data. */ public DataTransfer(DatanodeInfo targets[], Block b) throws IOException { this.curTarget = createSocketAddr(targets[0].getName().toString()); this.targets = targets; this.b = b; this.buf = new byte[BUFFER_SIZE]; } /** * Do the deed, write the bytes */ public void run() { xmitsInProgress++; try { Socket s = new Socket(); s.connect(curTarget, READ_TIMEOUT); s.setSoTimeout(READ_TIMEOUT); DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); try { long filelen = data.getLength(b); DataInputStream in = new DataInputStream(new BufferedInputStream(data.getBlockData(b))); try { // // Header info // out.write(OP_WRITE_BLOCK); out.writeBoolean(true); b.write(out); out.writeInt(targets.length); for (int i = 0; i < targets.length; i++) { targets[i].write(out); } out.write(RUNLENGTH_ENCODING); out.writeLong(filelen); // // Write the data // while (filelen > 0) { int bytesRead = in.read(buf, 0, (int) Math.min(filelen, buf.length)); out.write(buf, 0, bytesRead); filelen -= bytesRead; } } finally { in.close(); } } finally { out.close(); } LOG.info("Transmitted block " + b + " to " + curTarget); } catch (IOException ie) { LOG.log(Level.WARNING, "Failed to transfer "+b+" to "+curTarget, ie); } finally { xmitsInProgress--; } } } /** * No matter what kind of exception we get, keep retrying to offerService(). * That's the loop that connects to the NameNode and provides basic DataNode * functionality. * * Only stop when "shouldRun" is turned off (which can only happen at shutdown). */ public void run() { LOG.info("Starting DataNode in: "+data.data); while (shouldRun) { try { offerService(); } catch (Exception ex) { LOG.info("Exception: " + ex); if (shouldRun) { LOG.info("Lost connection to namenode. Retrying..."); try { Thread.sleep(5000); } catch (InterruptedException ie) { } } } } LOG.info("Finishing DataNode in: "+data.data); } /** Start datanode daemons. * Start a datanode daemon for each comma separated data directory * specified in property dfs.data.dir */ public static void run(Configuration conf) throws IOException { String[] dataDirs = conf.getStrings("dfs.data.dir"); subThreadList = new Vector(dataDirs.length); for (int i = 0; i < dataDirs.length; i++) { DataNode dn = makeInstanceForDir(dataDirs[i], conf); if (dn != null) { Thread t = new Thread(dn, "DataNode: "+dataDirs[i]); t.setDaemon(true); // needed for JUnit testing t.start(); subThreadList.add(t); } } } /** Start datanode daemons. * Start a datanode daemon for each comma separated data directory * specified in property dfs.data.dir and wait for them to finish. * If this thread is specifically interrupted, it will stop waiting. */ private static void runAndWait(Configuration conf) throws IOException { run(conf); // Wait for sub threads to exit for (Iterator iterator = subThreadList.iterator(); iterator.hasNext();) { Thread threadDataNode = (Thread) iterator.next(); try { threadDataNode.join(); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { // did someone knock? return; } } } } /** * Make an instance of DataNode after ensuring that given data directory * (and parent directories, if necessary) can be created. * @param dataDir where the new DataNode instance should keep its files. * @param conf Configuration instance to use. * @return DataNode instance for given data dir and conf, or null if directory * cannot be created. * @throws IOException */ static DataNode makeInstanceForDir(String dataDir, Configuration conf) throws IOException { DataNode dn = null; File data = new File(dataDir); data.mkdirs(); if (!data.isDirectory()) { LOG.warning("Can't start DataNode in non-directory: "+dataDir); return null; } else { dn = new DataNode(conf, dataDir); } return dn; } public String toString() { return "DataNode{" + "data=" + data + ", localName='" + localName + "'" + ", xmitsInProgress=" + xmitsInProgress + "}"; } /** */ public static void main(String args[]) throws IOException { LogFormatter.setShowThreadIDs(true); runAndWait(new Configuration()); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?