⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 datanode.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                    break;                  } else {                    if (xferTargets[i].length > 0) {                        LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);                        new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();                    }                  }                }              } else if (cmd.invalidateBlocks()) {                //                // Some local block(s) are obsolete and can be                 // safely garbage-collected.                //                Block toDelete[] = cmd.getBlocks();                data.invalidate(toDelete);                myMetrics.removedBlocks(toDelete.length);              } else if( cmd.shutdownNode()) {                // shut down the data node                this.shutdown();                continue;              }            }          }                      // send block report          if (now - lastBlockReport > blockReportInterval) {            // before send block report, check if data directory is healthy            data.checkDataDir();                            //            // Send latest blockinfo report if timer has expired.            // Get back a list of local block(s) that are obsolete            // and can be safely GC'ed.            //            Block toDelete[] = namenode.blockReport(dnRegistration,                                                    data.getBlockReport());            data.invalidate(toDelete);            lastBlockReport = now;            continue;          }                      // check if there are newly received blocks          Block [] blockArray=null;          synchronized( receivedBlockList ) {            if (receivedBlockList.size() > 0) {              //              // Send newly-received blockids to namenode              //              blockArray = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);            }          }          if( blockArray != null ) {            namenode.blockReceived( dnRegistration, blockArray );            synchronized (receivedBlockList) {              for(Block b: blockArray) {                receivedBlockList.remove(b);              }            }          }                      //          // There is no work to do;  sleep until hearbeat timer elapses,           // or work arrives, and then iterate again.          //          long waitTime = HEARTBEAT_INTERVAL - (System.currentTimeMillis() - lastHeartbeat);          synchronized( receivedBlockList ) {            if (waitTime > 0 && receivedBlockList.size() == 0) {              try {                receivedBlockList.wait(waitTime);              } catch (InterruptedException ie) {              }            }          } // synchronized        } catch(DiskErrorException e) {          handleDiskError(e.getLocalizedMessage());          return;        } catch( RemoteException re ) {          String reClass = re.getClassName();          if( UnregisteredDatanodeException.class.getName().equals( reClass )) {            LOG.warn( "DataNode is shutting down: " +                       StringUtils.stringifyException(re));            shutdown();            return;          }          LOG.warn(StringUtils.stringifyException(re));        } catch (IOException e) {          LOG.warn(StringUtils.stringifyException(e));        }      } // while (shouldRun)    } // offerService            /**     * Server used for receiving/sending a block of data.     * This is created to listen for requests from clients or      * other DataNodes.  This small server does not use the      * Hadoop IPC mechanism.     */    class DataXceiveServer implements Runnable {        boolean shouldListen = true;        ServerSocket ss;        public DataXceiveServer(ServerSocket ss) {            this.ss = ss;        }        /**         */        public void run() {            try {                while (shouldListen) {                    Socket s = ss.accept();                    //s.setSoTimeout(READ_TIMEOUT);                    data.checkDataDir();                    xceiverCount.incr();                    new Daemon(new DataXceiver(s)).start();                }                ss.close();            } catch (DiskErrorException de ) {                String errMsgr = de.getMessage();                LOG.warn("Exiting DataXceiveServer due to "+ errMsgr );                handleDiskError(errMsgr);            } catch (IOException ie) {                LOG.info("Exiting DataXceiveServer due to " + ie.toString());            }        }        public void kill() {            this.shouldListen = false;            try {                this.ss.close();            } catch (IOException iex) {            }        }    }    /**     * Thread for processing incoming/outgoing data stream     */    class DataXceiver implements Runnable {        Socket s;        public DataXceiver(Socket s) {            this.s = s;            LOG.debug("Number of active connections is: "+xceiverCount);        }        /**         * Read/write data from/to the DataXceiveServer.         */        public void run() {            try {                DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));                try {                    byte op = (byte) in.read();                    if (op == OP_WRITE_BLOCK) {                        writeBlock(in);                    } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK ||                        op == OP_READ_RANGE_BLOCK) {                        readBlock(in, op);                    } else {                        while (op >= 0) {                            System.out.println("Faulty op: " + op);                            op = (byte) in.read();                        }                        throw new IOException("Unknown opcode for incoming data stream");                    }                } finally {                    in.close();                }            } catch (IOException ie) {              LOG.warn("DataXCeiver", ie);            } finally {                try {                    xceiverCount.decr();                    LOG.debug("Number of active connections is: "+xceiverCount);                    s.close();                } catch (IOException ie2) {                }            }        }        /**         * Read a block from the disk         * @param in The stream to read from         * @param op OP_READ_BLOCK or OP_READ_SKIPBLOCK         * @throws IOException         */        private void readBlock(DataInputStream in, byte op) throws IOException {          //          // Read in the header          //          Block b = new Block();          b.readFields(in);          long toSkip = 0;          long endOffset = -1;          if (op == OP_READSKIP_BLOCK) {              toSkip = in.readLong();          } else if (op == OP_READ_RANGE_BLOCK) {            toSkip = in.readLong();            endOffset = in.readLong();          }          //          // Open reply stream          //          DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));          try {              //              // Write filelen of -1 if error              //              if (! data.isValidBlock(b)) {                  out.writeLong(-1);              } else {                  //                  // Get blockdata from disk                  //                  long len = data.getLength(b);                  if (endOffset < 0) { endOffset = len; }                  DataInputStream in2 = new DataInputStream(data.getBlockData(b));                  out.writeLong(len);                  long amtSkipped = 0;                  if ((op == OP_READSKIP_BLOCK) || (op == OP_READ_RANGE_BLOCK)) {                      if (toSkip > len) {                          toSkip = len;                      }                      try {                          amtSkipped = in2.skip(toSkip);                      } catch (IOException iex) {                          shutdown();                          throw iex;                      }                      out.writeLong(amtSkipped);                  }                  if (op == OP_READ_RANGE_BLOCK) {                      if (endOffset > len) {                        endOffset = len;                      }                      out.writeLong(endOffset);                  }                  byte buf[] = new byte[BUFFER_SIZE];                  try {                    int toRead = (int) (endOffset - amtSkipped + 1);                      int bytesRead = 0;                      try {                          bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead));                          myMetrics.readBytes(bytesRead);                      } catch (IOException iex) {                          shutdown();                          throw iex;                      }                      while (toRead > 0 && bytesRead >= 0) {                          out.write(buf, 0, bytesRead);                          toRead -= bytesRead;                          if (toRead > 0) {                          try {                              bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead));                              myMetrics.readBytes(bytesRead);                          } catch (IOException iex) {                              shutdown();                              throw iex;                          }                          }                      }                  } catch (SocketException se) {                      // This might be because the reader                      // closed the stream early                  } finally {                      try {                          in2.close();                      } catch (IOException iex) {                          shutdown();                          throw iex;                      }                  }              }              myMetrics.readBlocks(1);              LOG.info("Served block " + b + " to " + s.getInetAddress());          } finally {              out.close();          }        }        /**         * Write a block to disk.         * @param in The stream to read from         * @throws IOException         */        private void writeBlock(DataInputStream in) throws IOException {          //          // Read in the header          //          DataOutputStream reply =             new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));          try {            boolean shouldReportBlock = in.readBoolean();            Block b = new Block();            b.readFields(in);            int numTargets = in.readInt();            if (numTargets <= 0) {              throw new IOException("Mislabelled incoming datastream.");            }            DatanodeInfo targets[] = new DatanodeInfo[numTargets];            for (int i = 0; i < targets.length; i++) {              DatanodeInfo tmp = new DatanodeInfo();              tmp.readFields(in);              targets[i] = tmp;            }            byte encodingType = (byte) in.read();            long len = in.readLong();                        //            // Make sure curTarget is equal to this machine            //            DatanodeInfo curTarget = targets[0];                        //            // Track all the places we've successfully written the block            //            Vector mirrors = new Vector();                        //            // Open local disk out            //            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));            InetSocketAddress mirrorTarget = null;            String mirrorNode = null;            try {              //              // Open network conn to backup machine, if               // appropriate              //              DataInputStream in2 = null;              DataOutputStream out2 = null;              if (targets.length > 1) {                // Connect to backup machine                mirrorNode = targets[1].getName();                mirrorTarget = createSocketAddr(mirrorNode);                try {                  Socket s2 = new Socket();                  s2.connect(mirrorTarget, READ_TIMEOUT);                  s2.setSoTimeout(READ_TIMEOUT);                  out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));                  in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));                                    // Write connection header                  out2.write(OP_WRITE_BLOCK);                  out2.writeBoolean(shouldReportBlock);                  b.write(out2);                  out2.writeInt(targets.length - 1);                  for (int i = 1; i < targets.length; i++) {                    targets[i].write(out2);                  }                  out2.write(encodingType);                  out2.writeLong(len);                  myMetrics.replicatedBlocks(1);                } catch (IOException ie) {                  if (out2 != null) {                    LOG.info("Exception connecting to mirror " + mirrorNode                              + "\n" + StringUtils.stringifyException(ie));                    try {                      out2.close();                      in2.close();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -