📄 synchronizationtimer.java
字号:
if (chanCls == null) {
RNG shared = (RNG)(cls.newInstance());
for (int k = 0; k < nthreads; ++k) {
RNG pri = (RNG)(cls.newInstance());
TestLoop l = new TestLoop(shared, pri, pshr, iters, barrier);
Threads.pool.execute(l.testLoop());
}
}
else {
Channel shared = (Channel)(chanCls.newInstance());
if (nthreads == 1) {
ChanRNG single = (ChanRNG)(cls.newInstance());
single.setSingle(true);
PCTestLoop l = new PCTestLoop(single.getDelegate(), single, pshr,
iters, barrier,
shared, shared);
Threads.pool.execute(l.testLoop(true));
}
else if (nthreads % 2 != 0)
throw new Error("Must have even number of threads!");
else {
int npairs = nthreads / 2;
for (int k = 0; k < npairs; ++k) {
ChanRNG t = (ChanRNG)(cls.newInstance());
t.setSingle(false);
Channel chan = (Channel)(chanCls.newInstance());
PCTestLoop l = new PCTestLoop(t.getDelegate(), t, pshr,
iters, barrier,
shared, chan);
Threads.pool.execute(l.testLoop(false));
Threads.pool.execute(l.testLoop(true));
}
}
}
if (echoToSystemOut.get()) {
System.out.print(
entry.name + " " +
nthreads + "T " +
pshr + "S " +
RNG.computeLoops.get() + "I " +
RNG.syncMode.get() + "Lm " +
RNG.timeout.get() + "TO " +
RNG.producerMode.get() + "Pm " +
RNG.consumerMode.get() + "Cm " +
RNG.bias.get() + "B " +
DefaultChannelCapacity.get() + "C " +
RNG.exchangeParties.get() + "Xp " +
RNG.itersPerBarrier.get() + "Ib : "
);
}
}
// Uncomment if AWT doesn't update right
// Thread.sleep(100);
barrier.barrier(); // start
barrier.barrier(); // stop
long tm = timer.getTime();
long totalIters = nthreads * iters;
double dns = tm * 1000.0 * PRECISION / totalIters;
long ns = Math.round(dns);
setTime(ns, clsIdx, nthreadsIdx);
if (echoToSystemOut.get()) {
System.out.println(formatTime(ns, true));
}
}
catch (BrokenBarrierException ex) {
wasInterrupted = true;
}
catch (InterruptedException ex) {
wasInterrupted = true;
Thread.currentThread().interrupt();
}
catch (Exception ex) {
ex.printStackTrace();
System.out.println("Construction Exception?");
System.exit(-1);
}
finally {
final boolean clear = wasInterrupted;
SwingUtilities.invokeLater(new Runnable() {
public void run() {
if (clear) cell.setText("");
cell.setForeground(oldfg);
cell.repaint();
}
});
Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
endOneTest();
}
}
}
}
/**
 * Thread factory together with the shared executor that runs the test loops.
 * Threads produced by this factory bump {@link #activeThreads} while their
 * run() method is executing.
 */
class Threads implements ThreadFactory {

  /** Count of factory-created threads currently inside run(). */
  static final SynchronizedInt activeThreads = new SynchronizedInt(0);

  /** The factory instance installed on {@link #pool}. */
  static final Threads factory = new Threads();

  /** Executor on which test loops are scheduled. */
  static final PooledExecutor pool = newConfiguredPool();

  /**
   * Creates the shared pool with a keep-alive time of 10000 and
   * {@link #factory} as its thread factory. Relies on {@code factory}
   * being declared (and therefore initialized) before {@code pool}.
   */
  private static PooledExecutor newConfiguredPool() {
    PooledExecutor executor = new PooledExecutor();
    executor.setKeepAliveTime(10000);
    executor.setThreadFactory(factory);
    return executor;
  }

  /** Thread that counts itself in {@link #activeThreads} while running. */
  static class MyThread extends Thread {

    public MyThread(Runnable cmd) {
      super(cmd);
    }

    public void run() {
      activeThreads.increment();
      try {
        super.run();
      }
      finally {
        // Always undo the increment, even if the command throws.
        activeThreads.decrement();
      }
    }
  }

  /** Wraps {@code cmd} in a {@link MyThread}. */
  public Thread newThread(Runnable cmd) {
    Thread worker = new MyThread(cmd);
    return worker;
  }
}
/**
 * Factory for the timed loop executed by each test thread.
 *
 * Every iteration calls next() on either the {@code shared} RNG or the
 * thread's {@code primary} RNG. Which one is picked is driven by the
 * precomputed {@link #useShared} table, walked cyclically starting just
 * after {@link #firstidx}.
 */
class TestLoop {
  /** RNG used for the "shared" portion of the iterations. */
  final RNG shared;
  /** RNG used for all remaining iterations and for exchange(). */
  final RNG primary;
  /** Number of loop iterations per run. */
  final int iters;
  /** Fraction of iterations that should go to {@code shared}. */
  final Fraction pshared;
  /** Barrier awaited once before and once after the loop. */
  final CyclicBarrier barrier;
  /** Cyclic table: true means "use shared" for that slot. */
  final boolean[] useShared;
  /** Raw starting offset into {@code useShared}, taken from primary.get(). */
  final int firstidx;

  /**
   * @param sh   the shared RNG
   * @param pri  the primary RNG
   * @param pshr fraction of calls directed at {@code sh}
   * @param it   iterations per run
   * @param br   barrier used to start and stop the run
   */
  public TestLoop(RNG sh, RNG pri, Fraction pshr, int it, CyclicBarrier br) {
    shared = sh;
    primary = pri;
    pshared = pshr;
    iters = it;
    barrier = br;
    // Read before the shuffle below, which calls shared.next().
    firstidx = (int)(primary.get());
    int num = (int)(pshared.numerator());
    int denom = (int)(pshared.denominator());
    if (num == 0 || primary == shared) {
      // Never pick shared (or there is nothing to distinguish).
      useShared = new boolean[] { false };
    }
    else if (num >= denom) {
      // Always pick shared.
      useShared = new boolean[] { true };
    }
    else {
      // create bool array and randomize it.
      // This ensures that always same number of shared calls.
      // denom slots is too few. iters is too many. an arbitrary compromise is:
      int xfactor = 1024 / denom;
      if (xfactor < 1) xfactor = 1;
      useShared = new boolean[denom * xfactor];
      int trueCount = num * xfactor;
      for (int i = 0; i < useShared.length; ++i)
        useShared[i] = (i < trueCount);
      // Shuffle: swap each slot with a slot at or below it chosen via shared.next().
      for (int i = 1; i < useShared.length; ++i) {
        int j = ((int) (shared.next() & 0x7FFFFFFF)) % (i + 1);
        boolean tmp = useShared[i];
        useShared[i] = useShared[j];
        useShared[j] = tmp;
      }
    }
  }

  /**
   * Builds the task for one test thread. The task adjusts its priority,
   * waits on the barrier, runs {@code iters} iterations (stopping early if
   * it notices an interrupt), then waits on the barrier again and restores
   * normal priority.
   *
   * @return the runnable to hand to the executor
   */
  public Runnable testLoop() {
    return new Runnable() {
      public void run() {
        int itersPerBarrier = RNG.itersPerBarrier.get();
        try {
          int delta = -1;
          if (primary.getClass().equals(PrioritySemRNG.class)) {
            // NOTE(review): assumes primary.get() is non-negative so delta stays in [-2, 2] - confirm.
            delta = 2 - (int)((primary.get() % 5));
          }
          Thread.currentThread().setPriority(Thread.NORM_PRIORITY+delta);
          int nshared = (int)(iters * pshared.asDouble());
          // Keep the table index inside [0, slots). The previous form
          // (idx = firstidx; ++idx; idx % length) produced a negative index,
          // and so an ArrayIndexOutOfBoundsException, once idx wrapped past
          // Integer.MAX_VALUE or if firstidx was negative.
          final int slots = useShared.length;
          int idx = firstidx % slots;
          if (idx < 0) idx += slots;
          barrier.barrier();
          for (int i = iters; i > 0; --i) {
            idx = (idx + 1) % slots;
            // A setting of 0 means "never exchange"; it used to raise
            // ArithmeticException from the remainder below.
            if (itersPerBarrier != 0 && i % itersPerBarrier == 0)
              primary.exchange();
            else {
              RNG r;
              if (nshared > 0 && useShared[idx]) {
                --nshared;
                r = shared;
              }
              else {
                r = primary;
              }
              long rnd = r.next();
              // Only poll the interrupt flag on even results.
              if (rnd % 2 == 0 && Thread.currentThread().isInterrupted())
                break;
            }
          }
        }
        catch (BrokenBarrierException ex) {
          // Deliberately ignored: fall through to the closing barrier below.
        }
        catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
        finally {
          try {
            barrier.barrier();
          }
          catch (BrokenBarrierException ex) {
            // Deliberately ignored: nothing left to do for this run.
          }
          catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
          finally {
            Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
          }
        }
      }
    };
  }
}
/**
 * Producer/consumer variant of {@link TestLoop}. Instead of choosing between
 * two RNGs, each iteration chooses between a shared and a primary
 * {@link Channel} and asks the primary {@link ChanRNG} to produce into or
 * consume from it.
 */
class PCTestLoop extends TestLoop {
  /** Channel used when the current table slot does not select "shared". */
  final Channel primaryChannel;
  /** Channel used when the current table slot selects "shared". */
  final Channel sharedChannel;

  /**
   * @param sh      shared RNG, passed to the superclass
   * @param pri     primary RNG; cast to ChanRNG when the task runs
   * @param pshr    fraction of iterations directed at {@code shChan}
   * @param it      iterations per run
   * @param br      barrier used to start and stop the run
   * @param shChan  the shared channel
   * @param priChan the primary channel
   */
  public PCTestLoop(RNG sh, RNG pri, Fraction pshr, int it,
                    CyclicBarrier br, Channel shChan, Channel priChan) {
    super(sh, pri, pshr, it, br);
    sharedChannel = shChan;
    primaryChannel = priChan;
  }

  /**
   * Builds the task for one side of a producer/consumer pair.
   *
   * @param isProducer true to call producerNext on each iteration,
   *                   false to call consumerNext
   * @return the runnable to hand to the executor
   */
  public Runnable testLoop(final boolean isProducer) {
    return new Runnable() {
      public void run() {
        int delta = -1;
        Thread.currentThread().setPriority(Thread.NORM_PRIORITY+delta);
        int itersPerBarrier = RNG.itersPerBarrier.get();
        try {
          int nshared = (int)(iters * pshared.asDouble());
          // Keep the table index inside [0, slots). The previous form
          // (idx = firstidx; ++idx; idx % length) produced a negative index,
          // and so an ArrayIndexOutOfBoundsException, once idx wrapped past
          // Integer.MAX_VALUE or if firstidx was negative.
          final int slots = useShared.length;
          int idx = firstidx % slots;
          if (idx < 0) idx += slots;
          barrier.barrier();
          ChanRNG target = (ChanRNG)(primary);
          for (int i = iters; i > 0; --i) {
            idx = (idx + 1) % slots;
            // A setting of 0 means "never exchange"; it used to raise
            // ArithmeticException from the remainder below.
            if (itersPerBarrier != 0 && i % itersPerBarrier == 0)
              primary.exchange();
            else {
              Channel c;
              if (nshared > 0 && useShared[idx]) {
                --nshared;
                c = sharedChannel;
              }
              else {
                c = primaryChannel;
              }
              long rnd;
              if (isProducer)
                rnd = target.producerNext(c);
              else
                rnd = target.consumerNext(c);
              // Only poll the interrupt flag on even results.
              if (rnd % 2 == 0 && Thread.currentThread().isInterrupted())
                break;
            }
          }
        }
        catch (BrokenBarrierException ex) {
          // Deliberately ignored: fall through to the closing barrier below.
        }
        catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
        finally {
          try {
            barrier.barrier();
          }
          catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
          catch (BrokenBarrierException ex) {
            // Deliberately ignored: nothing left to do for this run.
          }
          finally {
            Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
          }
        }
      }
    };
  }
}
// -------------------------------------------------------------
abstract class RNG implements Serializable, Comparable {
// Initial value of lastSeed; also what compute() returns in place of 0.
static final int firstSeed = 4321;
// Modulus and multiplier applied on every step of compute().
static final int rmod = 2147483647;
static final int rmul = 16807;
// Value the next call to nextSeed() will return.
static int lastSeed = firstSeed;
// Modulus and multiplier nextSeed() uses to advance lastSeed.
static final int smod = 32749;
static final int smul = 3125;
// Lock taken by reset(), nextSeed() and getExchanger(), and handed to
// every setting below.
static final Object constructionLock = RNG.class;
// Use construction lock for all params to disable
// changes in midst of construction of test objects.
// compute() performs between 1 and 2 * computeLoops multiply/mod steps.
static final SynchronizedInt computeLoops =
new SynchronizedInt(16, constructionLock);
// The next four settings are only copied into instance fields here; their
// meaning is defined by code outside this view.
static final SynchronizedInt syncMode =
new SynchronizedInt(0, constructionLock);
static final SynchronizedInt producerMode =
new SynchronizedInt(0, constructionLock);
static final SynchronizedInt consumerMode =
new SynchronizedInt(0, constructionLock);
static final SynchronizedInt bias =
new SynchronizedInt(0, constructionLock);
// Copied into each instance's waitTime field.
// NOTE(review): units are not visible here - confirm.
static final SynchronizedLong timeout =
new SynchronizedLong(100, constructionLock);
// Party count reset() passes to each Rendezvous (capped at nthreads).
static final SynchronizedInt exchangeParties =
new SynchronizedInt(1, constructionLock);
// Counter getExchanger() increments to pick a slot in exchangers_;
// reset() sets it back to -1.
static final SynchronizedInt sequenceNumber =
new SynchronizedInt(0, constructionLock);
// The test loops call exchange() whenever their countdown counter is a
// multiple of this value.
static final SynchronizedInt itersPerBarrier =
new SynchronizedInt(0, constructionLock);
// Rendezvous objects created by reset(); null until reset() has run.
static Rendezvous[] exchangers_;
/**
 * Rebuilds the exchanger array for a run with {@code nthreads} threads and
 * rewinds sequenceNumber to -1. Each Rendezvous is created for
 * min(exchangeParties, nthreads) parties, and there are
 * nthreads / parties of them.
 *
 * @param nthreads number of threads in the upcoming run
 * @throws Error if nthreads is not a multiple of the party count
 */
static void reset(int nthreads) {
  synchronized (constructionLock) {
    sequenceNumber.set(-1);
    // Never ask for more parties than there are threads.
    final int groupSize = Math.min(exchangeParties.get(), nthreads);
    if (nthreads % groupSize != 0) {
      throw new Error("need even multiple of parties");
    }
    exchangers_ = new Rendezvous[nthreads / groupSize];
    int slot = 0;
    while (slot < exchangers_.length) {
      exchangers_[slot] = new Rendezvous(groupSize);
      ++slot;
    }
  }
}
/**
 * Returns the current seed and advances lastSeed to
 * (lastSeed * smul) % smod, substituting the low bits of the current time
 * if that result is 0. Runs under constructionLock.
 *
 * @return the seed value before it was advanced
 */
static long nextSeed() {
  synchronized (constructionLock) {
    final long current = lastSeed;
    final int advanced = (lastSeed * smul) % smod;
    // A zero seed would stay zero forever, so replace it.
    lastSeed = (advanced != 0) ? advanced : (int)(System.currentTimeMillis());
    return current;
  }
}
// Per-instance copies of the static settings, read once when the object
// is constructed.
final int cloops = computeLoops.get();
final int pcBias = bias.get();
final int smode = syncMode.get();
final int pmode = producerMode.get();
final int cmode = consumerMode.get();
final long waitTime = timeout.get();
// Assigned on the first call to getExchanger(); null until then.
Rendezvous exchanger_ = null;
/**
 * Returns this RNG's Rendezvous, choosing one on first use. The choice is
 * made under constructionLock by incrementing sequenceNumber and indexing
 * exchangers_ with the result modulo the array length.
 */
synchronized Rendezvous getExchanger() {
  if (exchanger_ != null) {
    return exchanger_;
  }
  synchronized (constructionLock) {
    final int ticket = sequenceNumber.increment();
    final int slot = ticket % exchangers_.length;
    exchanger_ = exchangers_[slot];
  }
  return exchanger_;
}
/**
 * Offers an UpdateCommand wrapping this RNG at this RNG's Rendezvous and
 * runs the Runnable that comes back, if it is not null.
 *
 * @throws InterruptedException propagated from the rendezvous call
 */
public void exchange() throws InterruptedException {
  Object received = getExchanger().rendezvous(new UpdateCommand(this));
  Runnable command = (Runnable) received;
  if (command != null) {
    command.run();
  }
}
/**
 * Orders this object against {@code other} by comparing their hashCode()
 * values.
 *
 * @return -1, 0 or 1 as this hash code is less than, equal to or greater
 *         than the other's
 */
public int compareTo(Object other) {
  final int mine = hashCode();
  final int theirs = other.hashCode();
  if (mine == theirs) {
    return 0;
  }
  return (mine < theirs) ? -1 : 1;
}
/**
 * Repeatedly replaces {@code l} with (l * rmul) % rmod. The number of
 * steps is derived from the low 31 bits of the incoming value and lies
 * between 1 and 2 * cloops.
 *
 * @param l the starting value
 * @return the final value, or firstSeed if the final value is 0
 */
protected final long compute(long l) {
  final int span = cloops * 2;
  int remaining = (int)((l & 0x7FFFFFFF) % span) + 1;
  while (remaining > 0) {
    l = (l * rmul) % rmod;
    --remaining;
  }
  if (l == 0) {
    return firstSeed;
  }
  return l;
}
// Subclass hooks; their implementations are not visible in this part of the file.
// NOTE(review): presumably set() stores the value and internalGet() reads it back - confirm in subclasses.
abstract protected void set(long l);
abstract protected long internalGet();
abstract protected void internalUpdate();
// Public accessor: returns whatever internalGet() returns.
public long get() { return internalGet(); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -