⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 distributedwebdbreader.java

📁 爬虫数据的改进,并修正了一些bug
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* Copyright (c) 2003 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
package net.nutch.db;

import java.io.*;
import java.util.*;

import net.nutch.io.*;
import net.nutch.fs.*;
import net.nutch.util.*;
import net.nutch.pagedb.*;
import net.nutch.linkdb.*;

/**********************************************
 * The DistributedWebDBReader implements all the read-only
 * parts of accessing our web database when that database
 * is split into several sections ("dbsection.N" directories).
 * All the writing ones can be found in WebDBWriter.
 *
 * @author Mike Cafarella
 **********************************************/
public class DistributedWebDBReader implements IWebDBReader {
    // Zero-length typed arrays.
    // NOTE(review): presumably toArray() templates like STATIC_SR_ARRAY
    // below -- confirm against the code that uses them.
    static final Page[] PAGE_RECORDS = new Page[0];
    static final Link[] LINK_RECORDS = new Link[0];

    // filenames of the four tables kept inside every "dbsection.N" directory
    static final String PAGES_BY_URL = "pagesByURL";
    static final String PAGES_BY_MD5 = "pagesByMD5";
    static final String LINKS_BY_URL = "linksByURL";
    static final String LINKS_BY_MD5 = "linksByMD5";

    // Name of the per-section stats file; the constructor reads a version
    // byte followed by a page count and a link count from it.
    static final String STATS_FILE = "stats";
    // NOTE(review): the constructor never reads this file -- confirm where
    // (or whether) META_FILE is used.
    static final String META_FILE = "metainfo";

    // For different enumeration types.  Each one is handed to a
    // MetaEnumerator together with the matching DBSectionReader array.
    static final EnumCall PAGE_ENUMS = new PageEnumCall();
    static final EnumCall PAGE_MD5_ENUMS = new PageByMD5EnumCall();
    static final EnumCall LINK_ENUMS = new LinkEnumCall();    

    // Utility array for Vector conversion (the type template given to
    // Vector.toArray() in the constructor)
    static final DBSectionReader[] STATIC_SR_ARRAY = new DBSectionReader[0];

    // Structures for multi-file db structures
    // root is the directory handed to the constructor;
    // dbDir is root/standard/webdb.
    File root, dbDir;
    // Only assigned by the disabled locking code in the constructor.
    File globalWriteLock;
    // One reader per section for each of the four tables; the array index
    // is the section number.
    DBSectionReader pagesByURL[], pagesByMD5[], linksByURL[], linksByMD5[];
    // Totals accumulated from the stats file of every section.
    long totalPages = 0, totalLinks = 0;
    // Number of sections, read from the "machineinfo" file.
    int numMachines = 0;

    /**
     * Open a web db reader for the database stored under the given root.
     *
     * Blocks, polling every two seconds, until the "dbIsComplete" marker
     * file exists under root/standard/webdb.  It then reads the number of
     * sections from root/standard/machineinfo and, for every
     * "dbsection.N" directory, opens the four table readers and adds the
     * section's page and link counts to the running totals.
     *
     * If anything fails after some section readers were opened, those
     * readers are closed again before the exception propagates.
     *
     * @param nfs  file system used for every existence check and file open
     * @param root directory that contains the "standard" subdirectory
     * @throws InterruptedIOException if the calling thread is interrupted
     *         while waiting for the completion marker; the thread's
     *         interrupt flag is left set
     * @throws IOException if a db file cannot be opened or read
     */
    public DistributedWebDBReader(NutchFileSystem nfs, File root) throws IOException, FileNotFoundException {
        //
        // Get the current db from the given nutchfs.  It consists
        // of a bunch of directories full of files.
        //
        this.root = root;
        File standardDir = new File(root, "standard");
        this.dbDir = new File(standardDir, "webdb");

        //
        // Wait until the webdb is complete, by waiting till a given
        // file exists.
        //
        File dirIsComplete = new File(dbDir, "dbIsComplete");
        while (! nfs.exists(dirIsComplete)) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ie) {
                // Don't spin forever on a cancelled thread: keep the
                // interrupt visible to the caller and give up.
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("Interrupted while waiting for " + dirIsComplete);
            }
        }

        //
        // Obtain non-exclusive lock on the webdb's globalWriteLock
        // so writers don't move it out from under us.
        //
        // REMIND - mjc - I think the locking here is suspect.
        // Locking is therefore disabled and globalWriteLock stays unset:
        //
        //   this.globalWriteLock = new File("standard", new File("globalWriteLock"));
        //   nfs.lock(globalWriteLock, false);
        //

        //
        // Load in how many segments we can expect
        //
        File machineInfo = new File(standardDir, "machineinfo");
        DataInputStream in = new DataInputStream(nfs.open(machineInfo));
        try {
            in.readByte();  // version
            this.numMachines = in.readInt();
        } finally {
            in.close();
        }

        //
        // Find all the "section" subdirs.  Each section will contain
        // one of the 4 tables we're after.  Create one DBSectionReader
        // object for each table in each section.
        //
        Vector urlPageReaders = new Vector(), md5PageReaders = new Vector();
        Vector urlLinkReaders = new Vector(), md5LinkReaders = new Vector();
        boolean allOpened = false;
        try {
            for (int i = 0; i < numMachines; i++) {
                // The relevant NutchFiles for each part of this db section
                File sectionDir = new File(dbDir, "dbsection." + i);
                File pagesByURLFile = new File(sectionDir, PAGES_BY_URL);
                File pagesByMD5File = new File(sectionDir, PAGES_BY_MD5);
                File linksByURLFile = new File(sectionDir, LINKS_BY_URL);
                File linksByMD5File = new File(sectionDir, LINKS_BY_MD5);

                // Create DBSectionReader object for each subtype
                urlPageReaders.add(new DBSectionReader(nfs, pagesByURLFile, new UTF8.Comparator()));
                md5PageReaders.add(new DBSectionReader(nfs, pagesByMD5File, new Page.Comparator()));
                urlLinkReaders.add(new DBSectionReader(nfs, linksByURLFile, new Link.UrlComparator()));
                md5LinkReaders.add(new DBSectionReader(nfs, linksByMD5File, new Link.MD5Comparator()));

                // Load in the stats file for the section
                File sectionStats = new File(sectionDir, STATS_FILE);
                in = new DataInputStream(nfs.open(sectionStats));
                try {
                    in.readByte(); // version
                    this.totalPages += in.readLong();
                    this.totalLinks += in.readLong();
                } finally {
                    in.close();
                }
            }
            allOpened = true;
        } finally {
            if (! allOpened) {
                // Construction is failing: release whatever was opened so
                // far, since no caller will ever get a chance to close() it.
                closeSectionReaders(urlPageReaders);
                closeSectionReaders(md5PageReaders);
                closeSectionReaders(urlLinkReaders);
                closeSectionReaders(md5LinkReaders);
            }
        }

        // Put lists into array form
        this.pagesByURL = (DBSectionReader[]) urlPageReaders.toArray(STATIC_SR_ARRAY);
        this.pagesByMD5 = (DBSectionReader[]) md5PageReaders.toArray(STATIC_SR_ARRAY);
        this.linksByURL = (DBSectionReader[]) urlLinkReaders.toArray(STATIC_SR_ARRAY);
        this.linksByMD5 = (DBSectionReader[]) md5LinkReaders.toArray(STATIC_SR_ARRAY);
    }

    /**
     * Best-effort close of every DBSectionReader held in the given Vector.
     * Close failures are ignored on purpose: this only runs while the
     * constructor is already propagating the error that matters.
     */
    private static void closeSectionReaders(Vector readers) {
        for (int i = 0; i < readers.size(); i++) {
            try {
                ((DBSectionReader) readers.elementAt(i)).close();
            } catch (IOException ignored) {
                // Secondary failure; the original exception is reported instead.
            }
        }
    }

    /**
     * Shutdown.  Closes the section readers of all four tables, section
     * by section.
     *
     * Every reader is attempted even when an earlier one fails, so one bad
     * close no longer leaves the remaining readers open.  The first
     * failure is rethrown once all readers have been tried.
     *
     * @throws IOException the first error raised while closing a reader
     */
    public void close() throws IOException {
        DBSectionReader[][] tables = { pagesByURL, pagesByMD5, linksByURL, linksByMD5 };
        IOException firstFailure = null;

        // All four arrays are filled in lock-step by the constructor, so
        // pagesByURL.length is the section count for each of them.
        for (int i = 0; i < pagesByURL.length; i++) {
            for (int t = 0; t < tables.length; t++) {
                try {
                    tables[t][i].close();
                } catch (IOException ioe) {
                    if (firstFailure == null) {
                        firstFailure = ioe;
                    }
                }
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Report the section count of this distributed db, i.e. the value
     * read from the "machineinfo" file when the reader was opened.
     */
    public int numMachines() {
        return this.numMachines;
    }

    /**
     * Report the total page count: the sum of the page counts found in
     * the stats file of every section.
     */
    public long numPages() {
        return this.totalPages;
    }

    /**
     * Report the total link count: the sum of the link counts found in
     * the stats file of every section.
     */
    public long numLinks() {
        return this.totalLinks;
    }

    /**
     * Get Page from the pagedb with the given URL.
     *
     * @param url the URL to look up
     * @return whatever the responsible section reader's getPage() yields
     *         for that URL
     * @throws IOException if the section reader fails to read
     */
    public Page getPage(String url) throws IOException {
        // Scratch Page handed to the section reader.
        // NOTE(review): presumably filled in and returned by
        // DBSectionReader.getPage() -- confirm there.
        Page target = new Page();
        UTF8 searchURL = new UTF8(url);

        // Don't do linear search.  Instead, jump to the
        // chunk that will have it.
        int section = DBKeyDivision.findURLSection(url, numMachines);
        return pagesByURL[section].getPage(searchURL, target);
    }

    /**
     * Get all the Pages according to their content hash.
     *
     * Only the one section that DBKeyDivision.findMD5Section() maps the
     * hash to is consulted.  The Pages come back in the order that
     * section's reader supplied them.
     *
     * @param md5 content hash to look up
     * @return the matching Pages
     * @throws IOException if the section reader fails to read
     */
    public Page[] getPages(MD5Hash md5) throws IOException {
        int section = DBKeyDivision.findMD5Section(md5, numMachines);
        Vector resultSet = pagesByMD5[section].getPages(md5);

        // An exactly-sized target array makes toArray() fill and return it.
        return (Page[]) resultSet.toArray(new Page[resultSet.size()]);
    }

    /**
     * Test whether a certain piece of content is in the
     * database, but don't bother returning the Page(s) itself.
     * Only the section that DBKeyDivision.findMD5Section() picks
     * for the hash is asked; the other sections are not touched.
     */
    public boolean pageExists(MD5Hash md5) throws IOException {
        int section = DBKeyDivision.findMD5Section(md5, numMachines);
        DBSectionReader sectionReader = pagesByMD5[section];
        return sectionReader.pageExists(md5);
    }

    /**
     * Enumerate every Page in the db, documented as sorted by URL.
     * The result is a MetaEnumerator built over the per-section
     * pagesByURL readers together with PAGE_ENUMS.
     */
    public Enumeration pages() throws IOException {
        Enumeration allPages = new MetaEnumerator(pagesByURL, PAGE_ENUMS);
        return allPages;
    }

    /**
     * Enumerate every Page in the db, documented as sorted by MD5.
     * The result is a MetaEnumerator built over the per-section
     * pagesByMD5 readers together with PAGE_MD5_ENUMS.
     */
    public Enumeration pagesByMD5() throws IOException {
        Enumeration allPages = new MetaEnumerator(pagesByMD5, PAGE_MD5_ENUMS);
        return allPages;
    }

    /**
     * Get all the hyperlinks that link TO the indicated URL.
     *
     * Only the section that DBKeyDivision.findURLSection() maps the URL
     * to is consulted.
     *
     * @param url target URL of the wanted links
     * @return the matching Links, in the order the section reader
     *         supplied them
     * @throws IOException if the section reader fails to read
     */
    public Link[] getLinks(UTF8 url) throws IOException {
        int section = DBKeyDivision.findURLSection(url.toString(), numMachines);
        Vector resultSet = linksByURL[section].getLinks(url);

        // An exactly-sized target array makes toArray() fill and return it.
        return (Link[]) resultSet.toArray(new Link[resultSet.size()]);
    }

    /**
     * Grab all the links from the given MD5 hash.
     *
     * Only the section that DBKeyDivision.findMD5Section() maps the hash
     * to is consulted.
     *
     * @param md5 hash whose links are wanted
     * @return the matching Links, in the order the section reader
     *         supplied them
     * @throws IOException if the section reader fails to read
     */
    public Link[] getLinks(MD5Hash md5) throws IOException {
        int section = DBKeyDivision.findMD5Section(md5, numMachines);
        Vector resultSet = linksByMD5[section].getLinks(md5);

        // An exactly-sized target array makes toArray() fill and return it.
        return (Link[]) resultSet.toArray(new Link[resultSet.size()]);
    }

    /**
     * Enumerate every Link in the db, by target URL.  The result is a
     * MetaEnumerator built over the per-section linksByURL readers
     * together with LINK_ENUMS.
     */
    public Enumeration links() throws IOException {
        Enumeration allLinks = new MetaEnumerator(linksByURL, LINK_ENUMS);
        return allLinks;
    }

    //

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -