// DistributedWebDBReader.java
/* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
package net.nutch.db;
import java.io.*;
import java.util.*;
import net.nutch.io.*;
import net.nutch.fs.*;
import net.nutch.util.*;
import net.nutch.pagedb.*;
import net.nutch.linkdb.*;
/**********************************************
* The WebDBReader implements all the read-only
* parts of accessing our web database.
* All the writing ones can be found in WebDBWriter.
*
* @author Mike Cafarella
**********************************************/
public class DistributedWebDBReader implements IWebDBReader {
// Zero-length type tokens handed to Vector.toArray() so the result
// arrays come back correctly typed without per-call allocation.
static final Page[] PAGE_RECORDS = new Page[0];
static final Link[] LINK_RECORDS = new Link[0];
// filenames: the four sorted tables plus bookkeeping files that make
// up every "dbsection.N" directory on disk.
static final String PAGES_BY_URL = "pagesByURL";
static final String PAGES_BY_MD5 = "pagesByMD5";
static final String LINKS_BY_URL = "linksByURL";
static final String LINKS_BY_MD5 = "linksByMD5";
static final String STATS_FILE = "stats";
static final String META_FILE = "metainfo";
// For different enumeration types: strategy objects telling
// MetaEnumerator which per-section enumeration to open.
static final EnumCall PAGE_ENUMS = new PageEnumCall();
static final EnumCall PAGE_MD5_ENUMS = new PageByMD5EnumCall();
static final EnumCall LINK_ENUMS = new LinkEnumCall();
// Utility array for Vector conversion (type token for toArray).
static final DBSectionReader[] STATIC_SR_ARRAY = new DBSectionReader[0];
// Structures for multi-file db structures: one DBSectionReader per
// table per machine section, indexed 0..numMachines-1.
File root, dbDir;
File globalWriteLock;
DBSectionReader pagesByURL[], pagesByMD5[], linksByURL[], linksByMD5[];
// Db-wide totals, summed from each section's stats file.
long totalPages = 0, totalLinks = 0;
int numMachines = 0;
/**
 * Open a read-only view onto the distributed web db rooted at the
 * given directory.  Blocks (polling every 2 seconds) until the db
 * writer has dropped its "dbIsComplete" marker file, then opens a
 * DBSectionReader for each of the four tables in every machine's
 * section and sums the per-section page and link counts.
 *
 * @param nfs  filesystem holding the db files
 * @param root top-level directory of the distributed db
 * @throws IOException if a section or its stats file cannot be read
 */
public DistributedWebDBReader(NutchFileSystem nfs, File root) throws IOException, FileNotFoundException {
    this.root = root;
    this.dbDir = new File(new File(root, "standard"), "webdb");

    // The writer signals completion by creating this marker file;
    // poll until it shows up so we never read a half-built db.
    File completionMarker = new File(dbDir, "dbIsComplete");
    while (!nfs.exists(completionMarker)) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ie) {
            // Deliberate best-effort wait: swallow and keep polling.
        }
    }

    // REMIND - mjc - I think the locking here is suspect.
    // (Non-exclusive lock on the db's globalWriteLock so writers
    // don't move it out from under us -- currently disabled.)

    // The machineinfo file tells us how many db sections to expect.
    File machineInfo = new File(new File(root, "standard"), "machineinfo");
    DataInputStream din = new DataInputStream(nfs.open(machineInfo));
    try {
        din.readByte();                  // format version (unchecked)
        this.numMachines = din.readInt();
    } finally {
        din.close();
    }

    // Each "dbsection.N" directory holds one slice of all four
    // tables.  Open a DBSectionReader per table per section, and
    // fold each section's stats into the db-wide totals.
    Vector urlPageReaders = new Vector();
    Vector md5PageReaders = new Vector();
    Vector urlLinkReaders = new Vector();
    Vector md5LinkReaders = new Vector();
    for (int i = 0; i < numMachines; i++) {
        File sectionDir = new File(dbDir, "dbsection." + i);
        urlPageReaders.add(new DBSectionReader(nfs, new File(sectionDir, PAGES_BY_URL), new UTF8.Comparator()));
        md5PageReaders.add(new DBSectionReader(nfs, new File(sectionDir, PAGES_BY_MD5), new Page.Comparator()));
        urlLinkReaders.add(new DBSectionReader(nfs, new File(sectionDir, LINKS_BY_URL), new Link.UrlComparator()));
        md5LinkReaders.add(new DBSectionReader(nfs, new File(sectionDir, LINKS_BY_MD5), new Link.MD5Comparator()));

        // Per-section stats: version byte, then page and link counts.
        din = new DataInputStream(nfs.open(new File(sectionDir, STATS_FILE)));
        try {
            din.read();                  // format version (unchecked)
            this.totalPages += din.readLong();
            this.totalLinks += din.readLong();
        } finally {
            din.close();
        }
    }

    // Freeze the section lists into the arrays the query methods use.
    this.pagesByURL = (DBSectionReader[]) urlPageReaders.toArray(STATIC_SR_ARRAY);
    this.pagesByMD5 = (DBSectionReader[]) md5PageReaders.toArray(STATIC_SR_ARRAY);
    this.linksByURL = (DBSectionReader[]) urlLinkReaders.toArray(STATIC_SR_ARRAY);
    this.linksByMD5 = (DBSectionReader[]) md5LinkReaders.toArray(STATIC_SR_ARRAY);
}
/**
 * Shutdown: close every open DBSectionReader.
 *
 * All four readers of every section are closed even if one of them
 * fails, so a single bad section cannot leak the remaining file
 * handles; the first IOException encountered is rethrown once all
 * close attempts have been made.
 *
 * @throws IOException the first close failure, if any occurred
 */
public void close() throws IOException {
    IOException firstFailure = null;
    DBSectionReader allReaders[][] = {pagesByURL, pagesByMD5, linksByURL, linksByMD5};
    for (int i = 0; i < allReaders.length; i++) {
        for (int j = 0; j < allReaders[i].length; j++) {
            try {
                allReaders[i][j].close();
            } catch (IOException ioe) {
                // Remember the first failure, but keep closing the rest.
                if (firstFailure == null) {
                    firstFailure = ioe;
                }
            }
        }
    }
    if (firstFailure != null) {
        throw firstFailure;
    }
}
/**
 * Report how many sections (one per machine) make up this
 * distributed db, as read from the machineinfo file.
 *
 * @return the section count
 */
public int numMachines() {
    return this.numMachines;
}
/**
 * Report the total number of pages in the db, summed across all
 * sections' stats files at open time.
 *
 * @return the page count
 */
public long numPages() {
    return this.totalPages;
}
/**
 * Report the total number of links in the db, summed across all
 * sections' stats files at open time.
 *
 * @return the link count
 */
public long numLinks() {
    return this.totalLinks;
}
/**
 * Get Page from the pagedb with the given URL.
 *
 * Jumps directly to the one section whose key range owns this URL
 * (no linear search over sections).
 *
 * @param url the page's URL
 * @return the matching Page, or whatever the section reader returns
 *         when the URL is absent
 * @throws IOException on a section read failure
 */
public Page getPage(String url) throws IOException {
    // (The old unused "result" local has been dropped; the section
    // reader fills "target" and returns the answer itself.)
    Page target = new Page();
    UTF8 searchURL = new UTF8(url);
    return pagesByURL[DBKeyDivision.findURLSection(url, numMachines)].getPage(searchURL, target);
}
/**
 * Get all the Pages according to their content hash.
 *
 * Only the single section whose key range owns this MD5 can hold
 * matches, and since sections cover ascending blocks of the hash
 * space the results come back in sorted order.
 *
 * @param md5 content hash to look up
 * @return the matching pages; empty array when there are none
 * @throws IOException on a section read failure
 */
public Page[] getPages(MD5Hash md5) throws IOException {
    Vector resultSet = pagesByMD5[DBKeyDivision.findMD5Section(md5, numMachines)].getPages(md5);
    // Vector.toArray performs the copy the old manual Enumeration
    // loop did; PAGE_RECORDS is the class-wide zero-length type token.
    return (Page[]) resultSet.toArray(PAGE_RECORDS);
}
/**
 * Test whether a certain piece of content is in the database,
 * without materializing the Page(s) themselves.  Only the one
 * section owning this hash's key range needs to be consulted.
 *
 * @param md5 content hash to probe for
 * @return true iff some page with this content hash exists
 * @throws IOException on a section read failure
 */
public boolean pageExists(MD5Hash md5) throws IOException {
    int section = DBKeyDivision.findMD5Section(md5, numMachines);
    return pagesByMD5[section].pageExists(md5);
}
/**
 * Iterate through all the Pages, sorted by URL.
 *
 * @return a merged enumeration over every section's pagesByURL table
 * @throws IOException on a section read failure
 */
public Enumeration pages() throws IOException {
    // MetaEnumerator stitches the per-section enumerations together.
    MetaEnumerator merged = new MetaEnumerator(pagesByURL, PAGE_ENUMS);
    return merged;
}
/**
 * Iterate through all the Pages, sorted by MD5.
 *
 * @return a merged enumeration over every section's pagesByMD5 table
 * @throws IOException on a section read failure
 */
public Enumeration pagesByMD5() throws IOException {
    // Same merge as pages(), but over the MD5-sorted tables.
    MetaEnumerator merged = new MetaEnumerator(pagesByMD5, PAGE_MD5_ENUMS);
    return merged;
}
/**
 * Get all the hyperlinks that link TO the indicated URL.
 *
 * Only the single section whose key range owns this URL can hold
 * the links, so we query just that one.
 *
 * @param url target URL
 * @return the matching links; empty array when there are none
 * @throws IOException on a section read failure
 */
public Link[] getLinks(UTF8 url) throws IOException {
    Vector resultSet = linksByURL[DBKeyDivision.findURLSection(url.toString(), numMachines)].getLinks(url);
    // Vector.toArray performs the copy the old manual Enumeration
    // loop did; LINK_RECORDS is the class-wide zero-length type token.
    return (Link[]) resultSet.toArray(LINK_RECORDS);
}
/**
 * Grab all the links from the given MD5 hash (the source content's
 * hash).  Only the single section owning this hash's key range can
 * hold matches.
 *
 * @param md5 source content hash
 * @return the matching links; empty array when there are none
 * @throws IOException on a section read failure
 */
public Link[] getLinks(MD5Hash md5) throws IOException {
    Vector resultSet = linksByMD5[DBKeyDivision.findMD5Section(md5, numMachines)].getLinks(md5);
    // Vector.toArray performs the copy the old manual Enumeration
    // loop did; LINK_RECORDS is the class-wide zero-length type token.
    return (Link[]) resultSet.toArray(LINK_RECORDS);
}
/**
 * Return all the links, sorted by target URL.
 *
 * @return a merged enumeration over every section's linksByURL table
 * @throws IOException on a section read failure
 */
public Enumeration links() throws IOException {
    // Merge the per-section URL-sorted link enumerations.
    MetaEnumerator merged = new MetaEnumerator(linksByURL, LINK_ENUMS);
    return merged;
}
}