📄 concurrentkmeans.java
字号:
// What the object is currently doing. Set to one of the
// three codes above. The Workers read this in their run() method
// to decide which subtask to perform.
private int mDoing = DOING_NOTHING;
// True if at least one of the Workers is doing something.
// Set to true at the start of work(); cleared by workersDone() and,
// unconditionally, by the finally clause of work().
private boolean mWorking;
// The executor that runs the Workers.
// When in multiple processor mode, this is a fixed-size thread pool
// obtained from Executors.newFixedThreadPool(). In single-processor
// mode, it's a simple implementation that calls the single worker's
// run method directly on the calling thread.
private Executor mExecutor;
// A barrier on which the Workers wait once they finish the current task.
// In single-processor mode, there is no need for a barrier, so it
// is not set and remains null.
private CyclicBarrier mBarrier;
// The worker objects which implement Runnable.
private Worker[] mWorkers;
/**
 * Constructor. Splits the coordinates as evenly as possible among the
 * workers and sets up the executor (plus, in multiple-processor mode,
 * the barrier) used to run them.
 *
 * @param numThreads the number of worker threads to be used for
 * the subtasks. If larger than the number of coordinates it is reduced
 * to that number, but never below one.
 * @throws IllegalArgumentException if numThreads is not positive.
 */
SubtaskManager(int numThreads) {
    if (numThreads <= 0) {
        throw new IllegalArgumentException("number of threads <= 0: "
                + numThreads);
    }

    int coordCount = mCoordinates.length;

    // There would be no point in having more workers than
    // coordinates, since some of the workers would have nothing
    // to do. Always keep at least one worker, though: with an empty
    // coordinate array the count would otherwise drop to zero and
    // the division below would throw an ArithmeticException.
    if (numThreads > coordCount) {
        numThreads = Math.max(1, coordCount);
    }

    // Create the workers.
    mWorkers = new Worker[numThreads];

    // To hold the number of coordinates for each worker.
    int[] coordsPerWorker = new int[numThreads];

    // Initialize with the base amounts.
    Arrays.fill(coordsPerWorker, coordCount / numThreads);

    // There may be some leftovers, since coordCount may not be
    // evenly divisible by numThreads. Add a coordinate to each
    // worker, starting from the first, until all are covered.
    int leftOvers = coordCount - numThreads * coordsPerWorker[0];
    for (int i = 0; i < leftOvers; i++) {
        coordsPerWorker[i]++;
    }

    int startCoord = 0;
    // Instantiate the workers.
    for (int i = 0; i < numThreads; i++) {
        // Each worker needs to know its starting coordinate and the
        // number of coordinates it handles.
        mWorkers[i] = new Worker(startCoord, coordsPerWorker[i]);
        startCoord += coordsPerWorker[i];
    }

    if (numThreads == 1) { // Single-processor mode.
        // Create a simple executor that directly calls the single
        // worker's run method. Do not set the barrier.
        mExecutor = new Executor() {
            public void execute(Runnable runnable) {
                // Thread.interrupted() also clears the interrupt status;
                // the rejection below is how the interruption is reported.
                if (!Thread.interrupted()) {
                    runnable.run();
                } else {
                    throw new RejectedExecutionException();
                }
            }
        };
    } else { // Multiple-processor mode.
        // Need the barrier to notify the controlling thread when the
        // Workers are done.
        mBarrier = new CyclicBarrier(numThreads, new Runnable() {
            public void run() {
                // Method called after all workers have called await()
                // on the barrier. The call to workersDone()
                // unblocks the controlling thread.
                workersDone();
            }
        });
        // Set the executor to a fixed thread pool with
        // threads that do not time out.
        mExecutor = Executors.newFixedThreadPool(numThreads);
    }
}
/**
 * Runs the cluster-assignment subtask on every worker and waits for
 * the workers to finish.
 *
 * @return true if the subtask completed without a problem.
 */
boolean makeAssignments() {
    // The workers consult mDoing to pick the subtask to run.
    mDoing = MAKING_ASSIGNMENTS;
    boolean succeeded = work();
    return succeeded;
}
/**
 * Runs the distance-computation subtask on every worker and waits for
 * the workers to finish. Distances are computed between the coordinates
 * and those centers with update flags.
 *
 * @return true if the subtask completed without a problem.
 */
boolean computeDistances() {
    // The workers consult mDoing to pick the subtask to run.
    mDoing = COMPUTING_DISTANCES;
    boolean succeeded = work();
    return succeeded;
}
/**
 * Perform the current subtask, waiting until all the workers
 * finish their part of the current task before returning.
 *
 * @return true if the subtask succeeded; false if the executor rejected
 * a worker, the barrier was broken, or the wait ended before the
 * workers signalled that they were done.
 */
private boolean work() {
    boolean ok = false;
    // Set the working flag to true. workersDone() clears it once
    // every worker has reached the barrier.
    mWorking = true;
    try {
        if (mBarrier != null) {
            // Resets the barrier so it can be reused if
            // this is not the first call to this method.
            mBarrier.reset();
        }
        // Now execute the run methods on the Workers.
        for (int i = 0; i < mWorkers.length; i++) {
            mExecutor.execute(mWorkers[i]);
        }
        if (mBarrier != null) {
            // Block until the workers are done. The barrier
            // triggers the unblocking.
            waitOnWorkers();
            // waitOnWorkers() also returns early if the controlling
            // thread is interrupted. That does not break the barrier,
            // because the controlling thread is not one of its parties,
            // so isBroken() alone cannot detect it. If mWorking is still
            // set, workersDone() has not run and the task is incomplete.
            ok = !mWorking && !mBarrier.isBroken();
        } else {
            // No barrier, so the run() method of a single worker
            // was called directly and everything must have worked
            // if we made it here.
            ok = true;
        }
    } catch (RejectedExecutionException ree) {
        // Possibly thrown by the executor; ok stays false.
    } finally {
        mWorking = false;
    }
    return ok;
}
/**
 * Called from work() to put the controlling thread into
 * wait mode until the barrier calls workersDone().
 *
 * If the controlling thread is interrupted while waiting, the wait is
 * abandoned and the thread's interrupt status is restored so that
 * callers can detect the interruption.
 */
private synchronized void waitOnWorkers() {
    // It is possible for the workers to have finished so quickly that
    // workersDone() has already been called. Since workersDone() sets
    // mWorking to false, check this flag before going into wait mode.
    // Not doing so could result in hanging the SubtaskManager.
    // The loop also guards against spurious wakeups.
    while (mWorking) {
        try {
            // Blocks until workersDone() is called.
            wait();
        } catch (InterruptedException ie) {
            // Stop waiting. The barrier is NOT broken by this, since
            // the controlling thread is not waiting on it, so re-assert
            // the interrupt status rather than losing it.
            Thread.currentThread().interrupt();
            break;
        }
    }
}
/**
 * Signals the controlling thread that the workers have finished, so
 * it can leave wait mode.
 */
private void workersDone() {
    // Same monitor that waitOnWorkers() waits on.
    synchronized (this) {
        // Clearing the flag first means that if this runs before
        // waitOnWorkers(), the controlling thread never starts a
        // wait that nobody would end.
        mWorking = false;
        notifyAll();
    }
}
/**
 * Shuts down the thread pool once k-means is finished. Does nothing
 * when the executor is not a ThreadPoolExecutor (single-processor mode).
 */
void shutdown() {
    if (!(mExecutor instanceof ThreadPoolExecutor)) {
        return;
    }
    // This terminates the threads in the thread pool.
    ThreadPoolExecutor pool = (ThreadPoolExecutor) mExecutor;
    pool.shutdownNow();
}
/**
 * Returns the total number of cluster assignment changes made in the
 * previous call to makeAssignments().
 */
int numberOfMoves() {
    // Each worker tracks its own count; add them up.
    int total = 0;
    for (Worker worker : mWorkers) {
        total += worker.numberOfMoves();
    }
    return total;
}
/**
 * The class which does the hard work of the subtasks. Each Worker
 * covers a fixed, contiguous range of coordinates.
 */
private class Worker implements Runnable {

    // Defines range of coordinates to cover. Fixed at construction.
    private final int mStartCoord, mNumCoords;

    // Number of moves made by this worker in the last call
    // to workerMakeAssignments(). The SubtaskManager totals up
    // this value from all the workers in numberOfMoves().
    private int mMoves;

    /**
     * Constructor
     *
     * @param startCoord index of the first coordinate covered by
     * this Worker.
     * @param numCoords the number of coordinates covered.
     */
    Worker(int startCoord, int numCoords) {
        mStartCoord = startCoord;
        mNumCoords = numCoords;
    }

    /**
     * Returns the number of moves this worker made in the last
     * execution of workerMakeAssignments()
     */
    int numberOfMoves() {
        return mMoves;
    }

    /**
     * The run method. It accesses the SubtaskManager field mDoing
     * to determine what subtask to perform, then waits on the barrier
     * (if there is one) so the SubtaskManager learns this worker is done.
     */
    public void run() {
        try {
            switch (mDoing) {
            case COMPUTING_DISTANCES:
                workerComputeDistances();
                break;
            case MAKING_ASSIGNMENTS:
                workerMakeAssignments();
                break;
            }
        } finally {
            // If there's a barrier, call its await() method. To ensure it
            // gets done, it's placed in the finally clause.
            if (mBarrier != null) {
                try {
                    mBarrier.await();
                } catch (InterruptedException ex) {
                    // An interrupted await() breaks the barrier.
                    // Re-assert the interrupt status instead of
                    // silently discarding it.
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException ignored) {
                    // Another party's wait failed or the barrier was
                    // reset; there is nothing more this worker can do.
                }
            }
        }
    }

    /**
     * Compute the distances for the covered coordinates
     * to the updated centers.
     */
    private void workerComputeDistances() {
        int lim = mStartCoord + mNumCoords;
        int numClusters = mProtoClusters.length;
        for (int i = mStartCoord; i < lim; i++) {
            for (int c = 0; c < numClusters; c++) {
                ProtoCluster cluster = mProtoClusters[c];
                // Only clusters still under consideration whose centers
                // changed need their cached distances refreshed.
                if (cluster.getConsiderForAssignment() && cluster.needsUpdate()) {
                    mDistanceCache[i][c] = distance(mCoordinates[i], cluster.getCenter());
                }
            }
        }
    }

    /**
     * Assign each covered coordinate to the nearest cluster, counting
     * how many coordinates changed cluster in mMoves.
     */
    private void workerMakeAssignments() {
        mMoves = 0;
        int lim = mStartCoord + mNumCoords;
        for (int i = mStartCoord; i < lim; i++) {
            int c = nearestCluster(i);
            mProtoClusters[c].add(i);
            if (mClusterAssignments[i] != c) {
                mClusterAssignments[i] = c;
                mMoves++;
            }
        }
    }
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -