⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 distributedwebdbwriter.java

📁 爬虫数据的改进,并修正了一些bug
💻 JAVA
📖 第 1 页 / 共 5 页
字号:

    /**
     * Forward-only cursor over the page database, used by
     * LinksByURLProcessor to classify the URL carried by each Link.
     * The underlying reader is only ever advanced, never rewound, so
     * queries are expected to arrive in ascending URL order (the
     * processor feeds it Links sorted by target URL).
     */
    private class TargetTester {
        MapFile.Reader pagedb;
        boolean hasPage = false;
        UTF8 pageURL = new UTF8();
        Page page = new Page();

        /**
         * Wraps the given page reader and loads its first entry, if any.
         *
         * @param pagedb reader over the page database; closed by close()
         * @throws IOException if reading the first entry fails
         */
        public TargetTester(MapFile.Reader pagedb) throws IOException {
            this.pagedb = pagedb;
            this.hasPage = pagedb.next(this.pageURL, this.page);
        }

        /**
         * Advances the page cursor up to the given URL and reports what
         * was found there.
         *
         * @param curURL the URL to look up
         * @return HAS_OUTLINKS if a page with this URL exists and reports
         *         more than zero outlinks; LINK_INVALID if the cursor
         *         stopped on a page that sorts after this URL; NO_OUTLINKS
         *         otherwise (page present with no outlinks, or the reader
         *         ran out of pages)
         * @throws IOException if reading from the page database fails
         */
        public int hasOutlinks(UTF8 curURL) throws IOException {
            int order = pageURL.compareTo(curURL);

            // Skip over every page that sorts strictly before curURL.
            while (hasPage && order < 0) {
                hasPage = pagedb.next(pageURL, page);
                if (!hasPage) {
                    break;
                }
                order = pageURL.compareTo(curURL);
            }

            if (!hasPage) {
                // NOTE(review): an exhausted reader is reported as
                // NO_OUTLINKS rather than LINK_INVALID, even though no
                // page for curURL was found -- confirm this is intended.
                return NO_OUTLINKS;
            }

            if (order > 0) {
                //
                // We walked past where the page would have been, so the
                // Link's target page has been deleted, probably because
                // we repeatedly failed to fetch the URL.  The Link
                // should be deleted too.
                //
                return LINK_INVALID;
            }

            // Exact match: answer from the page's outlink count.
            if (page.getNumOutlinks() > 0) {
                return HAS_OUTLINKS;
            }
            return NO_OUTLINKS;
        }

        /**
         * Closes the underlying page database reader.
         *
         * @throws IOException if the reader fails to close
         */
        public void close() throws IOException {
            pagedb.close();
        }
    }

    /**
     * Closes down and merges changes to the URL-driven link
     * table.  There is no further processing stage after this one,
     * but while merging it does write ADD_LINK / DEL_SINGLE_LINK
     * instructions to the futureEdits writer whenever a link's target
     * page turns out to be missing or its has-outlink flag changes.
     */
    private class LinksByURLProcessor extends CloseProcessor {
        MapFile.Reader pageDb;
        EditSectionGroupWriter futureEdits;

        /**
         * @param db          reader over the existing link table; forwarded to CloseProcessor
         * @param editWriter  edit writer; forwarded to CloseProcessor
         * @param pageDb      reader over the page database, consulted through
         *                    TargetTester during the merge and closed by closeDown()
         * @param futureEdits writer that receives the instructions generated
         *                    during the merge
         */
        public LinksByURLProcessor(MapFile.Reader db, EditSectionGroupWriter editWriter, MapFile.Reader pageDb, EditSectionGroupWriter futureEdits) {
            super(LINKS_BY_URL, db, editWriter, new SequenceFile.Sorter(nfs, new LinkInstruction.UrlComparator(), NullWritable.class), new Link.UrlComparator(), Link.class, NullWritable.class, "LinksByURLPart");
            this.pageDb = pageDb;
            this.futureEdits = futureEdits;
        }

        /**
         * Runs the superclass close-down and then closes the page database
         * reader.  The reader is closed even when the superclass close-down
         * throws, so a failed merge does not leak it.
         *
         * Note that mergeEdits() also closes this same reader through
         * TargetTester.close(), so close() may be invoked on it more than once.
         * NOTE(review): confirm MapFile.Reader.close() tolerates repeated calls.
         *
         * @return whatever the superclass closeDown() returns
         * @throws IOException if the close-down or closing the reader fails
         */
        public long closeDown(File workingDir, File outputDir) throws IOException {
            try {
                return super.closeDown(workingDir, outputDir);
            } finally {
                pageDb.close();
            }
        }

        /**
         * Merge the existing db with the edit-stream into a brand-new file.
         *
         * Both inputs are walked in step using Link.UrlComparator.  Every
         * link that survives into newDb is first checked against the page
         * database via a TargetTester; see writeCheckedLink() for what is
         * emitted in each case.
         *
         * @param db          reader over the existing entries
         * @param sortedEdits reader over the sorted edit instructions
         * @param newDb       writer for the merged result
         * @throws IOException if any read or write fails
         */
        void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
            WritableComparator comparator = new Link.UrlComparator();

            // Create the keys and vals we'll use
            LinkInstruction editItem = new LinkInstruction();
            Link readerItem = new Link();

            // Read the first items from both streams
            boolean hasEntries = db.next(readerItem, NullWritable.get());
            boolean hasEdits = sortedEdits.next(editItem, NullWritable.get());
            TargetTester targetTester = new TargetTester(pageDb);

            try {
                // As long as we have both edits and entries to process,
                // we need to interleave them
                while (hasEntries && hasEdits) {
                    int curInstruction = editItem.getInstruction();

                    if (curInstruction != ADD_LINK && curInstruction != DEL_LINK) {
                        // Unrecognized instruction: skip it, exactly as the
                        // edits-only loop below does.  Without this the loop
                        // would never advance either stream and spin forever.
                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                        continue;
                    }

                    boolean isAdd = (curInstruction == ADD_LINK);
                    int comparison = comparator.compare(readerItem, editItem.getLink());

                    if (comparison < 0) {
                        // The db entry sorts first: pass it along and don't
                        // process the edit yet.
                        //
                        // NOTE(review): when the pending edit is a DEL_LINK the
                        // entry is re-emitted to futureEdits unconditionally,
                        // whereas the ADD_LINK case (and the entries-only loop
                        // below) re-emit only when the has-outlink flag changed.
                        // That asymmetry is preserved here; it looks like a
                        // missed optimization -- confirm before unifying.
                        boolean alwaysEmitEdit = !isAdd;
                        writeCheckedLink(readerItem, targetTester, newDb, alwaysEmitEdit);
                        hasEntries = db.next(readerItem, NullWritable.get());
                    } else if (comparison == 0) {
                        // ADD_LINK: write the new item, "replacing" the old one.
                        // DEL_LINK: "delete" the item by simply passing it by.
                        // Either way we move past both the db entry and the edit.
                        if (isAdd) {
                            writeCheckedLink(editItem.getLink(), targetTester, newDb, true);
                        }
                        hasEntries = db.next(readerItem, NullWritable.get());
                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                    } else {
                        // The edit sorts first.
                        // ADD_LINK: write the new item and stay at the current db entry.
                        // DEL_LINK: nothing to delete; ignore and move on.
                        if (isAdd) {
                            writeCheckedLink(editItem.getLink(), targetTester, newDb, true);
                        }
                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                    }
                }

                // Now we have only edits.  No more preexisting items!
                while (! hasEntries && hasEdits) {
                    // Only ADD_LINK has any effect here; a DEL_LINK (or anything
                    // else) has nothing left to act on and is ignored.
                    if (editItem.getInstruction() == ADD_LINK) {
                        writeCheckedLink(editItem.getLink(), targetTester, newDb, true);
                    }
                    // Move on to next edit
                    hasEdits = sortedEdits.next(editItem, NullWritable.get());
                }

                // Now we have only preexisting items.  Just copy them
                // to the new file, in order, refreshing the 'outlinks'
                // flag on the way.
                while (hasEntries && ! hasEdits) {
                    writeCheckedLink(readerItem, targetTester, newDb, false);
                    // Move on to next
                    hasEntries = db.next(readerItem, NullWritable.get());
                }
            } finally {
                // Release the page reader even if the merge failed part-way.
                targetTester.close();
            }
        }

        /**
         * Checks a link's URL against the page database and records the result.
         *
         * If the tester reports LINK_INVALID, a DEL_SINGLE_LINK instruction
         * is written to futureEdits and the link is NOT appended to newDb.
         * Otherwise the link is appended to newDb, and an ADD_LINK
         * instruction carrying the refreshed has-outlink flag is written to
         * futureEdits either always (alwaysEmitEdit) or only when the flag
         * actually changed.  targetOutlinkEdits is bumped once per
         * instruction written to futureEdits; itemsWritten once per append
         * to newDb.
         *
         * @param link           the link to check; its has-outlink flag may be updated
         * @param targetTester   cursor over the page database
         * @param newDb          destination for surviving links
         * @param alwaysEmitEdit true to write the ADD_LINK instruction even
         *                       when the flag is unchanged
         * @throws IOException if any read or write fails
         */
        private void writeCheckedLink(Link link, TargetTester targetTester, MapFile.Writer newDb, boolean alwaysEmitEdit) throws IOException {
            int linkTest = targetTester.hasOutlinks(link.getURL());

            if (linkTest == LINK_INVALID) {
                liwriter.appendInstructionInfo(futureEdits, link, DEL_SINGLE_LINK, NullWritable.get());
                targetOutlinkEdits++;
                return;
            }

            boolean newOutlinkStatus = (linkTest == HAS_OUTLINKS);
            // Do the conditional so we minimize unnecessary mod-writes.
            if (alwaysEmitEdit || link.targetHasOutlink() != newOutlinkStatus) {
                link.setTargetHasOutlink(newOutlinkStatus);
                liwriter.appendInstructionInfo(futureEdits, link, ADD_LINK, NullWritable.get());
                targetOutlinkEdits++;
            }

            newDb.append(link, NullWritable.get());
            itemsWritten++;
        }
    }

    /**
     * Method useful for the first time we create a distributed db project.
     * Basically need to write down the number of dirs we can expect.
     */
    public static void createDB(NutchFileSystem nfs, File root, int totalMachines) throws IOException {
        //
        // Check to see if the db already exists
        //
        File stdDir = new File(root, "standard");
        File machineInfo = new Fi

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -