// ConcurrentQueueLoops.java
// From "OAA framework software released by SRI International" · Java code · 134 lines total
// Java
// 134 lines
/*
* @test %I% %E%
* @bug 4486658
* @compile -source 1.5 ConcurrentQueueLoops.java
* @run main/timeout=230 ConcurrentQueueLoops
* @summary Checks that a set of threads can repeatedly get and modify items
*/
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain. Use, modify, and
* redistribute this code in any way without acknowledgement.
*/
import java.util.*;
import edu.emory.mathcs.backport.java.util.*;
import edu.emory.mathcs.backport.java.util.concurrent.*;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.*;
import edu.emory.mathcs.backport.java.util.concurrent.helpers.*;
public class ConcurrentQueueLoops {
static final ExecutorService pool = Executors.newCachedThreadPool();
static AtomicInteger totalItems;
static boolean print = false;
/**
 * Program entry point.
 *
 * NOTE: every statement of the driver is currently commented out, so running
 * this class does nothing and returns immediately. The disabled code below is
 * kept for reference. As written, it would:
 * - take an optional queue class name from args[0], falling back to the
 *   backport ConcurrentLinkedQueue class, and an optional maximum stage
 *   count from args[1] (default 8);
 * - perform two warm-up calls to oneRun with printing switched off;
 * - call oneRun for stage counts 1, 2, 3, 5, 8, ... (i += (i+1) >>> 1)
 *   up to the maximum, with printing switched on;
 * - shut the shared thread pool down.
 *
 * @param args command-line arguments; unused while the body is disabled
 * @throws Exception declared for the disabled driver code; nothing is thrown
 *         by the current empty body
 */
public static void main(String[] args) throws Exception {
// int maxStages = 8;
// int items = 100000;
//
// Class klass = null;
// if (args.length > 0) {
// try {
// klass = Class.forName(args[0]);
// } catch(ClassNotFoundException e) {
// throw new RuntimeException("Class " + args[0] + " not found.");
// }
// }
// else
// klass = edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue.class;
//
// if (args.length > 1)
// maxStages = Integer.parseInt(args[1]);
//
// System.out.print("Class: " + klass.getName());
// System.out.println(" stages: " + maxStages);
//
// print = false;
// System.out.println("Warmup...");
// oneRun(klass, 1, items);
// Thread.sleep(100);
// oneRun(klass, 1, items);
// Thread.sleep(100);
// print = true;
//
// for (int i = 1; i <= maxStages; i += (i+1) >>> 1) {
// oneRun(klass, i, items);
// }
// pool.shutdown();
}
/**
 * One worker of the queue stress loop.
 *
 * After passing the start barrier, a stage repeatedly polls the shared queue,
 * feeds any polled value through the LoopHelpers compute functions, and offers
 * newly computed values back until it has offered {@code items} of them. It
 * keeps polling until the shared {@code totalItems} counter shows that nothing
 * is left to take.
 */
static class Stage implements Callable {
    /** Queue that this stage polls from and offers to. */
    final Queue queue;
    /** Barrier the stage waits on before starting its loop. */
    final CyclicBarrier barrier;
    /** Number of items this stage still has to offer; decremented in call(). */
    int items;

    /**
     * @param q     queue to poll from and offer to
     * @param b     barrier awaited once at the start of call()
     * @param items number of items this stage offers to the queue
     */
    Stage (Queue q, CyclicBarrier b, int items) {
        queue = q;
        barrier = b;
        this.items = items;
    }

    /**
     * Runs the take/transform/put loop.
     *
     * @return the last computed value, boxed as an Integer, so that the
     *         caller can consume the result of the computation
     * @throws Error if anything in the loop throws an exception; the original
     *         exception is attached as the cause
     */
    public Object call() {
        // Repeatedly take something from queue if possible,
        // transform it, and put back in.
        try {
            barrier.await();
            int l = (int)Utils.nanoTime();
            int takes = 0;   // successful polls not yet subtracted from totalItems
            int seq = l;
            for (;;) {
                Integer item = (Integer)queue.poll();
                if (item != null) {
                    ++takes;
                    l = LoopHelpers.compute2(item.intValue());
                }
                else if (takes != 0) {
                    // Queue looked empty: publish the batched take count.
                    totalItems.getAndAdd(-takes);
                    takes = 0;
                }
                else if (totalItems.get() <= 0)
                    break;   // nothing pending locally and nothing left overall
                l = LoopHelpers.compute1(l);
                if (items > 0) {
                    --items;
                    // Retry until the queue accepts the element.
                    while (!queue.offer(new Integer(l^seq++))) ;
                }
                else if ( (l & (3 << 5)) == 0) // spinwait: sleep only when bits 5 and 6 of l are both zero
                    Thread.sleep(1);
            }
            return new Integer(l);
        }
        catch (Exception ie) {
            // Do not swallow an interrupt: restore the flag for the owning thread.
            if (ie instanceof InterruptedException)
                Thread.currentThread().interrupt();
            ie.printStackTrace();
            // Keep the original failure as the cause so it survives Future.get().
            throw new Error("Call loop failed", ie);
        }
    }
}
/**
 * Executes a single pass with {@code n} Stage tasks sharing one queue.
 *
 * A new queue is created reflectively from {@code klass}, the shared
 * {@code totalItems} counter is reset to {@code n * items}, and the stages
 * are submitted to the shared pool. The calling thread is the (n + 1)-th
 * party on the start barrier. Once all stage results have been collected the
 * elapsed time per item is printed when {@code print} is set.
 *
 * @param klass queue implementation class; instantiated via newInstance()
 * @param n     number of Stage tasks to run concurrently
 * @param items number of items each stage offers to the queue
 * @throws Exception if the queue cannot be instantiated, the barrier wait
 *         fails, or a stage result cannot be obtained
 */
static void oneRun(Class klass, int n, int items) throws Exception {
    final Queue sharedQueue = (Queue)klass.newInstance();
    // The timer is handed to the barrier as its second constructor argument;
    // its startTime field is read below (presumably set when the barrier trips).
    final LoopHelpers.BarrierTimer barrierTimer = new LoopHelpers.BarrierTimer();
    final CyclicBarrier startBarrier = new CyclicBarrier(n + 1, barrierTimer);
    totalItems = new AtomicInteger(n * items);

    // Queue up every stage before releasing the barrier.
    final ArrayList futures = new ArrayList(n);
    int submitted = 0;
    while (submitted < n) {
        futures.add(pool.submit(new Stage(sharedQueue, startBarrier, items)));
        ++submitted;
    }

    if (print) {
        System.out.print("Threads: " + n + "\t:");
    }
    startBarrier.await();

    // Wait for each stage in submission order and fold its result.
    int checksum = 0;
    for (Iterator it = futures.iterator(); it.hasNext(); ) {
        Future pending = (Future)it.next();
        checksum += ((Integer)pending.get()).intValue();
    }

    final long elapsed = Utils.nanoTime() - barrierTimer.startTime;
    if (print) {
        System.out.println(LoopHelpers.rightJustify(elapsed / (items * n)) + " ns per item");
    }
    if (checksum == 0) { // avoid overoptimization
        System.out.println("useless result: " + checksum);
    }
}
}
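// Keyboard shortcuts
// Copy code: Ctrl + C
// Search code: Ctrl + F
// Full-screen mode: F11
// Increase font size: Ctrl + =
// Decrease font size: Ctrl + -
// Show shortcuts: ?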