⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dfsclient.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                break;            }        }        if (chosenNode == null) {            do {                chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length];            } while (deadNodes.contains(chosenNode));        }        return chosenNode;    }    /***************************************************************     * Periodically check in with the namenode and renew all the leases     * when the lease period is half over.     ***************************************************************/    class LeaseChecker implements Runnable {        /**         */        public void run() {            long lastRenewed = 0;            while (running) {                if (System.currentTimeMillis() - lastRenewed > (LEASE_PERIOD / 2)) {                    try {                      if( pendingCreates.size() > 0 )                        namenode.renewLease(clientName);                      lastRenewed = System.currentTimeMillis();                    } catch (IOException ie) {                      String err = StringUtils.stringifyException(ie);                      LOG.warn("Problem renewing lease for " + clientName +                                  ": " + err);                    }                }                try {                    Thread.sleep(1000);                } catch (InterruptedException ie) {                }            }        }    }    /** Utility class to encapsulate data node info and its ip address. */    private static class DNAddrPair {      DatanodeInfo info;      InetSocketAddress addr;      DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {        this.info = info;        this.addr = addr;      }    }            /****************************************************************     * DFSInputStream provides bytes from a named file.  It handles      * negotiation of the namenode and various datanodes as necessary.     ****************************************************************/    class DFSInputStream extends FSInputStream {        private Socket s = null;        boolean closed = false;        private String src;        private DataInputStream blockStream;        private Block blocks[] = null;        private DatanodeInfo nodes[][] = null;        private long pos = 0;        private long filelen = 0;        private long blockEnd = -1;        /**         */        public DFSInputStream(String src) throws IOException {            this.src = src;            openInfo();            this.blockStream = null;            for (int i = 0; i < blocks.length; i++) {                this.filelen += blocks[i].getNumBytes();            }        }        /**         * Grab the open-file info from namenode         */        synchronized void openInfo() throws IOException {            Block oldBlocks[] = this.blocks;            LocatedBlock results[] = namenode.open(src);                        Vector blockV = new Vector();            Vector nodeV = new Vector();            for (int i = 0; i < results.length; i++) {                blockV.add(results[i].getBlock());                nodeV.add(results[i].getLocations());            }            Block newBlocks[] = (Block[]) blockV.toArray(new Block[blockV.size()]);            if (oldBlocks != null) {                for (int i = 0; i < oldBlocks.length; i++) {                    if (! oldBlocks[i].equals(newBlocks[i])) {                        throw new IOException("Blocklist for " + src + " has changed!");                    }                }                if (oldBlocks.length != newBlocks.length) {                    throw new IOException("Blocklist for " + src + " now has different length");                }            }            this.blocks = newBlocks;            this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);        }        /**         * Open a DataInputStream to a DataNode so that it can be read from.         * We get block ID and the IDs of the destinations at startup, from the namenode.         */        private synchronized void blockSeekTo(long target) throws IOException {            if (target >= filelen) {                throw new IOException("Attempted to read past end of file");            }            if (s != null) {                s.close();                s = null;            }            //            // Compute desired block            //            int targetBlock = -1;            long targetBlockStart = 0;            long targetBlockEnd = 0;            for (int i = 0; i < blocks.length; i++) {                long blocklen = blocks[i].getNumBytes();                targetBlockEnd = targetBlockStart + blocklen - 1;                if (target >= targetBlockStart && target <= targetBlockEnd) {                    targetBlock = i;                    break;                } else {                    targetBlockStart = targetBlockEnd + 1;                                    }            }            if (targetBlock < 0) {                throw new IOException("Impossible situation: could not find target position " + target);            }            long offsetIntoBlock = target - targetBlockStart;            //            // Connect to best DataNode for desired Block, with potential offset            //            int failures = 0;            TreeSet deadNodes = new TreeSet();            while (s == null) {                DNAddrPair retval = chooseDataNode(targetBlock, deadNodes);                DatanodeInfo chosenNode = retval.info;                InetSocketAddress targetAddr = retval.addr;                            try {                    s = new Socket();                    s.connect(targetAddr, READ_TIMEOUT);                    s.setSoTimeout(READ_TIMEOUT);                    //                    // Xmit header info to datanode                    //                    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));                    out.write(OP_READSKIP_BLOCK);                    blocks[targetBlock].write(out);                    out.writeLong(offsetIntoBlock);                    out.flush();                    //                    // Get bytes in block, set streams                    //                    DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));                    long curBlockSize = in.readLong();                    long amtSkipped = in.readLong();                    if (curBlockSize != blocks[targetBlock].len) {                        throw new IOException("Recorded block size is " + blocks[targetBlock].len + ", but datanode reports size of " + curBlockSize);                    }                    if (amtSkipped != offsetIntoBlock) {                        throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);                    }                    this.pos = target;                    this.blockEnd = targetBlockEnd;                    this.blockStream = in;                } catch (IOException ex) {                    // Put chosen node into dead list, continue                    LOG.info("Failed to connect to " + targetAddr + ":" + ex);                    deadNodes.add(chosenNode);                    if (s != null) {                        try {                            s.close();                        } catch (IOException iex) {                        }                                            }                    s = null;                }            }        }        /**         * Close it down!         */        public synchronized void close() throws IOException {            checkOpen();            if (closed) {                throw new IOException("Stream closed");            }            if (s != null) {                blockStream.close();                s.close();                s = null;            }            super.close();            closed = true;        }        /**         * Basic read()         */        public synchronized int read() throws IOException {            checkOpen();            if (closed) {                throw new IOException("Stream closed");            }            int result = -1;            if (pos < filelen) {                if (pos > blockEnd) {                    blockSeekTo(pos);                }                result = blockStream.read();                if (result >= 0) {                    pos++;                }            }            return result;        }        /**         * Read the entire buffer.         */        public synchronized int read(byte buf[], int off, int len) throws IOException {            checkOpen();            if (closed) {                throw new IOException("Stream closed");            }            if (pos < filelen) {              int retries = 2;              while (retries > 0) {                try {                  if (pos > blockEnd) {                      blockSeekTo(pos);                  }                  int realLen = Math.min(len, (int) (blockEnd - pos + 1));                  int result = blockStream.read(buf, off, realLen);                  if (result >= 0) {                      pos += result;                  }                  return result;                } catch (IOException e) {                  LOG.warn("DFS Read: " + StringUtils.stringifyException(e));                  blockEnd = -1;                  if (--retries == 0) {                    throw e;                  }                }              }            }            return -1;        }                private DNAddrPair chooseDataNode(int blockId, TreeSet deadNodes)        throws IOException {          int failures = 0;          while (true) {            try {              DatanodeInfo chosenNode = bestNode(nodes[blockId], deadNodes);              InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getName());              return new DNAddrPair(chosenNode, targetAddr);            } catch (IOException ie) {              String blockInfo =                  blocks[blockId]+" file="+src;              if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {                throw new IOException("Could not obtain block: " + blockInfo);              }              if (nodes[blockId] == null || nodes[blockId].length == 0) {                LOG.info("No node available for block: " + blockInfo);              }              LOG.info("Could not obtain block from any node:  " + ie);              try {                Thread.sleep(3000);              } catch (InterruptedException iex) {              }              deadNodes.clear();              openInfo();              failures++;              continue;            }          }        }                 private void fetchBlockByteRange(int blockId, long start,            long end, byte[] buf, int offset) throws IOException {          //          // Connect to best DataNode for desired Block, with potential offset          //          TreeSet deadNodes = new TreeSet();          Socket dn = null;          while (dn == null) {            DNAddrPair retval = chooseDataNode(blockId, deadNodes);            DatanodeInfo chosenNode = retval.info;            InetSocketAddress targetAddr = retval.addr;                        try {              dn = new Socket();              dn.connect(targetAddr, READ_TIMEOUT);              dn.setSoTimeout(READ_TIMEOUT);                            //              // Xmit header info to datanode              //              DataOutputStream out = new DataOutputStream(new BufferedOutputStream(dn.getOutputStream()));              out.write(OP_READ_RANGE_BLOCK);              blocks[blockId].write(out);              out.writeLong(start);              out.writeLong(end);              out.flush();                            //              // Get bytes in block, set streams              //              DataInputStream in = new DataInputStream(new BufferedInputStream(dn.getInputStream()));              long curBlockSize = in.readLong();              long actualStart = in.readLong();              long actualEnd = in.readLong();              if (curBlockSize != blocks[blockId].len) {                throw new IOException("Recorded block size is " +                    blocks[blockId].len + ", but datanode reports size of " +                    curBlockSize);              }              if ((actualStart != start) || (actualEnd != end)) {                throw new IOException("Asked for byte range  " + start +                    "-" + end + ", but only received range " + actualStart +                    "-" + actualEnd);              }              int nread = in.read(buf, offset, (int)(end - start + 1));            } catch (IOException ex) {              // Put chosen node into dead list, continue              LOG.info("Failed to connect to " + targetAddr + ":" + ex);              deadNodes.add(chosenNode);              if (dn != null) {                try {                  dn.close();                } catch (IOException iex) {                }              }              dn = null;            }          }        }                public int read(long position, byte[] buf, int off, int len)        throws IOException {          // sanity checks          checkOpen();          if (closed) {            throw new IOException("Stream closed");          }          if ((position < 0) || (position > filelen)) {            return -1;          }          int realLen = len;          if ((position + len) > filelen) {            realLen = (int)(filelen - position);          }          // determine the block and byte range within the block          // corresponding to position and realLen          int targetBlock = -1;          long targetStart = 0;          long targetEnd = 0;          for (int idx = 0; idx < blocks.length; idx++) {            long blocklen = blocks[idx].getNumBytes();            targetEnd = targetStart + blocklen - 1;            if (position >= targetStart && position <= targetEnd) {              targetBlock = idx;              targetStart = position - targetStart;              targetEnd = Math.min(blocklen, targetStart + realLen) - 1;              realLen = (int)(targetEnd - targetStart + 1);              break;            }            targetStart += blocklen;          }          if (targetBlock < 0) {            throw new IOException(                "Impossible situation: could not find target position "+                position);          }          fetchBlockByteRange(targetBlock, targetStart, targetEnd, buf, off);          return realLen;        }                /**         * Seek to a new arbitrary location         */        public synchronized void seek(long targetPos) throws IOException {            if (targetPos > filelen) {                throw new IOException("Cannot seek after EOF");            }            pos = targetPos;            blockEnd = -1;        }        /**         */        public synchronized long getPos() throws IOException {            return pos;        }        /**         */        public synchronized int available() throws IOException {            if (closed) {                throw new IOException("Stream closed");            }            return (int) (filelen - pos);        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -