datanode.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 788 行 · 第 1/3 页

JAVA
788
字号
                        // Open reply stream                        //                        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));                        try {                            //                            // Write filelen of -1 if error                            //                            if (! data.isValidBlock(b)) {                                out.writeLong(-1);                            } else {                                //                                // Get blockdata from disk                                //                                long len = data.getLength(b);                                DataInputStream in2 = new DataInputStream(data.getBlockData(b));                                out.writeLong(len);                                if (op == OP_READSKIP_BLOCK) {                                    if (toSkip > len) {                                        toSkip = len;                                    }                                    long amtSkipped = 0;                                    try {                                        amtSkipped = in2.skip(toSkip);                                    } catch (IOException iex) {                                        shutdown();                                        throw iex;                                    }                                    out.writeLong(amtSkipped);                                }                                byte buf[] = new byte[BUFFER_SIZE];                                try {                                    int bytesRead = 0;                                    try {                                        bytesRead = in2.read(buf);                                    } catch (IOException iex) {                                        shutdown();                                        throw iex;                                    }                                    while (bytesRead 
>= 0) {                                        out.write(buf, 0, bytesRead);                                        len -= bytesRead;                                        try {                                            bytesRead = in2.read(buf);                                        } catch (IOException iex) {                                            shutdown();                                            throw iex;                                        }                                    }                                } catch (SocketException se) {                                    // This might be because the reader                                    // closed the stream early                                } finally {                                    try {                                        in2.close();                                    } catch (IOException iex) {                                        shutdown();                                        throw iex;                                    }                                }                            }                            LOG.info("Served block " + b + " to " + s.getInetAddress());                        } finally {                            out.close();                        }                    } else {                        while (op >= 0) {                            System.out.println("Faulty op: " + op);                            op = (byte) in.read();                        }                        throw new IOException("Unknown opcode for incoming data stream");                    }                } finally {                    in.close();                }            } catch (IOException ie) {              LOG.log(Level.WARNING, "DataXCeiver", ie);            } finally {                try {                    s.close();                } catch (IOException ie2) {                }            }        }    }    /**     * Used for transferring a block of data.  
This class     * sends a piece of data to another DataNode.     */    class DataTransfer implements Runnable {        InetSocketAddress curTarget;        DatanodeInfo targets[];        Block b;        byte buf[];        /**         * Connect to the first item in the target list.  Pass along the          * entire target list, the block, and the data.         */        public DataTransfer(DatanodeInfo targets[], Block b) throws IOException {            this.curTarget = createSocketAddr(targets[0].getName().toString());            this.targets = targets;            this.b = b;            this.buf = new byte[BUFFER_SIZE];        }        /**         * Do the deed, write the bytes         */        public void run() {	    xmitsInProgress++;            try {                Socket s = new Socket();                s.connect(curTarget, READ_TIMEOUT);                s.setSoTimeout(READ_TIMEOUT);                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));                try {                    long filelen = data.getLength(b);                    DataInputStream in = new DataInputStream(new BufferedInputStream(data.getBlockData(b)));                    try {                        //                        // Header info                        //                        out.write(OP_WRITE_BLOCK);                        out.writeBoolean(true);                        b.write(out);                        out.writeInt(targets.length);                        for (int i = 0; i < targets.length; i++) {                            targets[i].write(out);                        }                        out.write(RUNLENGTH_ENCODING);                        out.writeLong(filelen);                        //                        // Write the data                        //                        while (filelen > 0) {                            int bytesRead = in.read(buf, 0, (int) Math.min(filelen, buf.length));                            
out.write(buf, 0, bytesRead);                            filelen -= bytesRead;                        }                    } finally {                        in.close();                    }                } finally {                    out.close();                }                LOG.info("Transmitted block " + b + " to " + curTarget);            } catch (IOException ie) {              LOG.log(Level.WARNING, "Failed to transfer "+b+" to "+curTarget, ie);            } finally {		xmitsInProgress--;	    }        }    }    /**     * No matter what kind of exception we get, keep retrying to offerService().     * That's the loop that connects to the NameNode and provides basic DataNode     * functionality.     *     * Only stop when "shouldRun" is turned off (which can only happen at shutdown).     */    public void run() {        LOG.info("Starting DataNode in: "+data.data);        while (shouldRun) {            try {                offerService();            } catch (Exception ex) {                LOG.info("Exception: " + ex);              if (shouldRun) {                LOG.info("Lost connection to namenode.  Retrying...");                try {                  Thread.sleep(5000);                } catch (InterruptedException ie) {                }              }            }        }      LOG.info("Finishing DataNode in: "+data.data);    }    /** Start datanode daemons.     
* Start a datanode daemon for each comma separated data directory     * specified in property dfs.data.dir     */    public static void run(Configuration conf) throws IOException {        String[] dataDirs = conf.getStrings("dfs.data.dir");        subThreadList = new Vector(dataDirs.length);        for (int i = 0; i < dataDirs.length; i++) {          DataNode dn = makeInstanceForDir(dataDirs[i], conf);          if (dn != null) {            Thread t = new Thread(dn, "DataNode: "+dataDirs[i]);            t.setDaemon(true); // needed for JUnit testing            t.start();            subThreadList.add(t);          }        }    }  /** Start datanode daemons.   * Start a datanode daemon for each comma separated data directory   * specified in property dfs.data.dir and wait for them to finish.   * If this thread is specifically interrupted, it will stop waiting.   */  private static void runAndWait(Configuration conf) throws IOException {    run(conf);    //  Wait for sub threads to exit    for (Iterator iterator = subThreadList.iterator(); iterator.hasNext();) {      Thread threadDataNode = (Thread) iterator.next();      try {        threadDataNode.join();      } catch (InterruptedException e) {        if (Thread.currentThread().isInterrupted()) {          // did someone knock?          return;        }      }    }  }  /**   * Make an instance of DataNode after ensuring that given data directory   * (and parent directories, if necessary) can be created.   * @param dataDir where the new DataNode instance should keep its files.   * @param conf Configuration instance to use.   * @return DataNode instance for given data dir and conf, or null if directory   * cannot be created.   
* @throws IOException propagated from the DataNode constructor
   */
  static DataNode makeInstanceForDir(String dataDir, Configuration conf) throws IOException {
    File dir = new File(dataDir);
    // The result of mkdirs() is not inspected; the directory check below decides.
    dir.mkdirs();
    if (dir.isDirectory()) {
      return new DataNode(conf, dataDir);
    }
    LOG.warning("Can't start DataNode in non-directory: "+dataDir);
    return null;
  }

  /**
   * Describes this node by its data, localName and xmitsInProgress fields.
   */
  public String toString() {
    String description = "DataNode{";
    description += "data=" + data;
    description += ", localName='" + localName + "'";
    description += ", xmitsInProgress=" + xmitsInProgress;
    return description + "}";
  }

    /**
     * Command-line entry point: turns on thread IDs in log output, then
     * starts the datanode daemons from a fresh Configuration and waits for them.
     */
    public static void main(String args[]) throws IOException {
        LogFormatter.setShowThreadIDs(true);
        Configuration conf = new Configuration();
        runAndWait(conf);
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?