⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fpmergeuriuniqfilter.java

📁 Heritrix是一个开源,可扩展的web爬虫项目。Heritrix设计成严格按照robots.txt文件的排除指示和META robots标签。
💻 JAVA
字号:
/* UriUniqFilterImpl
 *
 * $Id: FPMergeUriUniqFilter.java 4647 2006-09-22 18:39:39Z paul_jack $
 *
 * Created on Sep 29, 2005
 *
 * Copyright (C) 2005 Internet Archive.
 *
 * This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 * Heritrix is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * Heritrix is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser Public License for more details.
 *
 * You should have received a copy of the GNU Lesser Public License
 * along with Heritrix; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
package org.archive.crawler.util;

import it.unimi.dsi.fastutil.longs.LongIterator;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.archive.crawler.datamodel.CandidateURI;
import org.archive.crawler.datamodel.UriUniqFilter;
import org.archive.util.fingerprint.ArrayLongFPCache;

import st.ata.util.FPGenerator;

/**
 * UriUniqFilter based on merging FP arrays (in memory or from disk).
 *
 * Inspired by the approach in Najork and Heydon, "High-Performance
 * Web Crawling" (2001), section 3.2, "Efficient Duplicate URL
 * Eliminators".
 *
 * Candidate fingerprints are collected in a sorted 'pending' set and
 * periodically merged, in ascending order, with the sorted list of
 * previously-known fingerprints supplied by the concrete subclass.
 *
 * @author gojomo
 */
public abstract class FPMergeUriUniqFilter implements UriUniqFilter {
    /**
     * Represents a long fingerprint and (possibly) its corresponding
     * CandidateURI, awaiting the next merge in a 'pending' state.
     *
     * Ordering considers the fingerprint only, so a sorted set holds at
     * most one item per fingerprint.
     */
    public class PendingItem implements Comparable<PendingItem> {
        long fp;
        CandidateURI caUri;

        /**
         * @param fp long fingerprint
         * @param value CandidateURI to pass along if fp proves novel, or null
         */
        public PendingItem(long fp, CandidateURI value) {
            this.fp = fp;
            this.caUri = value;
        }

        /**
         * Order by fingerprint value (explicit comparison rather than
         * subtraction, which could overflow).
         */
        public int compareTo(PendingItem vs) {
            return (fp < vs.fp) ? -1 : ((fp == vs.fp) ? 0 : 1);
        }
    }

    private static final Logger LOGGER =
        Logger.getLogger(FPMergeUriUniqFilter.class.getName());

    /** default size at which to force flush of pending items */
    public static final int DEFAULT_MAX_PENDING = 10000;
    // TODO: increase

    /** multiple of the last flush's duration to wait before the next
     * requested (non-forced) flush is honored */
    public static final long FLUSH_DELAY_FACTOR = 100;

    /** destination for URIs judged novel */
    protected HasUriReceiver receiver;

    /** optional log of every key offered to add(); null when disabled */
    protected PrintWriter profileLog;

    // statistics: running totals, plus the totals as of the last flush
    protected long quickDuplicateCount = 0;
    protected long quickDupAtLast = 0;
    protected long pendDuplicateCount = 0;
    protected long pendDupAtLast = 0;
    protected long mergeDuplicateCount = 0;
    protected long mergeDupAtLast = 0;

    /** items awaiting merge
     * TODO: consider only sorting just pre-merge
     * TODO: consider using a fastutil long->Object class
     * TODO: consider actually writing items to disk file,
     * as in Najork/Heydon
     */
    protected TreeSet<PendingItem> pendingSet = new TreeSet<PendingItem>();

    /** size at which to force flush of pending items */
    protected int maxPending = DEFAULT_MAX_PENDING;

    /**
     * time-based throttle on flush-merge operations
     */
    protected long nextFlushAllowableAfter = 0;

    /** cache of most recently seen FPs */
    protected ArrayLongFPCache quickCache = new ArrayLongFPCache();
    // TODO: make cache most-often seen, not just most-recent

    /**
     * Create the filter. If the system property
     * <code>(this class's name).profileLogFile</code> is set, every key
     * offered to the filter is also logged to that file.
     */
    public FPMergeUriUniqFilter() {
        super();
        String profileLogFile =
            System.getProperty(FPMergeUriUniqFilter.class.getName()
                + ".profileLogFile");
        if (profileLogFile != null) {
            setProfileLog(new File(profileLogFile));
        }
    }

    /**
     * @param max number of pending items at which add() forces a flush
     */
    public void setMaxPending(int max) {
        maxPending = max;
    }

    /**
     * @return number of items currently awaiting merge
     */
    public long pending() {
        return pendingSet.size();
    }

    /**
     * @param receiver destination to which novel URIs are passed
     */
    public void setDestination(HasUriReceiver receiver) {
        this.receiver = receiver;
    }

    /**
     * Write the key to the profile log, if one is configured.
     *
     * @param key key (URI) being offered to the filter
     */
    protected void profileLog(String key) {
        if (profileLog != null) {
            profileLog.println(key);
        }
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.datamodel.UriUniqFilter#add(java.lang.String, org.archive.crawler.datamodel.CandidateURI)
     */
    public synchronized void add(String key, CandidateURI value) {
        profileLog(key);
        long fp = createFp(key);
        if (!quickCheck(fp)) {
            // rejected by the quick cache: treat as duplicate
            quickDuplicateCount++;
            return;
        }
        pend(fp, value);
        if (pendingSet.size() >= maxPending) {
            flush();
        }
    }

    /**
     * Place the given FP/CandidateURI pair into the pending set, awaiting
     * a merge to determine if it's actually accepted.
     *
     * @param fp long fingerprint
     * @param value CandidateURI or null, if fp only needs merging (as when
     * CandidateURI was already forced in
     */
    protected void pend(long fp, CandidateURI value) {
        // special case for first batch of adds
        if (count() == 0) {
            if (!pendingSet.add(new PendingItem(fp, null))) {
                pendDuplicateCount++; // was already present
            } else if (value != null) {
                // since there's no prior list to merge, push uri along
                // right now; the fp alone stays pending for the merge
                this.receiver.receive(value);
            }
            return;
        }
        if (!pendingSet.add(new PendingItem(fp, value))) {
            pendDuplicateCount++; // was already present
        }
    }

    /**
     * Evaluate if quick-check cache considers fingerprint novel enough
     * for further consideration.
     *
     * @param fp long fingerprint to check
     * @return true if fp deserves consideration; false if it appears in cache
     */
    private boolean quickCheck(long fp) {
        return quickCache.add(fp);
    }

    /**
     * Create a fingerprint from the given key
     *
     * @param key CharSequence (URI) to fingerprint
     * @return long fingerprint
     */
    public static long createFp(CharSequence key) {
        return FPGenerator.std64.fp(key);
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.datamodel.UriUniqFilter#addNow(java.lang.String, org.archive.crawler.datamodel.CandidateURI)
     */
    public void addNow(String key, CandidateURI value) {
        add(key, value);
        flush();
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.datamodel.UriUniqFilter#addForce(java.lang.String, org.archive.crawler.datamodel.CandidateURI)
     */
    public void addForce(String key, CandidateURI value) {
        add(key, null); // dummy pend
        this.receiver.receive(value);
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.datamodel.UriUniqFilter#note(java.lang.String)
     */
    public void note(String key) {
        add(key, null);
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.datamodel.UriUniqFilter#forget(java.lang.String, org.archive.crawler.datamodel.CandidateURI)
     */
    public void forget(String key, CandidateURI value) {
        throw new UnsupportedOperationException();
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.datamodel.UriUniqFilter#requestFlush()
     */
    public synchronized long requestFlush() {
        if (System.currentTimeMillis() > nextFlushAllowableAfter) {
            return flush();
        }
        // declining to flush: too soon after last flush
        return -1;
    }

    /**
     * Perform a merge of all 'pending' items to the overall fingerprint list.
     * If the pending item is new, and has an associated CandidateURI, pass that
     * URI along to the 'receiver' (frontier) for queueing.
     *
     * Both the pending set and the iterator returned by beginFpMerge() are
     * walked in ascending fingerprint order. A pending item whose
     * fingerprint equals a previously-known fingerprint is a duplicate: the
     * fingerprint is written to the new list once, the item is counted in
     * mergeDuplicateCount, and its CandidateURI is NOT passed to the
     * receiver.
     *
     * @return number of pending items actually added
     */
    public synchronized long flush() {
        if (pending() == 0) {
            return 0;
        }
        long flushStartTime = System.currentTimeMillis();
        long adds = 0;
        long fpOnlyAdds = 0;

        Iterator<PendingItem> pendIter = pendingSet.iterator();
        LongIterator fpIter = beginFpMerge();

        // a null cursor means that source is exhausted
        PendingItem currPend = pendIter.hasNext() ? pendIter.next() : null;
        Long currFp = (Long) (fpIter.hasNext() ? fpIter.next() : null);

        while (currFp != null || currPend != null) {
            if (currPend == null
                    || (currFp != null && currFp.longValue() < currPend.fp)) {
                // known FP precedes every remaining pending item:
                // carry it forward unchanged
                addNewFp(currFp.longValue());
                currFp = (Long) (fpIter.hasNext() ? fpIter.next() : null);
            } else if (currFp == null || currFp.longValue() > currPend.fp) {
                // pending FP precedes every remaining known FP: it is novel
                addNewFp(currPend.fp);
                if (currPend.caUri != null) {
                    adds++;
                    this.receiver.receive(currPend.caUri);
                } else {
                    fpOnlyAdds++;
                }
                currPend = pendIter.hasNext() ? pendIter.next() : null;
            } else {
                // pending FP is already known: keep a single copy and
                // consume BOTH cursors so the duplicate is neither written
                // twice nor passed along to the receiver
                addNewFp(currFp.longValue());
                mergeDuplicateCount++;
                currFp = (Long) (fpIter.hasNext() ? fpIter.next() : null);
                currPend = pendIter.hasNext() ? pendIter.next() : null;
            }
        }

        // maintain throttle timing
        long flushDuration = System.currentTimeMillis() - flushStartTime;
        nextFlushAllowableAfter =
            flushStartTime + (FLUSH_DELAY_FACTOR * flushDuration);

        // add/duplicate statistics
        if (LOGGER.isLoggable(Level.INFO)) {
            long mergeDups = (mergeDuplicateCount - mergeDupAtLast);
            long pendDups = (pendDuplicateCount - pendDupAtLast);
            long quickDups = (quickDuplicateCount - quickDupAtLast);
            LOGGER.info("flush took "+flushDuration+"ms: "
                    +adds+" adds, "
                    +fpOnlyAdds+" fpOnlydds, "
                    +mergeDups+" mergeDups, "
                    +pendDups+" pendDups, "
                    +quickDups+" quickDups ");
            if (adds == 0 && fpOnlyAdds == 0 && mergeDups == 0
                    && pendDups == 0 && quickDups == 0) {
                LOGGER.info("that's odd");
            }
        }
        mergeDupAtLast = mergeDuplicateCount;
        pendDupAtLast = pendDuplicateCount;
        quickDupAtLast = quickDuplicateCount;
        pendingSet.clear();
        finishFpMerge();
        return adds;
    }

    /**
     * Begin merging pending candidates with complete list. Return an
     * Iterator which will return all previously-known FPs in turn.
     *
     * @return Iterator over all previously-known FPs
     */
    abstract protected LongIterator beginFpMerge();

    /**
     * Add an FP (which may be an old or new FP) to the new complete
     * list. Should only be called after beginFpMerge() and before
     * finishFpMerge().
     *
     * @param fp  the FP to add
     */
    abstract protected void addNewFp(long fp);

    /**
     * Complete the merge of candidate and previously-known FPs (closing
     * files/iterators as appropriate).
     */
    abstract protected void finishFpMerge();

    /**
     * Close the profile log, if one is open.
     */
    public void close() {
        if (profileLog != null) {
            profileLog.close();
        }
    }

    /**
     * Start logging every offered key to the given file. Any previously
     * configured profile log is closed first so its file handle is not
     * leaked.
     *
     * @param logfile file to write keys to
     * @throws RuntimeException wrapping the FileNotFoundException if the
     * file cannot be opened for writing
     */
    public void setProfileLog(File logfile) {
        if (profileLog != null) {
            profileLog.close();
            profileLog = null;
        }
        try {
            profileLog = new PrintWriter(new BufferedOutputStream(
                    new FileOutputStream(logfile)));
        } catch (FileNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -