⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 webdbwriter.java

📁 爬虫数据的改进,并修正了一些bug
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
                     currentUrl.compareTo(current.getPage().getURL()) == 0);
            return true;
        }
    }


    /*************************************************
     * Holds an instruction over a Link.
     *************************************************/
    public static class LinkInstruction implements WritableComparable {
        // Link this instruction refers to.  May be null until one of the
        // set(...) methods or readFields() has been called.
        Link link;
        // Opcode.  Serialized as one byte, ahead of the link.
        int instruction;

        /**
         * Creates an empty instruction: no link yet, opcode 0.
         */
        public LinkInstruction() {
        }

        /**
         * Creates an instruction by delegating to
         * {@link #set(Link, int)}; the given Link reference is
         * stored as-is.
         */
        public LinkInstruction(Link link, int instruction) {
            set(link, instruction);
        }

        /**
         * Re-initializes this object from another LinkInstruction.
         * The opcode is copied, and this object's own Link (allocated
         * here if it does not exist yet) is re-initialized from
         * that.link through Link.set().
         */
        public void set(LinkInstruction that) {
            this.instruction = that.instruction;

            Link target = this.link;
            if (target == null) {
                target = new Link();
                this.link = target;
            }
            target.set(that.link);
        }

        /**
         * Re-initializes this object with the given Link and opcode.
         * The Link reference itself is kept; no copy is made here.
         */
        public void set(Link link, int instruction) {
            this.instruction = instruction;
            this.link = link;
        }

        //
        // WritableComparable
        //

        /**
         * Compares the two instructions' links via Link.compareTo().
         * The opcode does not take part in the comparison.
         */
        public int compareTo(Object o) {
            LinkInstruction other = (LinkInstruction) o;
            return link.compareTo(other.link);
        }

        /**
         * Serialized form: one opcode byte (DataOutput.writeByte keeps
         * only the low 8 bits of the int), followed by whatever
         * Link.write() emits.
         */
        public void write(DataOutput out) throws IOException {
            out.writeByte(this.instruction);
            this.link.write(out);
        }

        /**
         * Reads the opcode byte, then lets the Link (allocated here
         * if still null) read its own fields from the same stream.
         */
        public void readFields(DataInput in) throws IOException {
            instruction = in.readByte();
            if (link == null) {
                link = new Link();
            }
            link.readFields(in);
        }

        /** Returns the Link currently held (may be null). */
        public Link getLink() {
            return this.link;
        }

        /** Returns the opcode. */
        public int getInstruction() {
            return this.instruction;
        }

        /*******************************************************
         * Orders instructions by Link.md5Compare() of their
         * links.  Neither compare() variant below looks at the
         * opcode.
         *******************************************************/
        public static class MD5Comparator extends WritableComparator {
            private static final Link.MD5Comparator MD5_COMPARATOR =
                new Link.MD5Comparator();

            public MD5Comparator() {
                super(LinkInstruction.class);
            }

            /** Object form: delegates to Link.md5Compare(). */
            public int compare(WritableComparable a, WritableComparable b) {
                LinkInstruction left = (LinkInstruction) a;
                LinkInstruction right = (LinkInstruction) b;
                return left.link.md5Compare(right.link);
            }

            /**
             * Raw-bytes form.  The first byte of each record is the
             * opcode written by LinkInstruction.write(), so it is
             * stepped over and the remainder is handed to the Link
             * comparator.
             */
            public int compare(byte[] b1, int s1, int l1,
                               byte[] b2, int s2, int l2) {
                final int linkStart1 = s1 + 1;
                final int linkLength1 = l1 - 1;
                final int linkStart2 = s2 + 1;
                final int linkLength2 = l2 - 1;
                return MD5_COMPARATOR.compare(b1, linkStart1, linkLength1,
                                              b2, linkStart2, linkLength2);
            }
        }

        /*********************************************************
         * Orders instructions by Link.urlCompare() of their
         * links.  Neither compare() variant below looks at the
         * opcode.
         *********************************************************/
        public static class UrlComparator extends WritableComparator {
            private static final Link.UrlComparator URL_COMPARATOR =
                new Link.UrlComparator();

            public UrlComparator() {
                super(LinkInstruction.class);
            }

            /** Object form: delegates to Link.urlCompare(). */
            public int compare(WritableComparable a, WritableComparable b) {
                LinkInstruction left = (LinkInstruction) a;
                LinkInstruction right = (LinkInstruction) b;
                return left.link.urlCompare(right.link);
            }

            /**
             * Raw-bytes form.  The leading opcode byte of each record
             * is stepped over and the remainder is handed to the Link
             * comparator.
             */
            public int compare(byte[] b1, int s1, int l1,
                               byte[] b2, int s2, int l2) {
                final int linkStart1 = s1 + 1;
                final int linkLength1 = l1 - 1;
                final int linkStart2 = s2 + 1;
                final int linkLength2 = l2 - 1;
                return URL_COMPARATOR.compare(b1, linkStart1, linkLength1,
                                              b2, linkStart2, linkLength2);
            }
        }
    }

    /*******************************************************
     * LinkInstructionWriter very efficiently writes a
     * LinkInstruction to a SequenceFile.Writer.  Much better
     * than calling "writer.append(new LinkInstruction())"
     ********************************************************/
    public static class LinkInstructionWriter {
        // The one LinkInstruction that every append re-targets, so no
        // new key object is allocated per call.
        LinkInstruction li;

        /**
         * Allocates the reusable LinkInstruction.
         */
        public LinkInstructionWriter() {
            li = new LinkInstruction();
        }

        /**
         * Points the reusable LinkInstruction at the given link and
         * opcode, then appends it as the key, with val as the value,
         * to the supplied SequenceFile writer.  The instruction object
         * is kept for the next call.
         */
        public synchronized void appendInstructionInfo(SequenceFile.Writer writer, Link link, int opcode, Writable val) throws IOException {
            final LinkInstruction reusable = li;
            reusable.set(link, opcode);
            writer.append(reusable, val);
        }
    }

    /********************************************************
     * This class deduplicates link operations.  We want to 
     * sort by MD5, then by URL.  But all operations
     * should be unique.
     *********************************************************/
    class DeduplicatingLinkSequenceReader {
        // Link of the run currently being collapsed by next().
        Link currentKey = new Link();
        // Look-ahead record: already read from 'edits', not yet returned.
        LinkInstruction current = new LinkInstruction();
        SequenceFile.Reader edits;
        // True while 'current' holds a record that was actually read.
        boolean haveCurrent;

        /**
         * Wraps the given reader and immediately reads the first
         * record into the look-ahead slot.
         */
        public DeduplicatingLinkSequenceReader(SequenceFile.Reader edits) throws IOException {
            this.edits = edits;
            this.haveCurrent = this.edits.next(current, NullWritable.get());
        }


        /**
         * Fills 'key' with the next de-duplicated instruction.
         *
         * Starting from the look-ahead record, records keep being
         * read for as long as their Link compares equal (via
         * Link.compareTo) to the first one of the run; 'key' is
         * overwritten each time, so it ends up holding the LAST
         * record of the run.  The first record that differs stays in
         * the look-ahead slot for the following call.
         *
         * Returns false, leaving 'key' untouched, once the underlying
         * reader is exhausted.
         */
        public boolean next(LinkInstruction key) throws IOException {
            if (!haveCurrent) {
                return false;
            }

            // Remember which link this run belongs to.
            currentKey.set(current.getLink());

            while (true) {
                key.set(current);

                // Advance the look-ahead; stop at end of input or as
                // soon as a different link shows up.
                haveCurrent = edits.next(current, NullWritable.get());
                boolean sameLink = haveCurrent
                    && currentKey.compareTo(current.getLink()) == 0;
                if (!sameLink) {
                    break;
                }
            }
            return true;
        }
    }


    /**************************************************
     * The CloseProcessor class is used when we close down
     * the webdb.  We give it the path, members, and class values
     * needed to apply changes to any of our 4 data tables.
     * 
     * This is an abstract class.  Each subclass must define
     * the exact merge procedure.  However, file-handling
     * and edit-processing is standardized as much as possible.
     *
     **************************************************/
    private abstract class CloseProcessor {
        // Table name; used for the edits file ("<basename>.out"), the
        // output db file and the log messages.
        String basename;
        // Existing db that the edits get merged against.
        MapFile.Reader oldDb;
        // Writer of the unsorted edits file; closed first thing in closeDown().
        SequenceFile.Writer editWriter;
        // Sorts the edits file before the merge.
        SequenceFile.Sorter sorter;
        // Key comparator for the new MapFile; when null, keyClass is used instead.
        WritableComparator comparator;
        Class keyClass, valueClass;
        // Value returned by closeDown().  Not modified in this class;
        // presumably maintained by the mergeEdits() implementations.
        long itemsWritten = 0;

        /**
         * Store away these members for later use.
         */
        CloseProcessor(String basename, MapFile.Reader oldDb, SequenceFile.Writer editWriter, SequenceFile.Sorter sorter, WritableComparator comparator, Class keyClass, Class valueClass) {
            this.basename = basename;
            this.oldDb = oldDb;
            this.editWriter = editWriter;
            this.sorter = sorter;
            this.comparator = comparator;
            this.keyClass = keyClass;
            this.valueClass = valueClass;
        }

        /**
         * Perform the shutdown sequence for this Processor.
         *
         * With edits (numEdits != 0): the edits file is sorted, the
         * sorted file replaces the unsorted one, and mergeEdits() is
         * run to combine the old db and the sorted edits into a new
         * MapFile under outputDir.  Without edits, the current db is
         * simply copied to outputDir.  In both cases the edits file is
         * deleted afterwards.
         *
         * The sorted-edits reader and the new-db writer are closed even
         * when creating the writer, resetting the old db, or merging
         * fails, so a failed close-down does not leak open files.
         *
         * @param workingDir directory holding "<basename>.out"
         * @param outputDir  directory that receives the new db
         * @param numEdits   number of edits recorded; 0 selects the
         *                   plain-copy path
         * @return the value of itemsWritten at the end of the call
         * @throws IOException if sorting, merging, copying or closing fails
         */
        long closeDown(File workingDir, File outputDir, long numEdits) throws IOException {
            File editsFile = new File(workingDir, basename + ".out");
            File newDbFile = new File(outputDir, basename);
            File sortedEditsFile = new File(editsFile.getPath() + ".sorted");
            editWriter.close();

            // If there are edits, then process them.
            if (numEdits != 0) {
                // Sort the edits
                long startSort = System.currentTimeMillis();
                sorter.sort(editsFile.getPath(), sortedEditsFile.getPath());
                long endSort = System.currentTimeMillis();
                LOG.info("Processing " + basename + ": Sorted " + numEdits + " instructions in " + ((endSort - startSort) / 1000.0) + " seconds.");
                LOG.info("Processing " + basename + ": Sorted " + (numEdits / ((endSort - startSort) / 1000.0)) + " instructions/second");

                // The sorted file takes over the name of the unsorted one
                fs.delete(editsFile);
                fs.rename(sortedEditsFile, editsFile);

                // Read the sorted edits
                SequenceFile.Reader sortedEdits = new SequenceFile.Reader(fs, editsFile.getPath());
                MapFile.Writer newDb = null;
                try {
                    // Create a brand-new output db for the integrated data
                    newDb = (comparator == null) ? new MapFile.Writer(fs, newDbFile.getPath(), keyClass, valueClass) : new MapFile.Writer(fs, newDbFile.getPath(), comparator, valueClass);

                    // Iterate through the edits, and merge changes with
                    // existing db into the brand-new file
                    oldDb.reset();

                    long startMerge = System.currentTimeMillis();
                    mergeEdits(oldDb, sortedEdits, newDb);
                    long endMerge = System.currentTimeMillis();
                    LOG.info("Processing " + basename + ": Merged to new DB containing " + itemsWritten + " records in " + ((endMerge - startMerge) / 1000.0) + " seconds");
                    LOG.info("Processing " + basename + ": Merged " + (itemsWritten / ((endMerge - startMerge) / 1000.0)) + " records/second");
                } finally {
                    // Close down readers, writers -- same order as on the
                    // success path (reader first, then writer), and the
                    // writer is still closed if closing the reader throws.
                    try {
                        sortedEdits.close();
                    } finally {
                        if (newDb != null) {
                            newDb.close();
                        }
                    }
                }
            } else {
                // Otherwise, simply copy the file into place,
                // without all the processing overhead.
                long startCopy = System.currentTimeMillis();
                File curFile = new File(dbFile, basename);
                FileUtil.recursiveCopy(fs, curFile, newDbFile);
                long endCopy = System.currentTimeMillis();

                LOG.info("Processing " + basename + ": Copied file (" + newDbFile.length()+ " bytes) in " + ((endCopy - startCopy) / 1000.0) + " secs.");
            }

            // Delete the now-consumed edits file to save space
            fs.delete(editsFile);

            return itemsWritten;
        }

        /**
         * The loop that actually applies the changes and writes to
         * a new db.  This is different for every subclass!
         *
         * @param db    the old db; closeDown() calls reset() on it before
         *              invoking this method
         * @param edits reader over the sorted edits file
         * @param newDb writer for the merged output db
         */
        abstract void mergeEdits(MapFile.Reader db, SequenceFile.Reader edits, MapFile.Writer newDb) throws IOException;
    }

    /***
     * The PagesByURLProcessor is used during close() time for
     * the pagesByURL table.  We instantiate one of these, and it
     * takes care of the entire shutdown process.
     */
    private class PagesByURLProcessor extends CloseProcessor {
        SequenceFile.Writer futureEdits;

        /**
         * We store "futureEdits" so we can write edits for the
         * next table-db step
         */
        PagesByURLProcessor(MapFile.Reader db, SequenceFile.Writer editWriter, SequenceFile.Writer futureEdits) {
            super(PAGES_BY_URL, db, editWriter, new SequenceFile.Sorter(fs, new PageInstruction.UrlComparator(), NullWritable.class), new UTF8.Comparator(), null, Page.class);
            this.futureEdits = futureEdits;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -