⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fsnamesystem.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
    /**     * Release the lock on the given file     */    public synchronized int releaseLock(UTF8 src, UTF8 holder) {        int result = internalReleaseLock(src, holder);        if (result == COMPLETE_SUCCESS) {            synchronized (leases) {                Lease lease = (Lease) leases.get(holder);                if (lease != null) {                    lease.released(src);                    if (! lease.hasLocks()) {                        leases.remove(holder);                        sortedLeases.remove(lease);                    }                }            }        }        return result;    }    private int internalReleaseLock(UTF8 src, UTF8 holder) {        return dir.releaseLock(src, holder);    }    /**     * Release a pending file creation lock.     * @param src The filename     * @param holder The datanode that was creating the file     */    private void internalReleaseCreate(UTF8 src, UTF8 holder) {      FileUnderConstruction v =         (FileUnderConstruction) pendingCreates.remove(src);      if (v != null) {         NameNode.stateChangeLog.debug(                      "DIR* NameSystem.internalReleaseCreate: " + src                    + " is removed from pendingCreates for "                    + holder + " (failure)");        for (Iterator it2 = v.getBlocks().iterator(); it2.hasNext(); ) {          Block b = (Block) it2.next();          pendingCreateBlocks.remove(b);        }      } else {          NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "                 + "attempt to release a create lock on "+ src.toString()                 + " that was not in pedingCreates");      }    }    /**     * Renew the lease(s) held by the given client     */    public void renewLease(UTF8 holder) throws IOException {        synchronized (leases) {            if( isInSafeMode() )              throw new SafeModeException( "Cannot renew lease for " + holder, safeMode );            Lease lease = (Lease) leases.get(holder);            if (lease != null) {                sortedLeases.remove(lease);                lease.renew();                sortedLeases.add(lease);            }        }    }    /**     * Get a listing of all files at 'src'.  The Object[] array     * exists so we can return file attributes (soon to be implemented)     */    public DFSFileInfo[] getListing(UTF8 src) {        return dir.getListing(src);    }    /////////////////////////////////////////////////////////    //    // These methods are called by datanodes    //    /////////////////////////////////////////////////////////    /**     * Register Datanode.     * <p>     * The purpose of registration is to identify whether the new datanode     * serves a new data storage, and will report new data block copies,     * which the namenode was not aware of; or the datanode is a replacement     * node for the data storage that was previously served by a different     * or the same (in terms of host:port) datanode.     * The data storages are distinguished by their storageIDs. When a new     * data storage is reported the namenode issues a new unique storageID.     * <p>     * Finally, the namenode returns its namespaceID as the registrationID     * for the datanodes.      * namespaceID is a persistent attribute of the name space.     * The registrationID is checked every time the datanode is communicating     * with the namenode.      * Datanodes with inappropriate registrationID are rejected.     * If the namenode stops, and then restarts it can restore its      * namespaceID and will continue serving the datanodes that has previously     * registered with the namenode without restarting the whole cluster.     *      * @see DataNode#register()     * @author Konstantin Shvachko     */    public synchronized void registerDatanode( DatanodeRegistration nodeReg                                               ) throws IOException {      NameNode.stateChangeLog.debug(          "BLOCK* NameSystem.registerDatanode: "          + "node registration from " + nodeReg.getName()          + " storage " + nodeReg.getStorageID() );      nodeReg.registrationID = getRegistrationID();      DatanodeDescriptor nodeS = (DatanodeDescriptor)datanodeMap.get(nodeReg.getStorageID());      DatanodeDescriptor nodeN = getDatanodeByName( nodeReg.getName() );            if( nodeN != null && nodeS != null && nodeN == nodeS ) {        // The same datanode has been just restarted to serve the same data         // storage. We do not need to remove old data blocks, the delta will          // be calculated on the next block report from the datanode        NameNode.stateChangeLog.debug(            "BLOCK* NameSystem.registerDatanode: "            + "node restarted." );        return;      }            if( nodeN != null ) {        // nodeN previously served a different data storage,         // which is not served by anybody anymore.        removeDatanode( nodeN );        // physically remove node from datanodeMap        wipeDatanode( nodeN );        // and log removal        getEditLog().logRemoveDatanode( nodeN );        nodeN = null;      }            // nodeN is not found      if( nodeS != null ) {        // nodeS is found        // The registering datanode is a replacement node for the existing         // data storage, which from now on will be served by a new node.        NameNode.stateChangeLog.debug(            "BLOCK* NameSystem.registerDatanode: "            + "node " + nodeS.name            + " is replaced by " + nodeReg.getName() + "." );        getEditLog().logRemoveDatanode( nodeS );        nodeS.name = nodeReg.getName();        getEditLog().logAddDatanode( nodeS );        return;      }      // this is a new datanode serving a new data storage      if( nodeReg.getStorageID().equals("") ) {        // this data storage has never been registered        // it is either empty or was created by pre-storageID version of DFS        nodeReg.storageID = newStorageID();        NameNode.stateChangeLog.debug(            "BLOCK* NameSystem.registerDatanode: "            + "new storageID " + nodeReg.getStorageID() + " assigned." );      }      // register new datanode      DatanodeDescriptor nodeDescr = new DatanodeDescriptor( nodeReg );      unprotectedAddDatanode( nodeDescr );      getEditLog().logAddDatanode( nodeDescr );      return;    }        /**     * Get registrationID for datanodes based on the namespaceID.     *      * @see #registerDatanode(DatanodeRegistration)     * @see FSImage#newNamespaceID()     * @return registration ID     */    public String getRegistrationID() {      return "NS" + Integer.toString( dir.namespaceID );    }        /**     * Generate new storage ID.     *      * @return unique storage ID     *      * Note: that collisions are still possible if somebody will try      * to bring in a data storage from a different cluster.     */    private String newStorageID() {      String newID = null;      while( newID == null ) {        newID = "DS" + Integer.toString( r.nextInt() );        if( datanodeMap.get( newID ) != null )          newID = null;      }      return newID;    }        /**     * The given node has reported in.  This method should:     * 1) Record the heartbeat, so the datanode isn't timed out     * 2) Adjust usage stats for future block allocation     */    public synchronized void gotHeartbeat(DatanodeID nodeID,                                          long capacity,                                           long remaining,                                          int xceiverCount) throws IOException {      synchronized (heartbeats) {        synchronized (datanodeMap) {          DatanodeDescriptor nodeinfo = getDatanode( nodeID );                    if (nodeinfo == null)             // We do not accept unregistered guests            throw new UnregisteredDatanodeException( nodeID );          removeHeartbeat(nodeinfo);          nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);          addHeartbeat(nodeinfo);        }      }    }    /**     * Periodically calls heartbeatCheck().     */    class HeartbeatMonitor implements Runnable {        /**         */        public void run() {            while (fsRunning) {                heartbeatCheck();                try {                    Thread.sleep(heartBeatRecheck);                } catch (InterruptedException ie) {                }            }        }    }    /**     * remove a datanode descriptor     * @param nodeID datanode ID     * @author hairong     */    synchronized public void removeDatanode( DatanodeID nodeID )     throws IOException {      DatanodeDescriptor nodeInfo = getDatanode( nodeID );      if (nodeInfo != null) {        removeDatanode( nodeInfo );      } else {          NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "                  + nodeInfo.getName() + " does not exist");      }  }    /**   * remove a datanode descriptor   * @param nodeInfo datanode descriptor   * @author hairong   */    private void removeDatanode( DatanodeDescriptor nodeInfo ) {      removeHeartbeat(nodeInfo);      Block deadblocks[] = nodeInfo.getBlocks();      if( deadblocks != null )        for( int i = 0; i < deadblocks.length; i++ )          removeStoredBlock(deadblocks[i], nodeInfo);      unprotectedRemoveDatanode(nodeInfo);    }    void unprotectedRemoveDatanode( DatanodeDescriptor nodeDescr ) {      // datanodeMap.remove(nodeDescr.getStorageID());      // deaddatanodeMap.put(nodeDescr.getName(), nodeDescr);      nodeDescr.resetBlocks();      NameNode.stateChangeLog.debug(          "BLOCK* NameSystem.unprotectedRemoveDatanode: "          + nodeDescr.getName() + " is out of service now.");    }        void unprotectedAddDatanode( DatanodeDescriptor nodeDescr ) {      datanodeMap.put( nodeDescr.getStorageID(), nodeDescr );      NameNode.stateChangeLog.debug(          "BLOCK* NameSystem.unprotectedAddDatanode: "          + "node " + nodeDescr.getName() + " is added to datanodeMap." );    }    private void addHeartbeat( DatanodeDescriptor nodeDescr ) {      heartbeats.add(nodeDescr);      totalCapacity += nodeDescr.capacity;      totalRemaining += nodeDescr.remaining;    }        private void removeHeartbeat( DatanodeDescriptor nodeDescr ) {      totalCapacity -= nodeDescr.getCapacity();      totalRemaining -= nodeDescr.getRemaining();      heartbeats.remove(nodeDescr);    }        /**     * Physically remove node from datanodeMap.     *      * @param nodeID node     */    void wipeDatanode( DatanodeID nodeID ) {      datanodeMap.remove(nodeID.getStorageID());      NameNode.stateChangeLog.debug(          "BLOCK* NameSystem.wipeDatanode: "          + nodeID.getName() + " storage " + nodeID.getStorageID()           + " is removed from datanodeMap.");    }        private FSEditLog getEditLog() {      return dir.fsImage.getEditLog();    }    /**     * Check if there are any expired heartbeats, and if so,     * whether any blocks have to be re-replicated.     */    synchronized void heartbeatCheck() {      synchronized (heartbeats) {        DatanodeDescriptor nodeInfo = null;        while ((heartbeats.size() > 0) &&               ((nodeInfo = (DatanodeDescriptor) heartbeats.first()) != null) &&               (nodeInfo.isDead())) {          NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "              + "lost heartbeat from " + nodeInfo.getName());          removeDatanode( nodeInfo );        }      }    }        /**     * The given node is reporting all its blocks.  Use this info to      * update the (machine-->blocklist) and (block-->machinelist) tables.     */    public synchronized Block[] processReport(DatanodeID nodeID,                                               Block newReport[]                                            ) throws IOException {        NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "          +"from "+nodeID.getName()+" "+newReport.length+" blocks" );        DatanodeDescriptor node = getDatanode( nodeID );        //        // Modify the (block-->datanode) map, according to the difference        // between the old and new block report.        //        int oldPos = 0, newPos = 0;        Block oldReport[] = node.getBlocks();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -