// distributedanalysistool.java
/* Copyright (c) 2003 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.tools;

import java.io.*;
import java.util.*;
import java.util.logging.*;

import net.nutch.io.*;
import net.nutch.db.*;
import net.nutch.fs.*;
import net.nutch.net.*;
import net.nutch.util.*;
import net.nutch.linkdb.*;
import net.nutch.pagedb.*;
import net.nutch.fetcher.*;

/***************************************
 * DistributedAnalysisTool performs link analysis by reading
 * exclusively from an IWebDBReader, and writing to
 * an IWebDBWriter.
 *
 * This tool can be used in phases via the command line
 * to compute the LinkAnalysis score across many machines.
 *
 * For a single iteration of LinkAnalysis, you must have:
 *
 * 1) An "initRound" step that writes down how the work should be
 *      divided.  This outputs a "dist" directory which must be made
 *      available to later steps.  It requires the input db directory.
 *
 * 2) As many simultaneous "computeRound" steps as you like, but this
 *      number must be determined in step 1.  Each step may be run
 *      on different machines, or on the same, or however you like.
 *      It requires the "db" and "dist" directories (or copies) as
 *      inputs.  Each run will output an "instructions file".
 *
 * 3) A "completeRound" step, which integrates the results of all the
 *      many "computeRound" steps.  It writes to a "db" directory.  It
 *      assumes that all the instructions files have been gathered into
 *      a single "dist" input directory.  If you're running everything
 *      on a single filesystem, this will happen easily.  If not, then
 *      you will have to gather the files by hand (or with a script).
 *    
 * For more iterations, repeat steps 1 - 3!
 *
 * @author Mike Cafarella
 ***************************************/
public class DistributedAnalysisTool {   
    final private static String ASSIGN_FILE_PREFIX = "assignment";
    final private static String SCORE_EDITS_FILE_PREFIX = "scoreEdits";
    final private static String ASSIGN_COMPLETE = "assignComplete";
    
    final private static float DEFAULT_SCORE = 0.15f;
    final private static float DECAY_VALUE = 0.85f;

    public static final Logger LOG = LogFormatter.getLogger("net.nutch.tools.DistributedAnalysisTool");

    /**
     * The EditSet inner class represents all of the sorted edits
     * files we must process.  The edit-loop can repeatedly ask
     * an EditSet for the "next item", and the EditSet will 
     * seamlessly deal with opening and closing files
     */
    class EditSet {
        File distDir;
        int numEditFiles;
        int curEditFile;
        SequenceFile.Reader curReader;

        /**
         * The "distDir" is where we find all the edit files.
         * The "numEditFiles" is how many we can expect to find there.
         */
        public EditSet(File distDir, int numEditFiles) throws IOException {
            this.distDir = distDir;
            this.numEditFiles = numEditFiles;
            this.curEditFile = 0;
            getNextReader();
        }

        /**
         * Get the next item for reading, closing and opening
         * files if necessary.  Return false if there are no
         * more items to return.
         */
        public synchronized boolean next(Writable key, Writable val) throws IOException {
            //
            // Open the next input stream if necessary
            //
            if (curReader == null) {
                getNextReader();
                if (curReader == null) {
                    return false;
                }
            }

            //
            // If the current file is exhausted, advance through the
            // remaining files until an entry is found or they run out.
            //
            while (! curReader.next(key, val)) {
                getNextReader();
                if (curReader == null) {
                    return false;
                }
            }
            return true;
        }

        /**
         * Close the current reader, if any, and open the next
         * edits file.  Leaves curReader null once every file
         * has been consumed.
         */
        private void getNextReader() throws IOException {
            if (curReader != null) {
                curReader.close();
                curReader = null;
            }

            if (curEditFile < numEditFiles) {
                curReader = new SequenceFile.Reader(nfs, new File(distDir, SCORE_EDITS_FILE_PREFIX + "." + curEditFile + ".sorted").getPath());
                LOG.info("Opened stream to file " + curEditFile);
                curEditFile++;
            }
        }

        /**
         * Close any open reader and mark this EditSet as exhausted.
         */
        public synchronized void close() throws IOException {
            if (curReader != null) {
                curReader.close();
                curReader = null;
            }
            curEditFile = numEditFiles;
        }
    }
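The EditSet above presents many sequence files behind a single next() call, opening the following file transparently when the current one is drained. A standalone sketch of the same pattern over plain java.io streams (ChainedInts and its int payloads are illustrative, not Nutch code):

```java
import java.io.*;
import java.util.*;

// Sketch of the EditSet pattern: one next() over a chain of inputs.
// Each underlying stream holds big-endian ints; when one is drained,
// we close it and silently advance to the next.
public class ChainedInts {
    private final Iterator<DataInputStream> inputs;
    private DataInputStream cur;

    public ChainedInts(List<DataInputStream> streams) {
        this.inputs = streams.iterator();
        this.cur = inputs.hasNext() ? inputs.next() : null;
    }

    /** Return the next int, or null when every input is exhausted. */
    public Integer next() throws IOException {
        while (cur != null) {
            if (cur.available() >= 4) {
                return cur.readInt();      // current input still has data
            }
            cur.close();                   // drained: advance to the next input
            cur = inputs.hasNext() ? inputs.next() : null;
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        ChainedInts c = new ChainedInts(Arrays.asList(
            new DataInputStream(new ByteArrayInputStream(new byte[]{0,0,0,1, 0,0,0,2})),
            new DataInputStream(new ByteArrayInputStream(new byte[]{0,0,0,3}))));
        for (Integer v = c.next(); v != null; v = c.next()) {
            System.out.println(v);         // prints 1, 2, 3
        }
    }
}
```

The caller never sees file boundaries, which is exactly what lets the edit-loop below treat all the sorted edits files as one stream.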

    /**
     * A Writable pair of floats (the current and next-iteration
     * scores), so that scores can be stored in a SequenceFile.
     */
    class ScoreValue implements Writable {
        float score;
        float nextScore;

        /**
         * Construct an empty ScoreValue; fields are filled in via
         * the setters or readFields().
         */
        public ScoreValue() {
        }
        /**
         * Set the current score.
         */
        public void setScore(float f) {
            this.score = f;
        }
        /**
         * Set the score for the next iteration.
         */
        public void setNextScore(float f) {
            this.nextScore = f;
        }

        /**
         * Return the current score.
         */
        public float score() {
            return score;
        }
        /**
         * Return the next-iteration score.
         */
        public float nextScore() {
            return nextScore;
        }

        /**
         * Write both scores to the given output.
         */
        public void write(DataOutput out) throws IOException {
            out.writeFloat(score);
            out.writeFloat(nextScore);
        }

        /**
         * Read both scores from the given input.
         */
        public void readFields(DataInput in) throws IOException {
            this.score = in.readFloat();
            this.nextScore = in.readFloat();
        }
    }
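The write/readFields pair above is the standard Writable round trip: serialize to a DataOutput, deserialize from a DataInput, field for field in the same order. A self-contained sketch using only plain java.io (the class name and values are hypothetical, and no Nutch/Hadoop types are involved):

```java
import java.io.*;

// Sketch of the ScoreValue round trip: two floats out, two floats in.
public class ScorePairDemo {
    float score, nextScore;

    void write(DataOutput out) throws IOException {
        out.writeFloat(score);         // 4 bytes, IEEE-754 big-endian
        out.writeFloat(nextScore);     // 4 more bytes
    }

    void readFields(DataInput in) throws IOException {
        score = in.readFloat();        // must mirror write() exactly
        nextScore = in.readFloat();
    }

    public static void main(String[] args) throws IOException {
        ScorePairDemo a = new ScorePairDemo();
        a.score = 0.15f;
        a.nextScore = 0.85f;

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        a.write(new DataOutputStream(buf));

        ScorePairDemo b = new ScorePairDemo();
        b.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(b.score + " " + b.nextScore); // prints "0.15 0.85"
    }
}
```

Because the byte layout is fixed (8 bytes per record), a SequenceFile of these values can be read back without any per-record length information.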

    NutchFileSystem nfs;
    File dbDir;

    /**
     * Construct with the filesystem and the web db directory.
     */
    public DistributedAnalysisTool(NutchFileSystem nfs, File dbDir) throws IOException, FileNotFoundException {
        this.nfs = nfs;
        this.dbDir = dbDir;
    }

    /**
     * This method prepares the ground for a set of processes
     * to distribute a round of LinkAnalysis work.  It writes out
     * the "assignments" to a directory.  This directory must be
     * made accessible to all the processes.  (It may be mounted by
     * all of them, or copied to all of them.)
     *
     * This is run by a single process, and it is run first.
     */
    public boolean initRound(int numProcesses, File distDir) throws IOException {
        //
        // The distDir must be empty or non-existent.
        //
        if ((nfs.exists(distDir) && nfs.isFile(distDir)) ||
            (nfs.exists(distDir) && (nfs.listFiles(distDir).length != 0))) {
            LOG.severe("Must be an empty or non-existent dir: " + distDir);
            return false;
        }
        if (! nfs.exists(distDir)) {
            nfs.mkdirs(distDir);
        }

        //
        // Figure out how many db items we have, and how many
        // processes they are allocated to.
        //
        long startPages[] = new long[numProcesses];
        long totalPages = 0;
        IWebDBReader reader = new WebDBReader(nfs, dbDir);
        try {
            totalPages = reader.numPages();     
        } finally {
            reader.close();
        }
        long chunkSize = totalPages / numProcesses;
        long pagesProcessedSoFar = 0;

        //
        // Assign each process a chunk's worth of pages.  The value
        // at each index indicates the start page for that process.
        //
        startPages[0] = 0;
        for (int i = 1; i < numProcesses; i++) {
            startPages[i] = startPages[i-1] + chunkSize;
        }

        //
        // Emit the assignments for the processes
        //
        try {
            // Write out each file
            for (int i = 0; i < numProcesses; i++) {
                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(nfs.create(new File(distDir, ASSIGN_FILE_PREFIX + "." + i))));
                try {
                    // Start page
                    out.writeLong(startPages[i]);

                    // How many pages to process
                    if (i != numProcesses - 1) {
                        out.writeLong(chunkSize);
                    } else {
                        // in last index, make up for remainders
                        out.writeLong(totalPages - ((numProcesses - 1) * chunkSize));
                    }
                } finally {
                    out.close();
                }
            }
            
            //
            // Write a file that indicates we finished correctly.
            // This makes it easier for controlling scripts to
            // check whether this process completed.
            //
            // It also holds some overall instruction information,
            // so we can do some error-checking at complete-time.
            //
            File completeFile = new File(distDir, ASSIGN_COMPLETE);
            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(nfs.create(completeFile)));
            try {
                out.writeInt(numProcesses);
                out.writeLong(totalPages);
                
                // Compute extents
                long extent[] = new long[numProcesses];
                for (int i = 0; i < numProcesses - 1; i++) {
                    extent[i] = chunkSize * (i + 1);
                }
                extent[numProcesses-1] = totalPages - (chunkSize * (numProcesses - 1));
                
                // Emit extents
                for (int i = 0; i < extent.length; i++) {
                    out.writeLong(extent[i]);
                }
            } finally {
                out.close();
            }
            return true;
        } catch (IOException ex) {
            LOG.severe(ex.toString());
            LOG.severe("Sorry, could not finish assignments");
        }
        return false;
    }
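The assignment arithmetic in initRound gives every process chunkSize = totalPages / numProcesses pages, with the last process absorbing the integer-division remainder. A standalone sketch of just that arithmetic (no Nutch types; the class and method names are illustrative):

```java
// Sketch of initRound's page-range assignment: each process i gets
// {startPage, extent}, where the last extent soaks up the remainder.
public class ChunkMathSketch {
    static long[][] assign(long totalPages, int numProcesses) {
        long chunkSize = totalPages / numProcesses;
        long[][] ranges = new long[numProcesses][2];   // {startPage, extent}
        for (int i = 0; i < numProcesses; i++) {
            ranges[i][0] = i * chunkSize;
            ranges[i][1] = (i == numProcesses - 1)
                ? totalPages - (numProcesses - 1) * chunkSize  // remainder
                : chunkSize;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 10 pages across 3 processes: chunkSize = 3
        for (long[] r : assign(10, 3)) {
            System.out.println(r[0] + " +" + r[1]);    // 0 +3, 3 +3, 6 +4
        }
    }
}
```

The extents written to the completion file above follow the same rule, which is what lets completeRound verify that the per-process instruction files cover the whole db.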

    /**
     * This method is invoked by one of the many processes involved
     * in LinkAnalysis.  There will be many of these running at the
     * same time.  That's OK, though, since there's no locking
     * that has to go on between them.
     *
     * This computes the LinkAnalysis score for a given region
     * of the database.  It writes its ID, the region params, and
     * the scores-to-be-written into a flat file.  This file is
     * labelled according to its processid, and is found inside distDir.
     */
