📄 dfsclient.java
字号:
/** * We definitely don't support marks */ public boolean markSupported() { return false; } public void mark(int readLimit) { } public void reset() throws IOException { throw new IOException("Mark not supported"); } } /**************************************************************** * DFSOutputStream creates files from a stream of bytes. ****************************************************************/ class DFSOutputStream extends FSOutputStream { private Socket s; boolean closed = false; private byte outBuf[] = new byte[BUFFER_SIZE]; private int pos = 0; private UTF8 src; private boolean overwrite; private short replication; private boolean firstTime = true; private DataOutputStream blockStream; private DataInputStream blockReplyStream; private File backupFile; private OutputStream backupStream; private Block block; private long filePos = 0; private int bytesWrittenToBlock = 0; private String datanodeName; private long blockSize; private Progressable progress; /** * Create a new output stream to the given DataNode. */ public DFSOutputStream(UTF8 src, boolean overwrite, short replication, long blockSize, Progressable progress ) throws IOException { this.src = src; this.overwrite = overwrite; this.replication = replication; this.backupFile = newBackupFile(); this.blockSize = blockSize; this.backupStream = new FileOutputStream(backupFile); this.progress = progress; if (progress != null) { LOG.debug("Set non-null progress callback on DFSOutputStream "+src); } } private File newBackupFile() throws IOException { File result = conf.getFile("dfs.client.buffer.dir", "tmp"+File.separator+ "client-"+Math.abs(r.nextLong())); result.deleteOnExit(); return result; } /** * Open a DataOutputStream to a DataNode so that it can be written to. * This happens when a file is created and each time a new block is allocated. * Must get block ID and the IDs of the destinations from the namenode. */ private synchronized void nextBlockOutputStream() throws IOException { boolean retry = false; long startTime = System.currentTimeMillis(); do { retry = false; LocatedBlock lb; if (firstTime) { lb = locateNewBlock(); } else { lb = locateFollowingBlock(startTime); } block = lb.getBlock(); DatanodeInfo nodes[] = lb.getLocations(); // // Connect to first DataNode in the list. Abort if this fails. // InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName()); try { s = new Socket(); s.connect(target, READ_TIMEOUT); s.setSoTimeout(replication * READ_TIMEOUT); datanodeName = nodes[0].getName(); } catch (IOException ie) { // Connection failed. Let's wait a little bit and retry try { if (System.currentTimeMillis() - startTime > 5000) { LOG.info("Waiting to find target node: " + target); } Thread.sleep(6000); } catch (InterruptedException iex) { } if (firstTime) { namenode.abandonFileInProgress(src.toString(), clientName); } else { namenode.abandonBlock(block, src.toString()); } retry = true; continue; } // // Xmit header info to datanode // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); out.write(OP_WRITE_BLOCK); out.writeBoolean(false); block.write(out); out.writeInt(nodes.length); for (int i = 0; i < nodes.length; i++) { nodes[i].write(out); } out.write(CHUNKED_ENCODING); bytesWrittenToBlock = 0; blockStream = out; blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream())); } while (retry); firstTime = false; } private LocatedBlock locateNewBlock() throws IOException { int retries = 3; while (true) { while (true) { try { return namenode.create(src.toString(), clientName.toString(), localName, overwrite, replication, blockSize); } catch (RemoteException e) { if (--retries == 0 || !AlreadyBeingCreatedException.class.getName(). equals(e.getClassName())) { throw e; } else { // because failed tasks take upto LEASE_PERIOD to // release their pendingCreates files, if the file // we want to create is already being created, // wait and try again. LOG.info(StringUtils.stringifyException(e)); try { Thread.sleep(LEASE_PERIOD); } catch (InterruptedException ie) { } } } } } } private LocatedBlock locateFollowingBlock(long start ) throws IOException { int retries = 5; while (true) { long localstart = System.currentTimeMillis(); while (true) { try { return namenode.addBlock(src.toString(), clientName.toString()); } catch (RemoteException e) { if (--retries == 0 || !NotReplicatedYetException.class.getName(). equals(e.getClassName())) { throw e; } else { LOG.info(StringUtils.stringifyException(e)); if (System.currentTimeMillis() - localstart > 5000) { LOG.info("Waiting for replication for " + (System.currentTimeMillis() - localstart)/1000 + " seconds"); } try { Thread.sleep(400); } catch (InterruptedException ie) { } } } } } } /** * We're referring to the file pos here */ public synchronized long getPos() throws IOException { return filePos; } /** * Writes the specified byte to this output stream. */ public synchronized void write(int b) throws IOException { checkOpen(); if (closed) { throw new IOException("Stream closed"); } if ((bytesWrittenToBlock + pos == blockSize) || (pos >= BUFFER_SIZE)) { flush(); } outBuf[pos++] = (byte) b; filePos++; } /** * Writes the specified bytes to this output stream. */ public synchronized void write(byte b[], int off, int len) throws IOException { checkOpen(); if (closed) { throw new IOException("Stream closed"); } while (len > 0) { int remaining = BUFFER_SIZE - pos; int toWrite = Math.min(remaining, len); System.arraycopy(b, off, outBuf, pos, toWrite); pos += toWrite; off += toWrite; len -= toWrite; filePos += toWrite; if ((bytesWrittenToBlock + pos >= blockSize) || (pos == BUFFER_SIZE)) { flush(); } } } /** * Flush the buffer, getting a stream to a new block if necessary. */ public synchronized void flush() throws IOException { checkOpen(); if (closed) { throw new IOException("Stream closed"); } if (bytesWrittenToBlock + pos >= blockSize) { flushData((int) blockSize - bytesWrittenToBlock); } if (bytesWrittenToBlock == blockSize) { endBlock(); } flushData(pos); } /** * Actually flush the accumulated bytes to the remote node, * but no more bytes than the indicated number. */ private synchronized void flushData(int maxPos) throws IOException { int workingPos = Math.min(pos, maxPos); if (workingPos > 0) { // // To the local block backup, write just the bytes // backupStream.write(outBuf, 0, workingPos); // // Track position // bytesWrittenToBlock += workingPos; System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos); pos -= workingPos; } } /** * We're done writing to the current block. */ private synchronized void endBlock() throws IOException { // // Done with local copy // backupStream.close(); // // Send it to datanode // boolean sentOk = false; int remainingAttempts = conf.getInt("dfs.client.block.write.retries", 3); while (!sentOk) { nextBlockOutputStream(); InputStream in = new FileInputStream(backupFile); try { byte buf[] = new byte[BUFFER_SIZE]; int bytesRead = in.read(buf); while (bytesRead > 0) { blockStream.writeLong((long) bytesRead); blockStream.write(buf, 0, bytesRead); if (progress != null) { progress.progress(); } bytesRead = in.read(buf); } internalClose(); sentOk = true; } catch (IOException ie) { handleSocketException(ie); remainingAttempts -= 1; if (remainingAttempts == 0) { throw ie; } } finally { in.close(); } } // // Delete local backup, start new one // backupFile.delete(); backupFile = newBackupFile(); backupStream = new FileOutputStream(backupFile); bytesWrittenToBlock = 0; } /** * Close down stream to remote datanode. */ private synchronized void internalClose() throws IOException { try { blockStream.writeLong(0); blockStream.flush(); long complete = blockReplyStream.readLong(); if (complete != WRITE_COMPLETE) { LOG.info("Did not receive WRITE_COMPLETE flag: " + complete); throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete); } } catch (IOException ie) { throw (IOException) new IOException("failure closing block of file " + src.toString() + " to node " + (datanodeName == null ? "?" : datanodeName) ).initCause(ie); } LocatedBlock lb = new LocatedBlock(); lb.readFields(blockReplyStream); namenode.reportWrittenBlock(lb); s.close(); s = null; } private void handleSocketException(IOException ie) throws IOException { LOG.warn("Error while writing.", ie); try { if (s != null) { s.close(); s = null; } } catch (IOException ie2) { LOG.warn("Error closing socket.", ie2); } namenode.abandonBlock(block, src.toString()); } /** * Closes this output stream and releases any system * resources associated with this stream. */ public synchronized void close() throws IOException { checkOpen(); if (closed) { throw new IOException("Stream closed"); } try { flush(); if (filePos == 0 || bytesWrittenToBlock != 0) { try { endBlock(); } catch (IOException e) { namenode.abandonFileInProgress(src.toString(), clientName); throw e; } } backupStream.close(); backupFile.delete(); if (s != null) { s.close(); s = null; } super.close(); long localstart = System.currentTimeMillis(); boolean fileComplete = false; while (! fileComplete) { fileComplete = namenode.complete(src.toString(), clientName.toString()); if (!fileComplete) { try { Thread.sleep(400); if (System.currentTimeMillis() - localstart > 5000) { LOG.info("Could not complete file, retrying..."); } } catch (InterruptedException ie) { } } } closed = true; } finally { synchronized (pendingCreates) { pendingCreates.remove(src.toString()); } } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -