⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dfsclient.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        /**         * We definitely don't support marks         */        public boolean markSupported() {            return false;        }        public void mark(int readLimit) {        }        public void reset() throws IOException {            throw new IOException("Mark not supported");        }    }    /****************************************************************     * DFSOutputStream creates files from a stream of bytes.     ****************************************************************/    class DFSOutputStream extends FSOutputStream {        private Socket s;        boolean closed = false;        private byte outBuf[] = new byte[BUFFER_SIZE];        private int pos = 0;        private UTF8 src;        private boolean overwrite;        private short replication;        private boolean firstTime = true;        private DataOutputStream blockStream;        private DataInputStream blockReplyStream;        private File backupFile;        private OutputStream backupStream;        private Block block;        private long filePos = 0;        private int bytesWrittenToBlock = 0;        private String datanodeName;        private long blockSize;        private Progressable progress;        /**         * Create a new output stream to the given DataNode.         */        public DFSOutputStream(UTF8 src, boolean overwrite,                                short replication, long blockSize,                               Progressable progress                               ) throws IOException {            this.src = src;            this.overwrite = overwrite;            this.replication = replication;            this.backupFile = newBackupFile();            this.blockSize = blockSize;            this.backupStream = new FileOutputStream(backupFile);            this.progress = progress;            if (progress != null) {                LOG.debug("Set non-null progress callback on DFSOutputStream "+src);            }        }        private File newBackupFile() throws IOException {          File result = conf.getFile("dfs.client.buffer.dir",                                     "tmp"+File.separator+                                     "client-"+Math.abs(r.nextLong()));          result.deleteOnExit();          return result;        }        /**         * Open a DataOutputStream to a DataNode so that it can be written to.         * This happens when a file is created and each time a new block is allocated.         * Must get block ID and the IDs of the destinations from the namenode.         */        private synchronized void nextBlockOutputStream() throws IOException {            boolean retry = false;            long startTime = System.currentTimeMillis();            do {                retry = false;                                LocatedBlock lb;                if (firstTime) {                  lb = locateNewBlock();                } else {                  lb = locateFollowingBlock(startTime);                }                block = lb.getBlock();                DatanodeInfo nodes[] = lb.getLocations();                //                // Connect to first DataNode in the list.  Abort if this fails.                //                InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName());                try {                    s = new Socket();                    s.connect(target, READ_TIMEOUT);                    s.setSoTimeout(replication * READ_TIMEOUT);                    datanodeName = nodes[0].getName();                } catch (IOException ie) {                    // Connection failed.  Let's wait a little bit and retry                    try {                        if (System.currentTimeMillis() - startTime > 5000) {                            LOG.info("Waiting to find target node: " + target);                        }                        Thread.sleep(6000);                    } catch (InterruptedException iex) {                    }                    if (firstTime) {                        namenode.abandonFileInProgress(src.toString(),                                                        clientName);                    } else {                        namenode.abandonBlock(block, src.toString());                    }                    retry = true;                    continue;                }                //                // Xmit header info to datanode                //                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));                out.write(OP_WRITE_BLOCK);                out.writeBoolean(false);                block.write(out);                out.writeInt(nodes.length);                for (int i = 0; i < nodes.length; i++) {                    nodes[i].write(out);                }                out.write(CHUNKED_ENCODING);                bytesWrittenToBlock = 0;                blockStream = out;                blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));            } while (retry);            firstTime = false;        }        private LocatedBlock locateNewBlock() throws IOException {               int retries = 3;          while (true) {            while (true) {              try {                return namenode.create(src.toString(), clientName.toString(),                    localName, overwrite, replication, blockSize);              } catch (RemoteException e) {                if (--retries == 0 ||                     !AlreadyBeingCreatedException.class.getName().                        equals(e.getClassName())) {                  throw e;                } else {                  // because failed tasks take upto LEASE_PERIOD to                  // release their pendingCreates files, if the file                  // we want to create is already being created,                   // wait and try again.                  LOG.info(StringUtils.stringifyException(e));                  try {                    Thread.sleep(LEASE_PERIOD);                  } catch (InterruptedException ie) {                  }                }              }            }          }        }                private LocatedBlock locateFollowingBlock(long start                                                  ) throws IOException {               int retries = 5;          while (true) {            long localstart = System.currentTimeMillis();            while (true) {              try {                return namenode.addBlock(src.toString(),                                          clientName.toString());              } catch (RemoteException e) {                if (--retries == 0 ||                     !NotReplicatedYetException.class.getName().                        equals(e.getClassName())) {                  throw e;                } else {                  LOG.info(StringUtils.stringifyException(e));                  if (System.currentTimeMillis() - localstart > 5000) {                    LOG.info("Waiting for replication for " +                              (System.currentTimeMillis() - localstart)/1000 +                              " seconds");                  }                  try {                    Thread.sleep(400);                  } catch (InterruptedException ie) {                  }                }                              }            }          }         }        /**         * We're referring to the file pos here         */        public synchronized long getPos() throws IOException {            return filePos;        }			        /**         * Writes the specified byte to this output stream.         */        public synchronized void write(int b) throws IOException {            checkOpen();            if (closed) {                throw new IOException("Stream closed");            }            if ((bytesWrittenToBlock + pos == blockSize) ||                (pos >= BUFFER_SIZE)) {                flush();            }            outBuf[pos++] = (byte) b;            filePos++;        }        /**         * Writes the specified bytes to this output stream.         */      public synchronized void write(byte b[], int off, int len)        throws IOException {            checkOpen();            if (closed) {                throw new IOException("Stream closed");            }            while (len > 0) {              int remaining = BUFFER_SIZE - pos;              int toWrite = Math.min(remaining, len);              System.arraycopy(b, off, outBuf, pos, toWrite);              pos += toWrite;              off += toWrite;              len -= toWrite;              filePos += toWrite;              if ((bytesWrittenToBlock + pos >= blockSize) ||                  (pos == BUFFER_SIZE)) {                flush();              }            }        }        /**         * Flush the buffer, getting a stream to a new block if necessary.         */        public synchronized void flush() throws IOException {            checkOpen();            if (closed) {                throw new IOException("Stream closed");            }            if (bytesWrittenToBlock + pos >= blockSize) {                flushData((int) blockSize - bytesWrittenToBlock);            }            if (bytesWrittenToBlock == blockSize) {                endBlock();            }            flushData(pos);        }        /**         * Actually flush the accumulated bytes to the remote node,         * but no more bytes than the indicated number.         */        private synchronized void flushData(int maxPos) throws IOException {            int workingPos = Math.min(pos, maxPos);                        if (workingPos > 0) {                //                // To the local block backup, write just the bytes                //                backupStream.write(outBuf, 0, workingPos);                //                // Track position                //                bytesWrittenToBlock += workingPos;                System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);                pos -= workingPos;            }        }        /**         * We're done writing to the current block.         */        private synchronized void endBlock() throws IOException {            //            // Done with local copy            //            backupStream.close();            //            // Send it to datanode            //            boolean sentOk = false;            int remainingAttempts =                conf.getInt("dfs.client.block.write.retries", 3);            while (!sentOk) {                nextBlockOutputStream();                InputStream in = new FileInputStream(backupFile);                try {                    byte buf[] = new byte[BUFFER_SIZE];                    int bytesRead = in.read(buf);                    while (bytesRead > 0) {                        blockStream.writeLong((long) bytesRead);                        blockStream.write(buf, 0, bytesRead);                        if (progress != null) { progress.progress(); }                        bytesRead = in.read(buf);                    }                    internalClose();                    sentOk = true;                } catch (IOException ie) {                    handleSocketException(ie);                    remainingAttempts -= 1;                    if (remainingAttempts == 0) {                      throw ie;                    }                } finally {                  in.close();                }            }            //            // Delete local backup, start new one            //            backupFile.delete();            backupFile = newBackupFile();            backupStream = new FileOutputStream(backupFile);            bytesWrittenToBlock = 0;        }        /**         * Close down stream to remote datanode.         */        private synchronized void internalClose() throws IOException {          try {            blockStream.writeLong(0);            blockStream.flush();            long complete = blockReplyStream.readLong();            if (complete != WRITE_COMPLETE) {                LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);                throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);            }          } catch (IOException ie) {            throw (IOException)                  new IOException("failure closing block of file " +                                  src.toString() + " to node " +                                  (datanodeName == null ? "?" : datanodeName)                                 ).initCause(ie);          }                                LocatedBlock lb = new LocatedBlock();            lb.readFields(blockReplyStream);            namenode.reportWrittenBlock(lb);            s.close();            s = null;        }        private void handleSocketException(IOException ie) throws IOException {          LOG.warn("Error while writing.", ie);          try {            if (s != null) {              s.close();              s = null;            }          } catch (IOException ie2) {            LOG.warn("Error closing socket.", ie2);          }          namenode.abandonBlock(block, src.toString());        }        /**         * Closes this output stream and releases any system          * resources associated with this stream.         */        public synchronized void close() throws IOException {          checkOpen();          if (closed) {              throw new IOException("Stream closed");          }                    try {            flush();            if (filePos == 0 || bytesWrittenToBlock != 0) {              try {                endBlock();              } catch (IOException e) {                namenode.abandonFileInProgress(src.toString(), clientName);                throw e;              }            }                        backupStream.close();            backupFile.delete();            if (s != null) {                s.close();                s = null;            }            super.close();            long localstart = System.currentTimeMillis();            boolean fileComplete = false;            while (! fileComplete) {              fileComplete = namenode.complete(src.toString(), clientName.toString());              if (!fileComplete) {                try {                  Thread.sleep(400);                  if (System.currentTimeMillis() - localstart > 5000) {                    LOG.info("Could not complete file, retrying...");                  }                } catch (InterruptedException ie) {                }              }            }            closed = true;          } finally {            synchronized (pendingCreates) {              pendingCreates.remove(src.toString());            }          }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -