📄 datanode.java
字号:
} catch (IOException out2close) { } finally { out2 = null; in2 = null; } } } } // // Process incoming data, copy to disk and // maybe to network. // boolean anotherChunk = len != 0; byte buf[] = new byte[BUFFER_SIZE]; while (anotherChunk) { while (len > 0) { int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len)); if (bytesRead < 0) { throw new EOFException("EOF reading from "+s.toString()); } if (bytesRead > 0) { try { out.write(buf, 0, bytesRead); myMetrics.wroteBytes(bytesRead); } catch (IOException iex) { if (iex.getMessage().startsWith("No space left on device")) { throw new DiskOutOfSpaceException("No space left on device"); } else { shutdown(); throw iex; } } if (out2 != null) { try { out2.write(buf, 0, bytesRead); } catch (IOException out2e) { LOG.info("Exception writing to mirror " + mirrorNode + "\n" + StringUtils.stringifyException(out2e)); // // If stream-copy fails, continue // writing to disk. We shouldn't // interrupt client write. // try { out2.close(); in2.close(); } catch (IOException out2close) { } finally { out2 = null; in2 = null; } } } len -= bytesRead; } } if (encodingType == RUNLENGTH_ENCODING) { anotherChunk = false; } else if (encodingType == CHUNKED_ENCODING) { len = in.readLong(); if (out2 != null) { try { out2.writeLong(len); } catch (IOException ie) { LOG.info("Exception writing to mirror " + mirrorNode + "\n" + StringUtils.stringifyException(ie)); try { out2.close(); in2.close(); } catch (IOException ie2) { // NOTHING } finally { out2 = null; in2 = null; } } } if (len == 0) { anotherChunk = false; } } } if (out2 != null) { try { out2.flush(); long complete = in2.readLong(); if (complete != WRITE_COMPLETE) { LOG.info("Conflicting value for WRITE_COMPLETE: " + complete); } LocatedBlock newLB = new LocatedBlock(); newLB.readFields(in2); in2.close(); out2.close(); DatanodeInfo mirrorsSoFar[] = newLB.getLocations(); for (int k = 0; k < mirrorsSoFar.length; k++) { mirrors.add(mirrorsSoFar[k]); } } catch (IOException ie) { LOG.info("Exception writing to mirror " + mirrorNode + "\n" + StringUtils.stringifyException(ie)); try { out2.close(); in2.close(); } catch (IOException ie2) { // NOTHING } finally { out2 = null; in2 = null; } } } if (out2 == null) { LOG.info("Received block " + b + " from " + s.getInetAddress()); } else { LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget); } } finally { try { out.close(); } catch (IOException iex) { shutdown(); throw iex; } } data.finalizeBlock(b); myMetrics.wroteBlocks(1); // // Tell the namenode that we've received this block // in full, if we've been asked to. This is done // during NameNode-directed block transfers, but not // client writes. // if (shouldReportBlock) { synchronized (receivedBlockList) { receivedBlockList.add(b); receivedBlockList.notifyAll(); } } // // Tell client job is done, and reply with // the new LocatedBlock. // reply.writeLong(WRITE_COMPLETE); mirrors.add(curTarget); LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()])); newLB.write(reply); } finally { reply.close(); } } } /** * Used for transferring a block of data. This class * sends a piece of data to another DataNode. */ class DataTransfer implements Runnable { InetSocketAddress curTarget; DatanodeInfo targets[]; Block b; byte buf[]; /** * Connect to the first item in the target list. Pass along the * entire target list, the block, and the data. */ public DataTransfer(DatanodeInfo targets[], Block b) throws IOException { this.curTarget = createSocketAddr(targets[0].getName()); this.targets = targets; this.b = b; this.buf = new byte[BUFFER_SIZE]; } /** * Do the deed, write the bytes */ public void run() { xmitsInProgress++; try { Socket s = new Socket(); s.connect(curTarget, READ_TIMEOUT); s.setSoTimeout(READ_TIMEOUT); DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); try { long filelen = data.getLength(b); DataInputStream in = new DataInputStream(new BufferedInputStream(data.getBlockData(b))); try { // // Header info // out.write(OP_WRITE_BLOCK); out.writeBoolean(true); b.write(out); out.writeInt(targets.length); for (int i = 0; i < targets.length; i++) { targets[i].write(out); } out.write(RUNLENGTH_ENCODING); out.writeLong(filelen); // // Write the data // while (filelen > 0) { int bytesRead = in.read(buf, 0, (int) Math.min(filelen, buf.length)); out.write(buf, 0, bytesRead); filelen -= bytesRead; } } finally { in.close(); } } finally { out.close(); } LOG.info("Transmitted block " + b + " to " + curTarget); } catch (IOException ie) { LOG.warn("Failed to transfer "+b+" to "+curTarget, ie); } finally { xmitsInProgress--; } } } /** * No matter what kind of exception we get, keep retrying to offerService(). * That's the loop that connects to the NameNode and provides basic DataNode * functionality. * * Only stop when "shouldRun" is turned off (which can only happen at shutdown). */ public void run() { LOG.info("Starting DataNode in: "+data); // start dataXceiveServer dataXceiveServer.start(); while (shouldRun) { try { offerService(); } catch (Exception ex) { LOG.error("Exception: " + StringUtils.stringifyException(ex)); if (shouldRun) { try { Thread.sleep(5000); } catch (InterruptedException ie) { } } } } // wait for dataXceiveServer to terminate try { this.dataXceiveServer.join(); } catch (InterruptedException ie) { } LOG.info("Finishing DataNode in: "+data); } private static ArrayList dataNodeList = new ArrayList(); private static ArrayList dataNodeThreadList = new ArrayList(); /** Start datanode daemon. */ public static void run(Configuration conf) throws IOException { String[] dataDirs = conf.getStrings("dfs.data.dir"); DataNode dn = makeInstance(dataDirs, conf); dataNodeList.add(dn); if (dn != null) { Thread t = new Thread(dn, "DataNode: [" + StringUtils.arrayToString(dataDirs) + "]"); t.setDaemon(true); // needed for JUnit testing t.start(); dataNodeThreadList.add(t); } } /** * Shut down all datanodes that where started via the run(conf) method. * Returns only after shutdown is complete. */ public static void shutdownAll(){ if(!dataNodeList.isEmpty()){ for (Iterator iterator = dataNodeList.iterator(); iterator.hasNext();) { DataNode dataNode = (DataNode) iterator.next(); dataNode.shutdown(); } } } /** Start a single datanode daemon and wait for it to finish. * If this thread is specifically interrupted, it will stop waiting. */ private static void runAndWait(Configuration conf) throws IOException { run(conf); if (dataNodeThreadList.size() > 0) { Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1); try { t.join(); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { // did someone knock? return; } } } } /** * Make an instance of DataNode after ensuring that at least one of the * given data directories (and their parent directories, if necessary) * can be created. * @param dataDirs List of directories, where the new DataNode instance should * keep its files. * @param conf Configuration instance to use. * @return DataNode instance for given list of data dirs and conf, or null if * no directory from this directory list can be created. * @throws IOException */ static DataNode makeInstance(String[] dataDirs, Configuration conf) throws IOException { ArrayList dirs = new ArrayList(); for (int i = 0; i < dataDirs.length; i++) { File data = new File(dataDirs[i]); try { DiskChecker.checkDir( data ); dirs.add(dataDirs[i]); } catch( DiskErrorException e ) { LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage() ); } } return ((dirs.size() > 0) ? new DataNode(conf, dataDirs) : null); } public String toString() { return "DataNode{" + "data=" + data + ", localName='" + dnRegistration.getName() + "'" + ", storageID='" + dnRegistration.getStorageID() + "'" + ", xmitsInProgress=" + xmitsInProgress + "}"; } /** */ public static void main(String args[]) throws IOException { Configuration conf = new Configuration(); runAndWait(conf); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -