dfsclient.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 847 行 · 第 1/2 页
JAVA
847 行
/**
 * Closes this input stream and, if a datanode connection is open,
 * the block stream and socket that belong to it.
 *
 * @throws IOException if the stream is already closed, or if closing
 *         the block stream, the socket or the superclass fails
 */
public synchronized void close() throws IOException {
    if (closed) {
        throw new IOException("Stream closed");
    }

    if (s != null) {
        // Release the socket even when closing the block stream fails,
        // and always forget the reference so it is not closed twice.
        try {
            blockStream.close();
        } finally {
            try {
                s.close();
            } finally {
                s = null;
            }
        }
    }
    super.close();
    closed = true;
}

/**
 * Reads a single byte.
 *
 * @return the value returned by the block stream, or -1 when the current
 *         position is at or past the end of the file
 * @throws IOException if the stream is closed or the read fails
 */
public synchronized int read() throws IOException {
    if (closed) {
        throw new IOException("Stream closed");
    }
    int result = -1;
    if (pos < filelen) {
        // Past the end of the current block (or freshly seeked): reposition.
        // NOTE(review): blockSeekTo is defined outside this excerpt; it is
        // presumed to refresh blockStream and blockEnd -- confirm.
        if (pos > blockEnd) {
            blockSeekTo(pos);
        }
        result = blockStream.read();
        if (result >= 0) {
            pos++;
        }
    }
    return result;
}

/**
 * Reads up to {@code len} bytes into {@code buf}, never requesting more
 * than the bytes remaining before {@code blockEnd}. Callers that need the
 * whole range must therefore loop.
 *
 * @param buf destination buffer
 * @param off offset in {@code buf} at which to start storing bytes
 * @param len maximum number of bytes to read
 * @return the number of bytes read as reported by the block stream, or -1
 *         when the current position is at or past the end of the file
 * @throws IOException if the stream is closed or the read fails
 */
public synchronized int read(byte buf[], int off, int len) throws IOException {
    if (closed) {
        throw new IOException("Stream closed");
    }
    if (pos < filelen) {
        if (pos > blockEnd) {
            blockSeekTo(pos);
        }
        // Take the minimum in long arithmetic; casting the remaining block
        // length to int first would truncate it for spans above 2 GiB.
        int toRead = (int) Math.min((long) len, blockEnd - pos + 1);
        int result = blockStream.read(buf, off, toRead);
        if (result >= 0) {
            pos += result;
        }
        return result;
    }
    return -1;
}

/**
 * Seek to a new arbitrary location.
 *
 * @param targetPos absolute offset; must satisfy
 *        {@code 0 <= targetPos < filelen}
 * @throws IOException if {@code targetPos} is negative or not before the
 *         end of the file
 */
public synchronized void seek(long targetPos) throws IOException {
    if (targetPos < 0) {
        // A negative position would never exceed blockEnd (-1), so the next
        // read would skip the re-seek and use a stale block stream.
        throw new IOException("Cannot seek to negative position");
    }
    if (targetPos >= filelen) {
        throw new IOException("Cannot seek after EOF");
    }
    pos = targetPos;
    // Any valid pos is now greater than blockEnd, which makes the next
    // read call blockSeekTo(pos).
    blockEnd = -1;
}

/**
 * Returns the current position in the file.
 */
public synchronized long getPos() throws IOException {
    return pos;
}

/**
 * Returns the number of bytes between the current position and the end of
 * the file, capped at {@code Integer.MAX_VALUE}.
 *
 * @throws IOException if the stream is closed
 */
public synchronized int available() throws IOException {
    if (closed) {
        throw new IOException("Stream closed");
    }
    // Clamp before narrowing: a plain (int) cast overflows for large files.
    return (int) Math.min(filelen - pos, (long) Integer.MAX_VALUE);
}

/**
 * We definitely don't support marks
 */
public boolean markSupported() {
    return false;
}

/** No-op: marks are not supported. */
public void mark(int readLimit) {
}

/**
 * Always fails, because marks are not supported.
 *
 * @throws IOException always
 */
public void reset() throws IOException {
    throw new IOException("Mark not supported");
}
}

/****************************************************************
 * DFSOutputStream creates files from a stream of bytes.
****************************************************************/
class DFSOutputStream extends FSOutputStream {
    /** Socket to the first datanode of the current pipeline; null when not connected. */
    private Socket s;
    boolean closed = false;

    /** In-memory staging buffer; bytes [0, pos) are not yet in the backup file. */
    private byte outBuf[] = new byte[BUFFER_SIZE];
    private int pos = 0;

    private UTF8 src;
    private boolean overwrite;
    /** True until the first block has been successfully opened on a datanode. */
    private boolean firstTime = true;
    private DataOutputStream blockStream;
    private DataInputStream blockReplyStream;
    /** Local file holding the bytes of the block currently being written. */
    private File backupFile;
    private OutputStream backupStream;
    private Block block;
    /** Number of bytes accepted by write() so far. */
    private long filePos = 0;
    /** Number of bytes of the current block already written to the backup file. */
    private int bytesWrittenToBlock = 0;

    /**
     * Creates an output stream for the given file. No datanode is contacted
     * here; the constructor only opens a local backup file that buffers the
     * current block until it is shipped by endBlock().
     *
     * @param src       name of the file being written
     * @param overwrite flag passed to namenode.create() for the first block
     * @throws IOException if the local backup file cannot be opened
     */
    public DFSOutputStream(UTF8 src, boolean overwrite) throws IOException {
        this.src = src;
        this.overwrite = overwrite;
        this.backupFile = newBackupFile();
        this.backupStream = new FileOutputStream(backupFile);
    }

    /**
     * Picks a randomly named backup file via conf.getFile() under the
     * "dfs.data.dir" setting and marks it for deletion on JVM exit.
     */
    private File newBackupFile() throws IOException {
        File result = conf.getFile("dfs.data.dir",
                                   "tmp" + File.separator +
                                   "client-" + Math.abs(r.nextLong()));
        result.deleteOnExit();
        return result;
    }

    /**
     * Open a DataOutputStream to a DataNode so that it can be written to.
     * This happens when a file is created and each time a new block is allocated.
     * Must get block ID and the IDs of the destinations from the namenode.
     *
     * On success blockStream and blockReplyStream are connected to the first
     * datanode in the returned list and the block header has been queued.
     * If the connection attempt fails, the allocation is abandoned at the
     * namenode and the whole procedure is retried.
     *
     * @throws IOException if a namenode call or writing the header fails
     */
    private synchronized void nextBlockOutputStream() throws IOException {
        boolean retry = false;
        long start = System.currentTimeMillis();
        do {
            retry = false;

            long localstart = System.currentTimeMillis();
            boolean blockComplete = false;
            LocatedBlock lb = null;
            // Poll the namenode until it hands out a block.
            while (! blockComplete) {
                if (firstTime) {
                    lb = namenode.create(src.toString(), clientName.toString(), localName, overwrite);
                } else {
                    lb = namenode.addBlock(src.toString(), localName);
                }

                if (lb == null) {
                    try {
                        Thread.sleep(400);
                        if (System.currentTimeMillis() - localstart > 5000) {
                            LOG.info("Waiting to find new output block node for " + (System.currentTimeMillis() - start) + "ms");
                        }
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: keep polling until a block is obtained.
                        // NOTE(review): the interrupt status is not restored here because
                        // more blocking work follows; confirm whether callers need it.
                    }
                } else {
                    blockComplete = true;
                }
            }

            block = lb.getBlock();
            DatanodeInfo nodes[] = lb.getLocations();

            //
            // Connect to first DataNode in the list.  Abort if this fails.
            //
            InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());
            try {
                s = new Socket();
                s.connect(target, READ_TIMEOUT);
                s.setSoTimeout(READ_TIMEOUT);
            } catch (IOException ie) {
                // Release the half-open socket; otherwise every failed
                // attempt would leak one until garbage collection.
                closeSocketQuietly();

                // Connection failed.  Let's wait a little bit and retry
                try {
                    if (System.currentTimeMillis() - start > 5000) {
                        LOG.info("Waiting to find target node: " + target);
                    }
                    Thread.sleep(6000);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: an interrupt only shortens the back-off.
                }
                // Give the allocation back before asking for a new one.
                if (firstTime) {
                    namenode.abandonFileInProgress(src.toString());
                } else {
                    namenode.abandonBlock(block, src.toString());
                }
                retry = true;
                continue;
            }

            //
            // Xmit header info to datanode
            //
            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
            out.write(OP_WRITE_BLOCK);
            out.writeBoolean(false);
            block.write(out);
            out.writeInt(nodes.length);
            for (int i = 0; i < nodes.length; i++) {
                nodes[i].write(out);
            }
            out.write(CHUNKED_ENCODING);
            bytesWrittenToBlock = 0;
            blockStream = out;
            blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
        } while (retry);
        firstTime = false;
    }

    /**
     * We're referring to the file pos here
     */
    public synchronized long getPos() throws IOException {
        return filePos;
    }

    /**
     * Writes the specified byte to this output stream.
     *
     * @param b the byte to write (only the low eight bits are stored)
     * @throws IOException if the stream is closed or a flush fails
     */
    public synchronized void write(int b) throws IOException {
        if (closed) {
            throw new IOException("Stream closed");
        }

        // Make room first: flush when the block or the buffer is full.
        if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||
            (pos >= BUFFER_SIZE)) {
            flush();
        }
        outBuf[pos++] = (byte) b;
        filePos++;
    }

    /**
     * Writes the specified bytes to this output stream.
     *
     * @param b   source array
     * @param off offset of the first byte to write
     * @param len number of bytes to write
     * @throws IOException if the stream is closed or a flush fails
     */
    public synchronized void write(byte b[], int off, int len)
        throws IOException {
        if (closed) {
            throw new IOException("Stream closed");
        }
        while (len > 0) {
            // Copy as much as fits into the staging buffer, then flush if
            // either the buffer or the current block has filled up.
            int remaining = BUFFER_SIZE - pos;
            int toWrite = Math.min(remaining, len);
            System.arraycopy(b, off, outBuf, pos, toWrite);
            pos += toWrite;
            off += toWrite;
            len -= toWrite;
            filePos += toWrite;

            if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) ||
                (pos == BUFFER_SIZE)) {
                flush();
            }
        }
    }

    /**
     * Flush the buffer, getting a stream to a new block if necessary.
     *
     * @throws IOException if the stream is closed or writing fails
     */
    public synchronized void flush() throws IOException {
        if (closed) {
            throw new IOException("Stream closed");
        }

        // First top up the current block exactly to BLOCK_SIZE ...
        if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {
            flushData(BLOCK_SIZE - bytesWrittenToBlock);
        }
        // ... ship it once it is full ...
        if (bytesWrittenToBlock == BLOCK_SIZE) {
            endBlock();
        }
        // ... and move whatever is left into the (possibly new) backup file.
        flushData(pos);
    }

    /**
     * Actually flush the accumulated bytes to the local backup file,
     * but no more bytes than the indicated number.
     *
     * @param maxPos upper bound on the number of buffered bytes to write
     */
    private synchronized void flushData(int maxPos) throws IOException {
        int workingPos = Math.min(pos, maxPos);

        if (workingPos > 0) {
            //
            // To the local block backup, write just the bytes
            //
            backupStream.write(outBuf, 0, workingPos);

            //
            // Track position
            //
            bytesWrittenToBlock += workingPos;
            // Slide the unwritten tail to the front of the buffer.
            System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
            pos -= workingPos;
        }
    }

    /**
     * We're done writing to the current block.
     *
     * Streams the backup file to a datanode as length-prefixed chunks,
     * retrying with a freshly allocated block after every I/O failure, and
     * then starts a new, empty backup file.
     */
    private synchronized void endBlock() throws IOException {
        //
        // Done with local copy
        //
        backupStream.close();

        //
        // Send it to datanode
        //
        boolean mustRecover = true;
        while (mustRecover) {
            nextBlockOutputStream();
            InputStream in = new FileInputStream(backupFile);
            try {
                byte buf[] = new byte[BUFFER_SIZE];
                int bytesRead = in.read(buf);
                while (bytesRead > 0) {
                    // Each chunk is sent as its length followed by its bytes.
                    blockStream.writeLong((long) bytesRead);
                    blockStream.write(buf, 0, bytesRead);
                    bytesRead = in.read(buf);
                }
                internalClose();
                mustRecover = false;
            } catch (IOException ie) {
                // Abandon this block and loop to resend the whole backup file.
                handleSocketException(ie);
            } finally {
                in.close();
            }
        }

        //
        // Delete local backup, start new one
        //
        backupFile.delete();
        backupFile = newBackupFile();
        backupStream = new FileOutputStream(backupFile);
        bytesWrittenToBlock = 0;
    }

    /**
     * Close down stream to remote datanode.
     *
     * Sends the zero-length terminator chunk, waits for the WRITE_COMPLETE
     * reply, forwards the returned LocatedBlock to the namenode and closes
     * the socket.
     *
     * @throws IOException if the datanode does not answer WRITE_COMPLETE
     *         or any I/O step fails
     */
    private synchronized void internalClose() throws IOException {
        blockStream.writeLong(0);
        blockStream.flush();

        long complete = blockReplyStream.readLong();
        if (complete != WRITE_COMPLETE) {
            LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
            throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
        }

        LocatedBlock lb = new LocatedBlock();
        lb.readFields(blockReplyStream);
        namenode.reportWrittenBlock(lb);

        s.close();
        s = null;
    }

    /**
     * Logs a write failure, drops the datanode connection and abandons the
     * current block at the namenode so that the caller can retry.
     *
     * @param ie the failure that interrupted the block transfer
     * @throws IOException if abandoning the block fails
     */
    private void handleSocketException(IOException ie) throws IOException {
        LOG.log(Level.WARNING, "Error while writing.", ie);
        closeSocketQuietly();
        namenode.abandonBlock(block, src.toString());
    }

    /**
     * Closes the datanode socket if one is open, logging instead of
     * propagating a failure, and always clears the field.
     */
    private void closeSocketQuietly() {
        if (s != null) {
            try {
                s.close();
            } catch (IOException ie2) {
                LOG.log(Level.WARNING, "Error closing socket.", ie2);
            } finally {
                s = null;
            }
        }
    }

    /**
     * Closes this output stream and releases any system
     * resources associated with this stream.
     *
     * Flushes buffered bytes, ships the last (or only, possibly empty)
     * block, removes the local backup file and then polls the namenode
     * until it reports the file as complete.
     *
     * @throws IOException if the stream is already closed or any step fails;
     *         when shipping the final block fails the file in progress is
     *         abandoned at the namenode first
     */
    public synchronized void close() throws IOException {
        if (closed) {
            throw new IOException("Stream closed");
        }

        flush();
        // An empty file still needs one block; otherwise only ship when
        // the current block actually holds data.
        if (filePos == 0 || bytesWrittenToBlock != 0) {
            try {
                endBlock();
            } catch (IOException e) {
                namenode.abandonFileInProgress(src.toString());
                throw e;
            }
        }

        backupStream.close();
        backupFile.delete();

        if (s != null) {
            s.close();
            s = null;
        }
        super.close();

        long localstart = System.currentTimeMillis();
        boolean fileComplete = false;
        boolean interrupted = false;
        try {
            while (! fileComplete) {
                fileComplete = namenode.complete(src.toString(), clientName.toString());
                if (!fileComplete) {
                    try {
                        Thread.sleep(400);
                        if (System.currentTimeMillis() - localstart > 5000) {
                            LOG.info("Could not complete file, retrying...");
                        }
                    } catch (InterruptedException ie) {
                        // Keep waiting for completion, but remember the
                        // interrupt so it is not lost to the caller.
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        closed = true;
    }
}
} // closes the enclosing class, whose header lies outside this excerpt
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?