📄 datanode.java
字号:
break; } else { if (xferTargets[i].length > 0) { LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]); new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start(); } } } } else if (cmd.invalidateBlocks()) { // // Some local block(s) are obsolete and can be // safely garbage-collected. // Block toDelete[] = cmd.getBlocks(); data.invalidate(toDelete); myMetrics.removedBlocks(toDelete.length); } else if( cmd.shutdownNode()) { // shut down the data node this.shutdown(); continue; } } } // send block report if (now - lastBlockReport > blockReportInterval) { // before send block report, check if data directory is healthy data.checkDataDir(); // // Send latest blockinfo report if timer has expired. // Get back a list of local block(s) that are obsolete // and can be safely GC'ed. // Block toDelete[] = namenode.blockReport(dnRegistration, data.getBlockReport()); data.invalidate(toDelete); lastBlockReport = now; continue; } // check if there are newly received blocks Block [] blockArray=null; synchronized( receivedBlockList ) { if (receivedBlockList.size() > 0) { // // Send newly-received blockids to namenode // blockArray = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]); } } if( blockArray != null ) { namenode.blockReceived( dnRegistration, blockArray ); synchronized (receivedBlockList) { for(Block b: blockArray) { receivedBlockList.remove(b); } } } // // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. // long waitTime = HEARTBEAT_INTERVAL - (System.currentTimeMillis() - lastHeartbeat); synchronized( receivedBlockList ) { if (waitTime > 0 && receivedBlockList.size() == 0) { try { receivedBlockList.wait(waitTime); } catch (InterruptedException ie) { } } } // synchronized } catch(DiskErrorException e) { handleDiskError(e.getLocalizedMessage()); return; } catch( RemoteException re ) { String reClass = re.getClassName(); if( UnregisteredDatanodeException.class.getName().equals( reClass )) { LOG.warn( "DataNode is shutting down: " + StringUtils.stringifyException(re)); shutdown(); return; } LOG.warn(StringUtils.stringifyException(re)); } catch (IOException e) { LOG.warn(StringUtils.stringifyException(e)); } } // while (shouldRun) } // offerService /** * Server used for receiving/sending a block of data. * This is created to listen for requests from clients or * other DataNodes. This small server does not use the * Hadoop IPC mechanism. */ class DataXceiveServer implements Runnable { boolean shouldListen = true; ServerSocket ss; public DataXceiveServer(ServerSocket ss) { this.ss = ss; } /** */ public void run() { try { while (shouldListen) { Socket s = ss.accept(); //s.setSoTimeout(READ_TIMEOUT); data.checkDataDir(); xceiverCount.incr(); new Daemon(new DataXceiver(s)).start(); } ss.close(); } catch (DiskErrorException de ) { String errMsgr = de.getMessage(); LOG.warn("Exiting DataXceiveServer due to "+ errMsgr ); handleDiskError(errMsgr); } catch (IOException ie) { LOG.info("Exiting DataXceiveServer due to " + ie.toString()); } } public void kill() { this.shouldListen = false; try { this.ss.close(); } catch (IOException iex) { } } } /** * Thread for processing incoming/outgoing data stream */ class DataXceiver implements Runnable { Socket s; public DataXceiver(Socket s) { this.s = s; LOG.debug("Number of active connections is: "+xceiverCount); } /** * Read/write data from/to the DataXceiveServer. */ public void run() { try { DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream())); try { byte op = (byte) in.read(); if (op == OP_WRITE_BLOCK) { writeBlock(in); } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK || op == OP_READ_RANGE_BLOCK) { readBlock(in, op); } else { while (op >= 0) { System.out.println("Faulty op: " + op); op = (byte) in.read(); } throw new IOException("Unknown opcode for incoming data stream"); } } finally { in.close(); } } catch (IOException ie) { LOG.warn("DataXCeiver", ie); } finally { try { xceiverCount.decr(); LOG.debug("Number of active connections is: "+xceiverCount); s.close(); } catch (IOException ie2) { } } } /** * Read a block from the disk * @param in The stream to read from * @param op OP_READ_BLOCK or OP_READ_SKIPBLOCK * @throws IOException */ private void readBlock(DataInputStream in, byte op) throws IOException { // // Read in the header // Block b = new Block(); b.readFields(in); long toSkip = 0; long endOffset = -1; if (op == OP_READSKIP_BLOCK) { toSkip = in.readLong(); } else if (op == OP_READ_RANGE_BLOCK) { toSkip = in.readLong(); endOffset = in.readLong(); } // // Open reply stream // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); try { // // Write filelen of -1 if error // if (! data.isValidBlock(b)) { out.writeLong(-1); } else { // // Get blockdata from disk // long len = data.getLength(b); if (endOffset < 0) { endOffset = len; } DataInputStream in2 = new DataInputStream(data.getBlockData(b)); out.writeLong(len); long amtSkipped = 0; if ((op == OP_READSKIP_BLOCK) || (op == OP_READ_RANGE_BLOCK)) { if (toSkip > len) { toSkip = len; } try { amtSkipped = in2.skip(toSkip); } catch (IOException iex) { shutdown(); throw iex; } out.writeLong(amtSkipped); } if (op == OP_READ_RANGE_BLOCK) { if (endOffset > len) { endOffset = len; } out.writeLong(endOffset); } byte buf[] = new byte[BUFFER_SIZE]; try { int toRead = (int) (endOffset - amtSkipped + 1); int bytesRead = 0; try { bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead)); myMetrics.readBytes(bytesRead); } catch (IOException iex) { shutdown(); throw iex; } while (toRead > 0 && bytesRead >= 0) { out.write(buf, 0, bytesRead); toRead -= bytesRead; if (toRead > 0) { try { bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead)); myMetrics.readBytes(bytesRead); } catch (IOException iex) { shutdown(); throw iex; } } } } catch (SocketException se) { // This might be because the reader // closed the stream early } finally { try { in2.close(); } catch (IOException iex) { shutdown(); throw iex; } } } myMetrics.readBlocks(1); LOG.info("Served block " + b + " to " + s.getInetAddress()); } finally { out.close(); } } /** * Write a block to disk. * @param in The stream to read from * @throws IOException */ private void writeBlock(DataInputStream in) throws IOException { // // Read in the header // DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); try { boolean shouldReportBlock = in.readBoolean(); Block b = new Block(); b.readFields(in); int numTargets = in.readInt(); if (numTargets <= 0) { throw new IOException("Mislabelled incoming datastream."); } DatanodeInfo targets[] = new DatanodeInfo[numTargets]; for (int i = 0; i < targets.length; i++) { DatanodeInfo tmp = new DatanodeInfo(); tmp.readFields(in); targets[i] = tmp; } byte encodingType = (byte) in.read(); long len = in.readLong(); // // Make sure curTarget is equal to this machine // DatanodeInfo curTarget = targets[0]; // // Track all the places we've successfully written the block // Vector mirrors = new Vector(); // // Open local disk out // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b))); InetSocketAddress mirrorTarget = null; String mirrorNode = null; try { // // Open network conn to backup machine, if // appropriate // DataInputStream in2 = null; DataOutputStream out2 = null; if (targets.length > 1) { // Connect to backup machine mirrorNode = targets[1].getName(); mirrorTarget = createSocketAddr(mirrorNode); try { Socket s2 = new Socket(); s2.connect(mirrorTarget, READ_TIMEOUT); s2.setSoTimeout(READ_TIMEOUT); out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream())); in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream())); // Write connection header out2.write(OP_WRITE_BLOCK); out2.writeBoolean(shouldReportBlock); b.write(out2); out2.writeInt(targets.length - 1); for (int i = 1; i < targets.length; i++) { targets[i].write(out2); } out2.write(encodingType); out2.writeLong(len); myMetrics.replicatedBlocks(1); } catch (IOException ie) { if (out2 != null) { LOG.info("Exception connecting to mirror " + mirrorNode + "\n" + StringUtils.stringifyException(ie)); try { out2.close(); in2.close();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -