datanode.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 788 行 · 第 1/3 页
JAVA
788 行
} } } } } /** * Server used for receiving/sending a block of data. * This is created to listen for requests from clients or * other DataNodes. This small server does not use the * Hadoop IPC mechanism. */ class DataXceiveServer implements Runnable { boolean shouldListen = true; ServerSocket ss; public DataXceiveServer(ServerSocket ss) { this.ss = ss; } /** */ public void run() { try { while (shouldListen) { Socket s = ss.accept(); //s.setSoTimeout(READ_TIMEOUT); new Daemon(new DataXceiver(s)).start(); } ss.close(); } catch (IOException ie) { LOG.info("Exiting DataXceiveServer due to " + ie.toString()); } } public void kill() { this.shouldListen = false; try { this.ss.close(); } catch (IOException iex) { } } } /** * Thread for processing incoming/outgoing data stream */ class DataXceiver implements Runnable { Socket s; public DataXceiver(Socket s) { this.s = s; } /** * Read/write data from/to the DataXceiveServer. */ public void run() { try { DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream())); try { byte op = (byte) in.read(); if (op == OP_WRITE_BLOCK) { // // Read in the header // DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); try { boolean shouldReportBlock = in.readBoolean(); Block b = new Block(); b.readFields(in); int numTargets = in.readInt(); if (numTargets <= 0) { throw new IOException("Mislabelled incoming datastream."); } DatanodeInfo targets[] = new DatanodeInfo[numTargets]; for (int i = 0; i < targets.length; i++) { DatanodeInfo tmp = new DatanodeInfo(); tmp.readFields(in); targets[i] = tmp; } byte encodingType = (byte) in.read(); long len = in.readLong(); // // Make sure curTarget is equal to this machine // DatanodeInfo curTarget = targets[0]; // // Track all the places we've successfully written the block // Vector mirrors = new Vector(); // // Open local disk out // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b))); InetSocketAddress mirrorTarget = null; try { // // Open network conn to backup machine, if // appropriate // DataInputStream in2 = null; DataOutputStream out2 = null; if (targets.length > 1) { // Connect to backup machine mirrorTarget = createSocketAddr(targets[1].getName().toString()); try { Socket s2 = new Socket(); s2.connect(mirrorTarget, READ_TIMEOUT); s2.setSoTimeout(READ_TIMEOUT); out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream())); in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream())); // Write connection header out2.write(OP_WRITE_BLOCK); out2.writeBoolean(shouldReportBlock); b.write(out2); out2.writeInt(targets.length - 1); for (int i = 1; i < targets.length; i++) { targets[i].write(out2); } out2.write(encodingType); out2.writeLong(len); } catch (IOException ie) { if (out2 != null) { try { out2.close(); in2.close(); } catch (IOException out2close) { } finally { out2 = null; in2 = null; } } } } // // Process incoming data, copy to disk and // maybe to network. // try { boolean anotherChunk = len != 0; byte buf[] = new byte[BUFFER_SIZE]; while (anotherChunk) { while (len > 0) { int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len)); if (bytesRead < 0) { throw new EOFException("EOF reading from "+s.toString()); } if (bytesRead > 0) { try { out.write(buf, 0, bytesRead); } catch (IOException iex) { shutdown(); throw iex; } if (out2 != null) { try { out2.write(buf, 0, bytesRead); } catch (IOException out2e) { // // If stream-copy fails, continue // writing to disk. We shouldn't // interrupt client write. // try { out2.close(); in2.close(); } catch (IOException out2close) { } finally { out2 = null; in2 = null; } } } len -= bytesRead; } } if (encodingType == RUNLENGTH_ENCODING) { anotherChunk = false; } else if (encodingType == CHUNKED_ENCODING) { len = in.readLong(); if (out2 != null) { out2.writeLong(len); } if (len == 0) { anotherChunk = false; } } } if (out2 == null) { LOG.info("Received block " + b + " from " + s.getInetAddress()); } else { out2.flush(); long complete = in2.readLong(); if (complete != WRITE_COMPLETE) { LOG.info("Conflicting value for WRITE_COMPLETE: " + complete); } LocatedBlock newLB = new LocatedBlock(); newLB.readFields(in2); DatanodeInfo mirrorsSoFar[] = newLB.getLocations(); for (int k = 0; k < mirrorsSoFar.length; k++) { mirrors.add(mirrorsSoFar[k]); } LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget); } } finally { if (out2 != null) { out2.close(); in2.close(); } } } finally { try { out.close(); } catch (IOException iex) { shutdown(); throw iex; } } data.finalizeBlock(b); // // Tell the namenode that we've received this block // in full, if we've been asked to. This is done // during NameNode-directed block transfers, but not // client writes. // if (shouldReportBlock) { synchronized (receivedBlockList) { receivedBlockList.add(b); receivedBlockList.notifyAll(); } } // // Tell client job is done, and reply with // the new LocatedBlock. // reply.writeLong(WRITE_COMPLETE); mirrors.add(curTarget); LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()])); newLB.write(reply); } finally { reply.close(); } } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) { // // Read in the header // Block b = new Block(); b.readFields(in); long toSkip = 0; if (op == OP_READSKIP_BLOCK) { toSkip = in.readLong(); } //
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?