⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 datanode.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                    } catch (IOException out2close) {                    } finally {                      out2 = null;                      in2 = null;                    }                  }                }              }                            //              // Process incoming data, copy to disk and              // maybe to network.              //              boolean anotherChunk = len != 0;              byte buf[] = new byte[BUFFER_SIZE];                            while (anotherChunk) {                while (len > 0) {                  int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len));                  if (bytesRead < 0) {                    throw new EOFException("EOF reading from "+s.toString());                  }                  if (bytesRead > 0) {                    try {                      out.write(buf, 0, bytesRead);                      myMetrics.wroteBytes(bytesRead);                    } catch (IOException iex) {                      if (iex.getMessage().startsWith("No space left on device")) {                    	  throw new DiskOutOfSpaceException("No space left on device");                      } else {                        shutdown();                        throw iex;                      }                    }                    if (out2 != null) {                      try {                        out2.write(buf, 0, bytesRead);                      } catch (IOException out2e) {                        LOG.info("Exception writing to mirror " + mirrorNode                             + "\n" + StringUtils.stringifyException(out2e));                        //                        // If stream-copy fails, continue                         // writing to disk.  We shouldn't                         // interrupt client write.                        //                        try {                          out2.close();                          in2.close();                        } catch (IOException out2close) {                        } finally {                          out2 = null;                          in2 = null;                        }                      }                    }                    len -= bytesRead;                  }                }                                if (encodingType == RUNLENGTH_ENCODING) {                  anotherChunk = false;                } else if (encodingType == CHUNKED_ENCODING) {                  len = in.readLong();                  if (out2 != null) {                    try {                      out2.writeLong(len);                    } catch (IOException ie) {                      LOG.info("Exception writing to mirror " + mirrorNode                           + "\n" + StringUtils.stringifyException(ie));                      try {                        out2.close();                        in2.close();                      } catch (IOException ie2) {                        // NOTHING                      } finally {                        out2 = null;                        in2 = null;                      }                    }                  }                  if (len == 0) {                    anotherChunk = false;                  }                }              }                            if (out2 != null) {                try {                  out2.flush();                  long complete = in2.readLong();                  if (complete != WRITE_COMPLETE) {                    LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);                  }                  LocatedBlock newLB = new LocatedBlock();                  newLB.readFields(in2);                  in2.close();                  out2.close();                  DatanodeInfo mirrorsSoFar[] = newLB.getLocations();                  for (int k = 0; k < mirrorsSoFar.length; k++) {                    mirrors.add(mirrorsSoFar[k]);                  }                } catch (IOException ie) {                  LOG.info("Exception writing to mirror " + mirrorNode                       + "\n" + StringUtils.stringifyException(ie));                  try {                    out2.close();                    in2.close();                  } catch (IOException ie2) {                    // NOTHING                  } finally {                    out2 = null;                    in2 = null;                  }                }              }              if (out2 == null) {                LOG.info("Received block " + b + " from " +                     s.getInetAddress());              } else {                LOG.info("Received block " + b + " from " +                     s.getInetAddress() +                     " and mirrored to " + mirrorTarget);              }            } finally {              try {                out.close();              } catch (IOException iex) {                shutdown();                throw iex;              }            }            data.finalizeBlock(b);            myMetrics.wroteBlocks(1);                        //             // Tell the namenode that we've received this block             // in full, if we've been asked to.  This is done            // during NameNode-directed block transfers, but not            // client writes.            //            if (shouldReportBlock) {              synchronized (receivedBlockList) {                receivedBlockList.add(b);                receivedBlockList.notifyAll();              }            }                        //            // Tell client job is done, and reply with            // the new LocatedBlock.            //            reply.writeLong(WRITE_COMPLETE);            mirrors.add(curTarget);            LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));            newLB.write(reply);          } finally {            reply.close();          }        }    }    /**     * Used for transferring a block of data.  This class     * sends a piece of data to another DataNode.     */    class DataTransfer implements Runnable {        InetSocketAddress curTarget;        DatanodeInfo targets[];        Block b;        byte buf[];        /**         * Connect to the first item in the target list.  Pass along the          * entire target list, the block, and the data.         */        public DataTransfer(DatanodeInfo targets[], Block b) throws IOException {            this.curTarget = createSocketAddr(targets[0].getName());            this.targets = targets;            this.b = b;            this.buf = new byte[BUFFER_SIZE];        }        /**         * Do the deed, write the bytes         */        public void run() {      xmitsInProgress++;            try {                Socket s = new Socket();                s.connect(curTarget, READ_TIMEOUT);                s.setSoTimeout(READ_TIMEOUT);                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));                try {                    long filelen = data.getLength(b);                    DataInputStream in = new DataInputStream(new BufferedInputStream(data.getBlockData(b)));                    try {                        //                        // Header info                        //                        out.write(OP_WRITE_BLOCK);                        out.writeBoolean(true);                        b.write(out);                        out.writeInt(targets.length);                        for (int i = 0; i < targets.length; i++) {                            targets[i].write(out);                        }                        out.write(RUNLENGTH_ENCODING);                        out.writeLong(filelen);                        //                        // Write the data                        //                        while (filelen > 0) {                            int bytesRead = in.read(buf, 0, (int) Math.min(filelen, buf.length));                            out.write(buf, 0, bytesRead);                            filelen -= bytesRead;                        }                    } finally {                        in.close();                    }                } finally {                    out.close();                }                LOG.info("Transmitted block " + b + " to " + curTarget);            } catch (IOException ie) {              LOG.warn("Failed to transfer "+b+" to "+curTarget, ie);            } finally {    xmitsInProgress--;      }        }    }    /**     * No matter what kind of exception we get, keep retrying to offerService().     * That's the loop that connects to the NameNode and provides basic DataNode     * functionality.     *     * Only stop when "shouldRun" is turned off (which can only happen at shutdown).     */    public void run() {        LOG.info("Starting DataNode in: "+data);                // start dataXceiveServer        dataXceiveServer.start();                while (shouldRun) {            try {                offerService();            } catch (Exception ex) {              LOG.error("Exception: " + StringUtils.stringifyException(ex));              if (shouldRun) {                try {                  Thread.sleep(5000);                } catch (InterruptedException ie) {                }              }            }        }                // wait for dataXceiveServer to terminate        try {            this.dataXceiveServer.join();        } catch (InterruptedException ie) {        }                LOG.info("Finishing DataNode in: "+data);    }    private static ArrayList dataNodeList = new ArrayList();    private static ArrayList dataNodeThreadList = new ArrayList();        /** Start datanode daemon.     */    public static void run(Configuration conf) throws IOException {        String[] dataDirs = conf.getStrings("dfs.data.dir");        DataNode dn = makeInstance(dataDirs, conf);        dataNodeList.add(dn);        if (dn != null) {          Thread t = new Thread(dn, "DataNode: [" +              StringUtils.arrayToString(dataDirs) + "]");          t.setDaemon(true); // needed for JUnit testing          t.start();          dataNodeThreadList.add(t);        }    }        /**     * Shut down all datanodes that where started via the run(conf) method.     * Returns only after shutdown is complete.     */    public static void shutdownAll(){      if(!dataNodeList.isEmpty()){        for (Iterator iterator = dataNodeList.iterator(); iterator.hasNext();) {          DataNode dataNode = (DataNode) iterator.next();          dataNode.shutdown();        }      }    }  /** Start a single datanode daemon and wait for it to finish.   *  If this thread is specifically interrupted, it will stop waiting.   */  private static void runAndWait(Configuration conf) throws IOException {    run(conf);    if (dataNodeThreadList.size() > 0) {      Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1);      try {        t.join();      } catch (InterruptedException e) {        if (Thread.currentThread().isInterrupted()) {          // did someone knock?          return;        }      }    }  }  /**   * Make an instance of DataNode after ensuring that at least one of the   * given data directories (and their parent directories, if necessary)   * can be created.   * @param dataDirs List of directories, where the new DataNode instance should   * keep its files.   * @param conf Configuration instance to use.   * @return DataNode instance for given list of data dirs and conf, or null if   * no directory from this directory list can be created.   * @throws IOException   */  static DataNode makeInstance(String[] dataDirs, Configuration conf)  throws IOException {    ArrayList dirs = new ArrayList();    for (int i = 0; i < dataDirs.length; i++) {      File data = new File(dataDirs[i]);      try {        DiskChecker.checkDir( data );        dirs.add(dataDirs[i]);      } catch( DiskErrorException e ) {        LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage() );      }    }    return ((dirs.size() > 0) ? new DataNode(conf, dataDirs) : null);  }  public String toString() {    return "DataNode{" +        "data=" + data +        ", localName='" + dnRegistration.getName() + "'" +        ", storageID='" + dnRegistration.getStorageID() + "'" +        ", xmitsInProgress=" + xmitsInProgress +        "}";  }    /**     */    public static void main(String args[]) throws IOException {        Configuration conf = new Configuration();        runAndWait(conf);    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -