⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 webdbwriter.java

📁 爬虫数据的改进,并修正了一些bug
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
        }

        /**
         * Merge the existing db with the edit-stream into a brand-new file.
         */
        void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
            // Create the keys and vals we'll be using
            DeduplicatingPageSequenceReader edits = new DeduplicatingPageSequenceReader(sortedEdits);
            WritableComparable readerKey = new UTF8();
            Page readerVal = new Page();
            PageInstruction editItem = new PageInstruction();
            int futureOrdering = 0;

            // Read the first items from both streams
            boolean hasEntries = db.next(readerKey, readerVal);
            boolean hasEdits = edits.next(editItem);

            // As long as we have both edits and entries, we need to
            // interleave them....
            while (hasEntries && hasEdits) {
                int comparison = readerKey.compareTo(editItem.getPage().getURL());
                int curInstruction = editItem.getInstruction();

                // Perform operations
                if ((curInstruction == ADD_PAGE) ||
                    (curInstruction == ADD_PAGE_WITH_SCORE) ||
                    (curInstruction == ADD_PAGE_IFN_PRESENT)) {

                    if (comparison < 0) {
                        // Write readerKey, just passing it along.
                        // Don't process the edit yet.
                        newDb.append(readerKey, readerVal);
                        itemsWritten++;
                        hasEntries = db.next(readerKey, readerVal);
                    } else if (comparison == 0) {
                        // The keys are equal.  If the instruction 
                        // is ADD_PAGE, we write the edit's key and 
                        // replace the old one.
                        //
                        // Otherwise, if it's ADD_IFN_PRESENT, 
                        // keep the reader's item intact.
                        //
                        if ((curInstruction == ADD_PAGE) ||
                            (curInstruction == ADD_PAGE_WITH_SCORE)) {
                            // An ADD_PAGE with an identical pair
                            // of pages replaces the existing one.
                            // We may need to note the fact for
                            // Garbage Collection.
                            //
                            // This happens in three stages.  
                            // 1.  We write necessary items to the future
                            //     edits-list.
                            //
                            pagesByMD5Edits++;

                            // If this is a replacing add, we don't want
                            // to disturb the score from the old Page!  This,
                            // way, we can run some link analysis scoring
                            // while the new Pages are being fetched and
                            // not lose the info when a Page is replaced.
                            //
                            // If it is an ADD_PAGE_WITH_SCORE, then we 
                            // go ahead and replace the old one.
                            //
                            // Either way, from now on we treat it
                            // as an ADD_PAGE
                            //
                            Page editItemPage = editItem.getPage();

                            if (curInstruction == ADD_PAGE) {
                                editItemPage.setScore(readerVal.getScore(), readerVal.getNextScore());
                            }

                            piwriter.appendInstructionInfo(futureEdits, editItemPage, ADD_PAGE, NullWritable.get());

                            //
                            // 2.  We write the edit-page to *this* table.
                            //
                            newDb.append(editItemPage.getURL(), editItemPage);

                            //
                            // 3.  We want the ADD in the next step (the
                            //     MD5-driven table) to be a "replacing add".
                            //     But that won't happen if the readerItem and
                            //     the editItem Pages are not identical.
                            //     (In this scenario, that means their URLs
                            //     are the same, but their MD5s are different.)
                            //     So, we need to explicitly handle that
                            //     case by issuing a DELETE for the now-obsolete
                            //     item.
                            if (editItemPage.compareTo(readerVal) != 0) {
                                pagesByMD5Edits++;
                                piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get());
                            }

                            itemsWritten++;

                            // "Delete" the readerVal by skipping it.
                            hasEntries = db.next(readerKey, readerVal);
                        } else {
                            // ADD_PAGE_IFN_PRESENT.  We only add IF_NOT
                            // present.  And it was present!  So, we treat 
                            // this case like we treat a no-op.
                            // Just move to the next edit.
                        }
                        // In either case, we process the edit.
                        hasEdits = edits.next(editItem);

                    } else if (comparison > 0) {
                        // We have inserted a Page that's before some
                        // entry in the existing database.  So, we just
                        // need to write down the Page from the Edit file.
                        // It's like the above case, except we don't tell
                        // the future-edits to delete anything.
                        //
                        // 1.  Write the item down for the future.
                        pagesByMD5Edits++;

                        //
                        // If this is an ADD_PAGE_IFN_PRESENT, then
                        // we may also have a Link we have to take care of!
                        //
                        if (curInstruction == ADD_PAGE_IFN_PRESENT) {
                            Link editLink = editItem.getLink();
                            if (editLink != null) {
                                addLink(editLink);
                            }
                        }
                        piwriter.appendInstructionInfo(futureEdits, editItem.getPage(), ADD_PAGE, NullWritable.get());

                        //
                        // 2.  Write the edit-page to *this* table
                        newDb.append(editItem.getPage().getURL(), editItem.getPage());
                        itemsWritten++;

                        // Process the edit
                        hasEdits = edits.next(editItem);
                    }
                } else if (curInstruction == DEL_PAGE) {
                    if (comparison < 0) {
                        // Write the readerKey, just passing it along.
                        // We don't process the edit yet.
                        newDb.append(readerKey, readerVal);
                        itemsWritten++;
                        hasEntries = db.next(readerKey, readerVal);
                    } else if (comparison == 0) {
                        // Delete it!  We can only delete one item
                        // at a time, as all URLs are unique.
                        // 1.  Tell the future-edits what page will need to
                        //     be deleted.
                        pagesByMD5Edits++;
                        piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get());

                        //
                        // 2.  "Delete" the entry by skipping the Reader
                        //     key.
                        hasEntries = db.next(readerKey, readerVal);

                        // Process the edit
                        hasEdits = edits.next(editItem);
                    } else if (comparison > 0) {
                        // Ignore it.  We tried to delete an item that's
                        // not here.
                        hasEdits = edits.next(editItem);
                    }
                }
            }

            // Now we have only edits.  No more preexisting items!
            while (! hasEntries && hasEdits) {
                int curInstruction = editItem.getInstruction();
                if (curInstruction == ADD_PAGE ||
                    curInstruction == ADD_PAGE_WITH_SCORE ||
                    curInstruction == ADD_PAGE_IFN_PRESENT) {
                    // No more reader entries, so ADD_PAGE_IFN_PRESENT
                    // is treated like a simple ADD_PAGE.

                    // 1.  Tell the future edits-list about this new item
                    pagesByMD5Edits++;
                    
                    //
                    // If this is an ADD_PAGE_IFN_PRESENT, then
                    // we may also have a Link we have to take care of!
                    //
                    if (curInstruction == ADD_PAGE_IFN_PRESENT) {
                        Link editLink = editItem.getLink();
                        if (editLink != null) {
                            addLink(editLink);
                        }
                    }
                    piwriter.appendInstructionInfo(futureEdits, editItem.getPage(), ADD_PAGE, NullWritable.get());

                    // 2.  Write the edit page to this table.
                    newDb.append(editItem.getPage().getURL(), editItem.getPage());
                    itemsWritten++;
                } else if (curInstruction == DEL_PAGE) {
                    // Ignore it.  We tried to delete an item
                    // that's not here.
                }

                // Either way, we always process the edit.
                hasEdits = edits.next(editItem);
            }

            // Now we have only preexisting items.  We just copy
            // them to the new file, in order.
            while (hasEntries && ! hasEdits) {
                newDb.append(readerKey, readerVal);
                itemsWritten++;
                hasEntries = db.next(readerKey, readerVal);
            }
        }
    }

    /***
     * The PagesByMD5Processor is used during close() time for
     * the pagesByMD5 table.  We instantiate one of these, and it
     * takes care of the entire shutdown process.
     */
    private class PagesByMD5Processor extends CloseProcessor {
        /**
         */
        PagesByMD5Processor(MapFile.Reader db, SequenceFile.Writer editWriter) {
            super(PAGES_BY_MD5, db, editWriter, new SequenceFile.Sorter(fs, new PageInstruction.PageComparator(), NullWritable.class), null, Page.class, NullWritable.class);
        }

        /**
         */
        void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
            // Create the keys and vals
            Page readerItem = new Page();
            PageInstruction editItem = new PageInstruction();

            // For computing the GC list
            Page deletedItem = new Page(), lastItem = new Page();
            boolean justDeletedItem = false;
            boolean newReaderItem = false;
            int itemRepeats = 0;

            // Read the first items from both streams
            boolean hasEntries = db.next(readerItem, NullWritable.get());
            boolean hasEdits = sortedEdits.next(editItem, NullWritable.get());
            if (hasEntries) {
                // The first thing we read should become
                // the "previous key".  We need this for
                // garbage collection.
                outBuf.reset();
                readerItem.write(outBuf);
                inBuf.reset(outBuf.getData(), outBuf.getLength());
                lastItem.readFields(inBuf);
                itemRepeats = 0;
            }

            // As long we have both edits and entries, we need to
            // interleave them.
            while (hasEdits && hasEntries) {
                int comparison = readerItem.compareTo(editItem.getPage());
                int curInstruction = editItem.getInstruction();

                //
                // OK!  Now perform operations
                //
                if (curInstruction == ADD_PAGE) {
                    if (comparison < 0) {
                        // Write readerItem, just passing it along.
                        // Don't process the edit yet.
                        newDb.append(readerItem, NullWritable.get());
                        itemsWritten++;
                        hasEntries = db.next(readerItem, NullWritable.get());
                        newReaderItem = true;
                    } else if (comparison == 0) {
                        //
                        // This is a "replacing ADD", which is generated
                        // by the above-sequence.  We should skip over the
                        // existing item, and add the new one instead.
                        //
                        // Note that by this point, the new version of the
                        // Page from the edit sequence is guaranteed to
                        // have the correct score.  We make sure of it in
                        // the mergeEdits() for PagesByURLProcessor.
                        //
                        newDb.append(editItem.getPage(), NullWritable.get());
                        itemsWritten++;
                        hasEntries = db.next(readerItem, NullWritable.get());
                        newReaderItem = true;
                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                    } else if (comparison > 0) {
                        // Write the edit item.  We've inserted an item
                        // that comes before any others.
                        newDb.append(editItem.getPage(), NullWritable.get());
                        itemsWritten++;
                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                    }
                } else if (curInstruction == ADD_PAGE_IFN_PRESENT) {
                    throw new IOException("Should never process ADD_PAGE_IFN_PRESENT for the index:  " + editItem);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -