⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 concurrentkmeans.java

📁 java 语言编写的kmeans聚类算法源代码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:

        // What the object is currently doing. Set to one of the 
        // three codes above.
        private int mDoing = DOING_NOTHING;

        // True if the at least one of the Workers is doing something.
        private boolean mWorking;

        // The executor that runs the Workers.
        // When in multiple processor mode, this is a ThreadPoolExecutor 
        // with a fixed number of threads. In single-processor mode, it's
        // a simple implementation that calls the single worker's run
        // method directly.
        private Executor mExecutor;

        // A Barrier to wait on multiple Workers to finish up the current task.
        // In single-processor mode, there is no need for a barrier, so it
        // is not set.
        private CyclicBarrier mBarrier;

        // The worker objects which implement Runnable.
        private Worker[] mWorkers;

        /**
         * Constructor
         * 
         * @param numThreads the number of worker threads to be used for
         *   the subtasks.
         */
        SubtaskManager(int numThreads) {
            
            if (numThreads <= 0) {
                throw new IllegalArgumentException("number of threads <= 0: "
                        + numThreads);
            }

            int coordCount = mCoordinates.length;

            // There would be no point in having more workers than
            // coordinates, since some of the workers would have nothing
            // to do.
            if (numThreads > coordCount) {
                numThreads = coordCount;
            }

            // Create the workers.
            mWorkers = new Worker[numThreads];

            // To hold the number of coordinates for each worker.
            int[] coordsPerWorker = new int[numThreads];
            
            // Initialize with the base amounts.  
            Arrays.fill(coordsPerWorker, coordCount/numThreads);
            
            // There may be some leftovers, since coordCount may not be
            // evenly divisible by numWorkers. Add a coordinate to each
            // until all are covered.
            int leftOvers = coordCount - numThreads * coordsPerWorker[0];
            for (int i = 0; i < leftOvers; i++) {
                coordsPerWorker[i]++;
            }

            int startCoord = 0;
            // Instantiate the workers.
            for (int i = 0; i < numThreads; i++) {
                // Each worker needs to know its starting coordinate and the number of 
                // coordinates it handles.
                mWorkers[i] = new Worker(startCoord, coordsPerWorker[i]);
                startCoord += coordsPerWorker[i];
            }

            if (numThreads == 1) { // Single-processor mode.
                
                // Create a simple executor that directly calls the single
                // worker's run method.  Do not set the barrier.
                mExecutor = new Executor() {
                    public void execute(Runnable runnable) {
                        if (!Thread.interrupted()) {
                            runnable.run();
                        } else {
                            throw new RejectedExecutionException();
                        }
                    }
                };
                
            } else { // Multiple-processor mode.
                
                // Need the barrier to notify the controlling thread when the
                // Workers are done.
                mBarrier = new CyclicBarrier(numThreads, new Runnable() {
                    public void run() {
                        // Method called after all workers haved called await() on the
                        // barrier.  The call to workersDone() 
                        // unblocks the controlling thread.
                        workersDone();
                    }
                });

                // Set the executor to a fixed thread pool with 
                // threads that do not time out.
                mExecutor = Executors.newFixedThreadPool(numThreads);
            }
        }

        /**
         * Make the cluster assignments.
         * 
         * @return true if nothing went wrong.
         */
        boolean makeAssignments() {
            mDoing = MAKING_ASSIGNMENTS;
            return work();
        }

        /**
         * Compute the distances between the coordinates and those centers with
         * update flags.
         * 
         * @return true if nothing went wrong.
         */
        boolean computeDistances() {
            mDoing = COMPUTING_DISTANCES;
            return work();
        }
        
        /** 
         * Perform the current subtask, waiting until all the workers
         * finish their part of the current task before returning.
         * 
         * @return true if the subtask succeeded.
         */
        private boolean work() {
            boolean ok = false;
            // Set the working flag to true.
            mWorking = true;
            try {
                if (mBarrier != null) {
                    // Resets the barrier so it can be reused if
                    // this is not the first call to this method.
                    mBarrier.reset();
                }
                // Now execute the run methods on the Workers.  
                for (int i = 0; i < mWorkers.length; i++) {
                    mExecutor.execute(mWorkers[i]);
                }
                if (mBarrier != null) {
                    // Block until the workers are done.  The barrier
                    // triggers the unblocking.
                    waitOnWorkers();
                    // If the isBroken() method of the barrier returns false, 
                    // no problems.
                    ok = !mBarrier.isBroken();
                } else {
                    // No barrier, so the run() method of a single worker
                    // was called directly and everything must have worked
                    // if we made it here.
                    ok = true;
                }
            } catch (RejectedExecutionException ree) {
                // Possibly thrown by the executor.
            } finally {
                mWorking = false;
            }
            return ok;
        }

        /**
         * Called from work() to put the controlling thread into
         * wait mode until the barrier calls workersDone().
         */
        private synchronized void waitOnWorkers() {
            // It is possible for the workers to have finished so quickly that
            // workersDone() has already been called.  Since workersDone() sets
            // mWorking to false, check this flag before going into wait mode.
            // Not doing so could result in hanging the SubtaskManager.
            while (mWorking) {
                try {
                    // Blocks until workersDone() is called.
                    wait();
                } catch (InterruptedException ie) {
                    // mBarrier.isBroken() will return true.
                    break;
                }
            }
        }

        /**
         * Notifies the controlling thread that it can come out of
         * wait mode.
         */
        private synchronized void workersDone() {
            // If this gets called before waitOnWorkers(), setting this
            // to false prevents waitOnWorkers() from entering a 
            // permanent wait.
            mWorking = false;
            notifyAll();
        }

        /**
         * Shutdown the thread pool when k-means is finished.
         */
        void shutdown() {
            if (mExecutor instanceof ThreadPoolExecutor) {
                // This terminates the threads in the thread pool.
                ((ThreadPoolExecutor) mExecutor).shutdownNow();
            }
        }

        /** 
         * Returns the number of cluster assignment changes made in the
         * previous call to makeAssignments().
         */
        int numberOfMoves() {
            // Sum the contributions from the workers.
            int moves = 0;
            for (int i=0; i<mWorkers.length; i++) {
                moves += mWorkers[i].numberOfMoves();
            }
            return moves;
        }

        /**
         * The class which does the hard work of the subtasks.
         */
        private class Worker implements Runnable {

            // Defines range of coordinates to cover.
            private int mStartCoord, mNumCoords;

            // Number of moves made by this worker in the last call
            // to workerMakeAssignments().  The SubtaskManager totals up
            // this value from all the workers in numberOfMoves().
            private int mMoves;

            /**
             * Constructor
             * 
             * @param startCoord index of the first coordinate covered by
             *   this Worker.
             * @param numCoords the number of coordinates covered.
             */
            Worker(int startCoord, int numCoords) {
                mStartCoord = startCoord;
                mNumCoords = numCoords;
            }

            /**
             * Returns the number of moves this worker made in the last
             * execution of workerMakeAssignments()
             */
            int numberOfMoves() {
                return mMoves;
            }
            
            /**
             * The run method.  It accesses the SubtaskManager field mDoing
             * to determine what subtask to perform.
             */
            public void run() {

                try {
                    switch (mDoing) {
                    case COMPUTING_DISTANCES:
                        workerComputeDistances();
                        break;
                    case MAKING_ASSIGNMENTS:
                        workerMakeAssignments();
                        break;
                    }
                } finally {
                    // If there's a barrier, call its await() method.  To ensure it
                    // gets done, it's placed in the finally clause.
                    if (mBarrier != null) {
                        try {
                            mBarrier.await();
                        // barrier.isBroken() will return true if either of these
                        // exceptions happens, so the SubtaskManager will detect
                        // the problem.
                        } catch (InterruptedException ex) {
                        } catch (BrokenBarrierException ex) {
                        }
                    }
                }
                
            }

            /**
             * Compute the distances for the covered coordinates
             * to the updated centers.
             */
            private void workerComputeDistances() {
                int lim = mStartCoord + mNumCoords;
                for (int i = mStartCoord; i < lim; i++) {
                    int numClusters = mProtoClusters.length;
                    for (int c = 0; c < numClusters; c++) {
                        ProtoCluster cluster = mProtoClusters[c];
                        if (cluster.getConsiderForAssignment() && cluster.needsUpdate()) {
                            mDistanceCache[i][c] = distance(mCoordinates[i], cluster.getCenter()); 
                        }
                    }
                } 
            }

            /**
             * Assign each covered coordinate to the nearest cluster.
             */
            private void workerMakeAssignments() {
                mMoves = 0;
                int lim = mStartCoord + mNumCoords;
                for (int i = mStartCoord; i < lim; i++) {
                    int c = nearestCluster(i);
                    mProtoClusters[c].add(i);
                    if (mClusterAssignments[i] != c) {
                        mClusterAssignments[i] = c;
                        mMoves++;
                    }
                }
            }

        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -