⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 webdbwriter.java

📁 爬虫数据的改进,并修正了一些bug
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
                    //
                    // This situation indicates that the Link's 
                    // target page has been deleted, probably
                    // because we repeatedly failed to fetch the URL.
                    // So, we should delete the Link.
                    //
                    returnCode = LINK_INVALID;
                }
            }
            return returnCode;
        }

        /**
         */
        public void close() throws IOException {
            pagedb.close();
        }
    }

    /**
     * Closes down and merges changes to the URL-driven link
     * table.  This does nothing fancy, and propagates nothing
     * to a further stage.  There is no next stage!
     */
    private class LinksByURLProcessor extends CloseProcessor {
        MapFile.Reader pageDb;
        SequenceFile.Writer futureEdits;

        /**
         */
        public LinksByURLProcessor(MapFile.Reader db, SequenceFile.Writer editWriter, MapFile.Reader pageDb, SequenceFile.Writer futureEdits) {
            super(LINKS_BY_URL, db, editWriter, new SequenceFile.Sorter(fs, new LinkInstruction.UrlComparator(), NullWritable.class), new Link.UrlComparator(), Link.class, NullWritable.class);
            this.pageDb = pageDb;
            this.futureEdits = futureEdits;
        }

        /**
         */
        public long closeDown(File workingDir, File outputDir, long numEdits) throws IOException {
            long result = super.closeDown(workingDir, outputDir, numEdits);
            //pageDb.close();
            return result;
        }

        /**
         * Merge the existing db with the edit-stream into a brand-new file.
         */
        void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
            WritableComparator comparator = new Link.UrlComparator();

            // Create the keys and vals we'll use
            LinkInstruction editItem = new LinkInstruction();
            Link readerItem = new Link();
        
            // Read the first items from both streams
            boolean hasEntries = db.next(readerItem, NullWritable.get());
            boolean hasEdits = sortedEdits.next(editItem, NullWritable.get());
            TargetTester targetTester = new TargetTester(pageDb);

            // As long as we have both edits and entries to process,
            // we need to interleave them
            while (hasEntries && hasEdits) {
                int curInstruction = editItem.getInstruction();

                if (curInstruction == ADD_LINK) {
                    //  When we add a link, we may replace a previous
                    //    link with identical URL and MD5 values.  Our
                    //    comparator will test both
                    //
                    int comparison = comparator.compare(readerItem, editItem.getLink());

                    if (comparison < 0) {
                        // Write the readerKey, just passing it along.
                        // Don't process the edit yet.
                        int linkTest = targetTester.hasOutlinks(readerItem.getURL());

                        if (linkTest == LINK_INVALID) {
                            liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get());
                            targetOutlinkEdits++;
                        } else {
                            boolean oldOutlinkStatus = readerItem.targetHasOutlink();
                            boolean newOutlinkStatus = (linkTest == HAS_OUTLINKS);
                            // Do the conditional so we minimize unnecessary 
                            // mod-writes.
                            if (oldOutlinkStatus != newOutlinkStatus) {
                                readerItem.setTargetHasOutlink(newOutlinkStatus);
                                liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get());
                                targetOutlinkEdits++;
                            }
                            newDb.append(readerItem, NullWritable.get());
                            itemsWritten++;
                        }
                        hasEntries = db.next(readerItem, NullWritable.get());
                    } else if (comparison == 0) {
                        // Write the new item, "replacing" the old one.
                        // We move to the next edit instruction and move
                        //    past the replaced db entry.
                        Link editLink = editItem.getLink();
                        int linkTest = targetTester.hasOutlinks(editLink.getURL());

                        // Delete the edit/readerItem from the other table if it's
                        // found to be invalid.
                        if (linkTest == LINK_INVALID) {
                            liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get());
                        } else {
                            editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
                            liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get());

                            newDb.append(editLink, NullWritable.get());
                            itemsWritten++;
                        }
                        targetOutlinkEdits++;

                        hasEntries = db.next(readerItem, NullWritable.get());
                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                    } else if (comparison > 0) {
                        // Write the new item.  We stay at the current
                        // db entry.
                        Link editLink = editItem.getLink();
                        int linkTest = targetTester.hasOutlinks(editLink.getURL());

                        // Delete the edit from the other table if it's invalid
                        if (linkTest == LINK_INVALID) {
                            liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get());
                        } else {
                            editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
                            liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get());
                            newDb.append(editLink, NullWritable.get());
                            itemsWritten++;
                        }
                        targetOutlinkEdits++;

                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                    }
                } else if (curInstruction == DEL_LINK) {
                    // When we delete a link, we do it by MD5 and apply
                    //   it to the index first.  A single delete instruction
                    //   may remove many items in the db, during the earlier
                    //   processing.  However, unlike the index-processing stage,
                    //   here we can expect a new DEL instruction for every 
                    //   item that we remove from the db.
                    //
                    int comparison = comparator.compare(readerItem, editItem.getLink());

                    if (comparison < 0) {
                        // Write readerKey, just passing it along.  Don't
                        //   process the edit yet.
                        int linkTest = targetTester.hasOutlinks(readerItem.getURL());

                        // Delete the reader item if it's found to be invalid
                        if (linkTest == LINK_INVALID) {
                            liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get());
                        } else {
                            readerItem.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
                            liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get());
                            newDb.append(readerItem, NullWritable.get());
                            itemsWritten++;
                        }
                        targetOutlinkEdits++;

                        hasEntries = db.next(readerItem, NullWritable.get());
                    } else if (comparison == 0) {
                        // "Delete" the item by passing by the readerKey.
                        // We want a new entry, as well as the next instruction
                        // to process.
                        hasEntries = db.next(readerItem, NullWritable.get());
                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                    } else if (comparison > 0) {
                        // Ignore, move on to next instruction
                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                    }
                }
            }

            // Now we have only edits.  No more preexisting items!
            while (! hasEntries && hasEdits) {
                int curInstruction = editItem.getInstruction();

                if (curInstruction == ADD_LINK) {
                    //
                    // Add the item from the edit list.
                    //

                    //
                    // Make sure the outlinks flag is set properly.
                    //
                    Link editLink = editItem.getLink();
                    int linkTest = targetTester.hasOutlinks(editLink.getURL());
                    if (linkTest == LINK_INVALID) {
                        liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get());
                    } else {
                        editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
                        liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get());
                        newDb.append(editLink, NullWritable.get());
                        itemsWritten++;
                    }
                    targetOutlinkEdits++;
                } else if (curInstruction == DEL_LINK) {
                    // Ignore operation
                }
                // Move on to next edit
                hasEdits = sortedEdits.next(editItem, NullWritable.get());
            }

            // Now we have only preexisting items.  Just copy them
            // to the new file, in order.
            while (hasEntries && ! hasEdits) {
                //
                // Simply copy the remaining database items.
                //

                //
                // First, make sure the 'outlinks' flag is set properly.
                //
                int linkTest = targetTester.hasOutlinks(readerItem.getURL());
                if (linkTest == LINK_INVALID) {
                    liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get());
                    targetOutlinkEdits++;
                } else {
                    boolean oldOutlinkStatus = readerItem.targetHasOutlink();
                    boolean newOutlinkStatus = (linkTest == HAS_OUTLINKS);
                    if (oldOutlinkStatus != newOutlinkStatus) {
                        readerItem.setTargetHasOutlink(newOutlinkStatus);
                        liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get());
                        targetOutlinkEdits++;
                    }

                    // Now copy the object
                    newDb.append(readerItem, NullWritable.get());
                    itemsWritten++;
                }

                // Move on to next
                hasEntries = db.next(readerItem, NullWritable.get());
            }

            targetTester.close();
        }
    }

    /**
     * Create the WebDB for the first time.
     */
    public static void createWebDB(NutchFileSystem nfs, File dbDir) throws IOException {
        WebDBWriter starter = new WebDBWriter(nfs, dbDir, true);
        starter.close();
    }
    
    boolean haveEdits = false;
    NutchFileSystem fs = null;
    File dbDir, dbFile, oldDbFile, newDbFile, tmp;
    MapFile.Reader pagesByURL, pagesByMD5, linksByURL, linksByMD5;
    SequenceFile.Writer pagesByURLWriter, pagesByMD5Writer, linksByURLWriter, linksByMD5Writer;
    long pagesByURLEdits = 0, pagesByMD5Edits = 0, linksByURLEdits = 0, linksByMD5Edits = 0, targetOutlinkEdits = 0;
    PageInstructionWriter piwriter = new PageInstructionWriter();
    LinkInstructionWriter liwriter = new LinkInstructionWriter();
    DataInputBuffer inBuf = new DataInputBuffer();
    DataOutputBuffer outBuf = new DataOutputBuffer();

    /**
     * Create a WebDBWriter.
     */
    public WebDBWriter(NutchFileSystem fs, File dbDir) throws IOException {
        this(fs, dbDir, false);
    }

    /**
     * Private constructor, so we can either open or create the db files.
     */
    private WebDBWriter(NutchFileSystem fs, File dbDir, boolean create) throws IOException {
        this.fs = fs;
        this.dbDir = dbDir;
        this.dbFile = new File(dbDir, "webdb");
        this.oldDbFile = new File(dbDir, "webdb.old");
        this.newDbFile = new File(dbDir, "webdb.new");
        this.tmp = new File(newDbFile, "tmp");

        if ((! fs.exists(dbDir)) && create) {
            fs.mkdirs(dbDir);
        }
        if (! fs.exists(dbDir) || ! fs.isDirectory(dbDir)) {
            throw new IOException("Database " + dbDir + " is not a directory.");
        }

        // Lock the writeLock immediately.
        fs.lock(new File(dbDir, "dbwritelock"), false);

        // Resolve any partial-state dirs from the last run.
        if (fs.exists(oldDbFile)) {
            if (fs.exists(dbFile)) {
             

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -