dfsclient.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 847 行 · 第 1/2 页

JAVA
847
字号
        public synchronized void close() throws IOException {            if (closed) {                throw new IOException("Stream closed");            }            if (s != null) {                blockStream.close();                s.close();                s = null;            }            super.close();            closed = true;        }        /**         * Basic read()         */        public synchronized int read() throws IOException {            if (closed) {                throw new IOException("Stream closed");            }            int result = -1;            if (pos < filelen) {                if (pos > blockEnd) {                    blockSeekTo(pos);                }                result = blockStream.read();                if (result >= 0) {                    pos++;                }            }            return result;        }        /**         * Read the entire buffer.         */        public synchronized int read(byte buf[], int off, int len) throws IOException {            if (closed) {                throw new IOException("Stream closed");            }            if (pos < filelen) {                if (pos > blockEnd) {                    blockSeekTo(pos);                }                int result = blockStream.read(buf, off, Math.min(len, (int) (blockEnd - pos + 1)));                if (result >= 0) {                    pos += result;                }                return result;            }            return -1;        }        /**         * Seek to a new arbitrary location         */        public synchronized void seek(long targetPos) throws IOException {            if (targetPos >= filelen) {                throw new IOException("Cannot seek after EOF");            }            pos = targetPos;            blockEnd = -1;        }        /**         */        public synchronized long getPos() throws IOException {            return pos;        }        /**         */        public synchronized int available() throws IOException {            if (closed) {                throw new IOException("Stream closed");            }            return (int) (filelen - pos);        }        /**         * We definitely don't support marks         */        public boolean markSupported() {            return false;        }        public void mark(int readLimit) {        }        public void reset() throws IOException {            throw new IOException("Mark not supported");        }    }    /****************************************************************     * DFSOutputStream creates files from a stream of bytes.     ****************************************************************/    class DFSOutputStream extends FSOutputStream {        private Socket s;        boolean closed = false;        private byte outBuf[] = new byte[BUFFER_SIZE];        private int pos = 0;        private UTF8 src;        private boolean overwrite;        private boolean firstTime = true;        private DataOutputStream blockStream;        private DataInputStream blockReplyStream;        private File backupFile;        private OutputStream backupStream;        private Block block;        private long filePos = 0;        private int bytesWrittenToBlock = 0;        /**         * Create a new output stream to the given DataNode.         */        public DFSOutputStream(UTF8 src, boolean overwrite) throws IOException {            this.src = src;            this.overwrite = overwrite;            this.backupFile = newBackupFile();            this.backupStream = new FileOutputStream(backupFile);        }        private File newBackupFile() throws IOException {          File result = conf.getFile("dfs.data.dir",                                     "tmp"+File.separator+                                     "client-"+Math.abs(r.nextLong()));          result.deleteOnExit();          return result;        }        /**         * Open a DataOutputStream to a DataNode so that it can be written to.         * This happens when a file is created and each time a new block is allocated.         * Must get block ID and the IDs of the destinations from the namenode.         */        private synchronized void nextBlockOutputStream() throws IOException {            boolean retry = false;            long start = System.currentTimeMillis();            do {                retry = false;                                long localstart = System.currentTimeMillis();                boolean blockComplete = false;                LocatedBlock lb = null;                                while (! blockComplete) {                    if (firstTime) {                        lb = namenode.create(src.toString(), clientName.toString(), localName, overwrite);                    } else {                        lb = namenode.addBlock(src.toString(), localName);                    }                    if (lb == null) {                        try {                            Thread.sleep(400);                            if (System.currentTimeMillis() - localstart > 5000) {                                LOG.info("Waiting to find new output block node for " + (System.currentTimeMillis() - start) + "ms");                            }                        } catch (InterruptedException ie) {                        }                    } else {                        blockComplete = true;                    }                }                block = lb.getBlock();                DatanodeInfo nodes[] = lb.getLocations();                //                // Connect to first DataNode in the list.  Abort if this fails.                //                InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());                try {                    s = new Socket();                    s.connect(target, READ_TIMEOUT);                    s.setSoTimeout(READ_TIMEOUT);                } catch (IOException ie) {                    // Connection failed.  Let's wait a little bit and retry                    try {                        if (System.currentTimeMillis() - start > 5000) {                            LOG.info("Waiting to find target node: " + target);                        }                        Thread.sleep(6000);                    } catch (InterruptedException iex) {                    }                    if (firstTime) {                        namenode.abandonFileInProgress(src.toString());                    } else {                        namenode.abandonBlock(block, src.toString());                    }                    retry = true;                    continue;                }                //                // Xmit header info to datanode                //                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));                out.write(OP_WRITE_BLOCK);                out.writeBoolean(false);                block.write(out);                out.writeInt(nodes.length);                for (int i = 0; i < nodes.length; i++) {                    nodes[i].write(out);                }                out.write(CHUNKED_ENCODING);                bytesWrittenToBlock = 0;                blockStream = out;                blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));            } while (retry);            firstTime = false;        }        /**         * We're referring to the file pos here         */        public synchronized long getPos() throws IOException {            return filePos;        }			        /**         * Writes the specified byte to this output stream.         */        public synchronized void write(int b) throws IOException {            if (closed) {                throw new IOException("Stream closed");            }            if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||                (pos >= BUFFER_SIZE)) {                flush();            }            outBuf[pos++] = (byte) b;            filePos++;        }        /**         * Writes the specified bytes to this output stream.         */      public synchronized void write(byte b[], int off, int len)        throws IOException {            if (closed) {                throw new IOException("Stream closed");            }            while (len > 0) {              int remaining = BUFFER_SIZE - pos;              int toWrite = Math.min(remaining, len);              System.arraycopy(b, off, outBuf, pos, toWrite);              pos += toWrite;              off += toWrite;              len -= toWrite;              filePos += toWrite;              if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) ||                  (pos == BUFFER_SIZE)) {                flush();              }            }        }        /**         * Flush the buffer, getting a stream to a new block if necessary.         */        public synchronized void flush() throws IOException {            if (closed) {                throw new IOException("Stream closed");            }            if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {                flushData(BLOCK_SIZE - bytesWrittenToBlock);            }            if (bytesWrittenToBlock == BLOCK_SIZE) {                endBlock();            }            flushData(pos);        }        /**         * Actually flush the accumulated bytes to the remote node,         * but no more bytes than the indicated number.         */        private synchronized void flushData(int maxPos) throws IOException {            int workingPos = Math.min(pos, maxPos);                        if (workingPos > 0) {                //                // To the local block backup, write just the bytes                //                backupStream.write(outBuf, 0, workingPos);                //                // Track position                //                bytesWrittenToBlock += workingPos;                System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);                pos -= workingPos;            }        }        /**         * We're done writing to the current block.         */        private synchronized void endBlock() throws IOException {            //            // Done with local copy            //            backupStream.close();            //            // Send it to datanode            //            boolean mustRecover = true;            while (mustRecover) {                nextBlockOutputStream();                InputStream in = new FileInputStream(backupFile);                try {                    byte buf[] = new byte[BUFFER_SIZE];                    int bytesRead = in.read(buf);                    while (bytesRead > 0) {                        blockStream.writeLong((long) bytesRead);                        blockStream.write(buf, 0, bytesRead);                        bytesRead = in.read(buf);                    }                    internalClose();                    mustRecover = false;                } catch (IOException ie) {                    handleSocketException(ie);                } finally {                  in.close();                }            }            //            // Delete local backup, start new one            //            backupFile.delete();            backupFile = newBackupFile();            backupStream = new FileOutputStream(backupFile);            bytesWrittenToBlock = 0;        }        /**         * Close down stream to remote datanode.         */        private synchronized void internalClose() throws IOException {            blockStream.writeLong(0);            blockStream.flush();            long complete = blockReplyStream.readLong();            if (complete != WRITE_COMPLETE) {                LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);                throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);            }                                LocatedBlock lb = new LocatedBlock();            lb.readFields(blockReplyStream);            namenode.reportWrittenBlock(lb);            s.close();            s = null;        }        private void handleSocketException(IOException ie) throws IOException {          LOG.log(Level.WARNING, "Error while writing.", ie);          try {            if (s != null) {              s.close();              s = null;            }          } catch (IOException ie2) {            LOG.log(Level.WARNING, "Error closing socket.", ie2);          }          namenode.abandonBlock(block, src.toString());        }        /**         * Closes this output stream and releases any system          * resources associated with this stream.         */        public synchronized void close() throws IOException {            if (closed) {                throw new IOException("Stream closed");            }            flush();            if (filePos == 0 || bytesWrittenToBlock != 0) {              try {                endBlock();              } catch (IOException e) {                namenode.abandonFileInProgress(src.toString());                throw e;              }            }            backupStream.close();            backupFile.delete();            if (s != null) {                s.close();                s = null;            }            super.close();            long localstart = System.currentTimeMillis();            boolean fileComplete = false;            while (! fileComplete) {                fileComplete = namenode.complete(src.toString(), clientName.toString());                if (!fileComplete) {                    try {                        Thread.sleep(400);                        if (System.currentTimeMillis() - localstart > 5000) {                            LOG.info("Could not complete file, retrying...");                        }                    } catch (InterruptedException ie) {                    }                }            }            closed = true;        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?