⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 distributedanalysistool.java

📁 一些简要的公爵类一些简要的公爵类一些简要的公爵类
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    public void computeRound(int processId, File distDir) throws IOException {
        File assignFile = new File(distDir, ASSIGN_FILE_PREFIX + "." + processId);

        long startIndex = 0, extent = 0;
        DataInputStream in = new DataInputStream(new BufferedInputStream(nfs.open(assignFile)));
        try {
            startIndex = in.readLong();
            extent = in.readLong();
        } finally {
            in.close();
        }

        LOG.info("Start at: "+  startIndex);
        LOG.info("Extent: "+  extent);

        // 
        // Open scoreEdits file for this process.  Write down 
        // all the score-edits we want to perform.
        //
        File scoreEdits = new File(distDir, SCORE_EDITS_FILE_PREFIX + "." + processId);
        SequenceFile.Writer scoreWriter = new SequenceFile.Writer(nfs, scoreEdits.getPath() + ".unsorted", UTF8.class, ScoreValue.class);

        //
        // Go through the appropriate WebDB range, and figure out 
        // next scores
        //
        try {
            // Iterate through all items in the webdb, sorted by URL
            long curIndex = 0;
            ScoreValue score = new ScoreValue();
            IWebDBReader reader = new WebDBReader(nfs, dbDir);
            try {
                for (Enumeration e = reader.pagesByMD5(); e.hasMoreElements(); curIndex++) {
                    //
                    // Find our starting place
                    //
                    if (curIndex < startIndex) {
                        e.nextElement();
                        continue;
                    }

                    //
                    // Bail if we've been here too long
                    //
                    if (curIndex - startIndex > extent) {
                        break;
                    }

                    //
                    // OK, do some analysis!
                    //
                    Page curPage = (Page) e.nextElement();
                    Link outLinks[] = reader.getLinks(curPage.getMD5());
                    int targetOutlinkers = 0;
                    for (int i = 0; i < outLinks.length; i++) {
                        if (outLinks[i].targetHasOutlink()) {
                            targetOutlinkers++;
                        }
                    }

                    //
                    // For our purposes here, assume every Page
                    // has an inlink, even though that might not
                    // really be true.  It's close enough.
                    //

                    //
                    // In case there's no previous nextScore, grab
                    // score as an approximation.
                    //
                    float curNextScore = curPage.getNextScore();
                    if (outLinks.length > 0 && curNextScore == 0.0f) {
                        curNextScore = curPage.getScore();
                    }

                    //
                    // Compute contributions
                    //
                    float contributionForAll = (outLinks.length > 0) ? (curNextScore / outLinks.length) : 0.0f;
                    float contributionForOutlinkers = (targetOutlinkers > 0) ? (curNextScore / targetOutlinkers) : 0.0f;
                    for (int i = 0; i < outLinks.length; i++) {
                        // emit the target URL and the contribution
                        score.setScore(contributionForAll);
                        score.setNextScore(outLinks[i].targetHasOutlink() ? contributionForOutlinkers : 0.0f);
                        scoreWriter.append(outLinks[i].getURL(), score);
                    }

                    if (((curIndex - startIndex) % 5000) == 0) {
                        LOG.info("Pages consumed: " + (curIndex - startIndex) + " (at index " + curIndex + ")");
                    }
                }
            } finally {
                reader.close();
            }
        } finally {
            scoreWriter.close();
        }

        // Now sort the resulting score-edits file
        SequenceFile.Sorter sorter = new SequenceFile.Sorter(nfs, new UTF8.Comparator(), ScoreValue.class);
        sorter.sort(scoreEdits.getPath() + ".unsorted", scoreEdits.getPath() + ".sorted");
        nfs.delete(new File(scoreEdits.getPath() + ".unsorted"));
    }


    /**
     * This method collates and executes all the instructions
     * computed by the many executors of computeRound().  It
     * figures out what to write by looking at all the flat
     * files found in the distDir.  These files are labelled
     * according to the processes that filled them.  This method
     * will check to make sure all those files are present
     * before starting work.
     *
     * If the processors are distributed, you might have to
     * copy all the instruction files to a single distDir before
     * starting this method.
     *
     * Of course, this method is executed on only one process.
     * It is run last.
     */
    public void completeRound(File distDir, File scoreFile) throws IOException {
        //
        // Load the overall assignment file, so we can
        // see how many processes we have and how many
        // operations each should include
        //
        int numProcesses = 0;
        long totalPages = 0;
        long extent[] = null;
        File overall = new File(distDir, "assignComplete");
        DataInputStream in = new DataInputStream(new BufferedInputStream(nfs.open(overall)));
        try {
            numProcesses = in.readInt();
            totalPages = in.readLong();
            extent = new long[numProcesses];
            for (int i = 0; i < numProcesses; i++) {
                extent[i] = in.readLong();
            }
        } finally {
            in.close();
            in = null;
        }
        
        //
        // Go through each instructions file we have, and
        // apply each one to the webdb.
        //
        ScoreStats scoreStats = new ScoreStats();
        IWebDBReader reader = new WebDBReader(nfs, dbDir);
        IWebDBWriter writer = new WebDBWriter(nfs, dbDir);
        EditSet editSet = new EditSet(distDir, numProcesses);
        try {
            int count = 0;
            UTF8 curEditURL = new UTF8();
            ScoreValue curContribution = new ScoreValue();
            boolean hasEdit = editSet.next(curEditURL, curContribution);

            //
            // Go through all the Pages, in URL-sort order.
            // We also read from the score-edit file in URL-sort order.
            //
            for (Enumeration e = reader.pages(); e.hasMoreElements(); count++) {
                Page curPage = (Page) e.nextElement();
                if (! hasEdit) {
                    break;
                }

                //
                // Apply the current score-edit to the db item,
                // if appropriate
                //
                int comparison = curPage.getURL().compareTo(curEditURL);
                float newScore = 0.0f, newNextScore = 0.0f;
                if (comparison < 0) {
                    // Fine.  The edit applies to a Page we will
                    // hit later.  Ignore it, and move onto the next
                    // Page.  This should only happen with Pages
                    // that have no incoming links, which are necessarily
                    // special-case Pages.
                    // 
                    // However, that means the Page's score should
                    // be set to the minimum possible, as we have no
                    // incoming links.
                    newScore = (1 - DECAY_VALUE);
                    newNextScore = (1 - DECAY_VALUE);
                } else if (comparison > 0) {
                    // Error!  We should never hit this situation.
                    // It means we have a score-edit for an item
                    // that's not found in the database!
                    throw new IOException("Impossible situation.  There is a score-edit for " + curEditURL + ", which comes after the current Page " + curPage.getURL());
                } else {
                    //
                    // The only really interesting case is when the
                    // score-edit and the curPage are the same.
                    //
                    // Sum all the contributions
                    while (hasEdit && curPage.getURL().compareTo(curEditURL) == 0) {
                        newScore += curContribution.score();
                        newNextScore += curContribution.nextScore();
                        hasEdit = editSet.next(curEditURL, curContribution);
                    }
                    
                    newScore = (1 - DECAY_VALUE) + (DECAY_VALUE * newScore);
                    newNextScore = (1 - DECAY_VALUE) + (DECAY_VALUE * newNextScore);
                }
                
                // Finally, assign it.
                curPage.setScore(newScore, newNextScore);
                writer.addPageWithScore(curPage);
                scoreStats.addScore(newScore);
                if ((count % 5000) == 0) {
                    LOG.info("Pages written: " + count);
                }
            }
            LOG.info("Pages encountered: " + count);
            LOG.info("Target pages from init(): " + totalPages);
        } finally {
            reader.close();
            editSet.close();
            writer.close();
        }

        //
        // Emit the score distribution info
        //
        if (scoreFile.exists()) {
            scoreFile.delete();
        }
        PrintStream pout = new PrintStream(new BufferedOutputStream(nfs.create(scoreFile)));
        try {
            scoreStats.emitDistribution(pout);
        } finally {
            pout.close();
        }

        //
        // Delete all the distributed overhead files
        //
        FileUtil.fullyDelete(nfs, distDir);
    }

    /**
     * Kick off the link analysis.  Submit the locations of the
     * Webdb and the number of iterations.
     *
     * DAT -initRound <n_processes> <dist_dir> <db_dir>
     * DAT -computeRound <process_id> <dist_dir> <db_dir>
     * DAT -completeRound <dist_dir> <db_dir>
     */
    public static void main(String argv[]) throws IOException {
        if (argv.length < 2) {
            System.out.println("usage: java net.nutch.tools.DistributedAnalysisTool (-local | -ndfs <namenode:port>) -initRound|-computeRound|-completeRound (numProcesses | processId) <dist_dir> <db_dir>");
            return;
        }

        String command = null;
        int numProcesses = 0, processId = 0, numIterations = 0;
        File distDir = null, dbDir = null;

        NutchFileSystem nfs = NutchFileSystem.parseArgs(argv, 0);
        for (int i = 0; i < argv.length; i++) {
            if ("-initRound".equals(argv[i])) {
                command = argv[i];
                numProcesses = Integer.parseInt(argv[i+1]);
                distDir = new File(argv[i+2]);
                dbDir = new File(argv[i+3]);
                i+=3;
            } else if ("-computeRound".equals(argv[i])) {
                command = argv[i];
                processId = Integer.parseInt(argv[i+1]);
                distDir = new File(argv[i+2]);
                dbDir = new File(argv[i+3]);
                i+=3;
            } else if ("-completeRound".equals(argv[i])) {
                command = argv[i];
                distDir = new File(argv[i+1]);
                dbDir = new File(argv[i+2]);
                i+=2;
            }
        }

        System.out.println("Started at " + new Date(System.currentTimeMillis()));
        try {
            DistributedAnalysisTool dat = 
                new DistributedAnalysisTool(nfs, dbDir);
            if ("-initRound".equals(command)) {
                dat.initRound(numProcesses, distDir);
            } else if ("-computeRound".equals(command)) {
                dat.computeRound(processId, distDir);
            } else if ("-completeRound".equals(command)) {
                dat.completeRound(distDir, new File(dbDir, "linkstats.txt"));
            } else {
                System.out.println("No directive.");
            }
        } finally {
            System.out.println("Finished at " + new Date(System.currentTimeMillis()));
            nfs.close();
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -