📄 fjtaskrunner.java
字号:
/**
 * Re-checks, while holding this runner's lock, whether the DEQ still
 * holds a task at the index a pop is about to use.
 *
 * @param provisionalTop the candidate top index chosen by the caller
 * @return the result of take() on that slot when base has not passed
 *         provisionalTop; otherwise null, after resetting both indices
 **/
protected final synchronized FJTask confirmPop(int provisionalTop) {
  if (base > provisionalTop) {
    /*
      The DEQ was empty. Restart both indices at zero: this avoids
      unnecessary calls to checkOverflow in push, and helps keep the
      DEQ from accumulating garbage.
    */
    top = base = 0;
    return null;
  }
  // Mask maps the ever-growing index onto the slot array.
  return deq[provisionalTop & (deq.length-1)].take();
}
/**
 * Takes a task from the base of the DEQ.
 * Always called by other threads via scan().
 *
 * @return the task obtained through confirmTake, or null if the
 *         claimed base index was not below top
 **/
protected final synchronized FJTask take() {
  // Bump base before looking at top, in order to suppress a contending pop.
  final int claimed = base++;
  if (claimed >= top) {
    // Nothing there: back out the increment.
    base = claimed;
    return null;
  }
  return confirmTake(claimed);
}
/**
 * Double-checks a potential take.
 *
 * @param oldBase the base index claimed by take() before it incremented base
 * @return the task read from slot oldBase if oldBase is still below top;
 *         otherwise null, after restoring base to oldBase
 **/
protected FJTask confirmTake(int oldBase) {
  /*
    A second (guaranteed uncontended) synch serves as a barrier
    in case the JVM does not properly process read-after-write
    of 2 volatiles.
  */
  synchronized (barrier) {
    if (oldBase >= top) {
      base = oldBase;
      return null;
    }
    /*
      The slot is read with get() rather than take(): nulling it out
      here could race with concurrent push operations. The garbage
      that is left behind is swept out periodically in checkOverflow,
      or more typically just by keeping indices zero-based when the
      DEQ is found to be empty in pop, which keeps the active region
      small and constantly overwritten.
    */
    return deq[oldBase & (deq.length-1)].get();
  }
}
/**
 * Adjust top and base, and grow DEQ if necessary.
 * Called only while DEQ synch lock being held.
 * We don't expect this to be called very often. In most
 * programs using FJTasks, it is never called.
 * <p>
 * If fewer than deq.length-1 tasks are queued, the indices are only
 * renormalized into the range of the slot array and references left
 * behind below base are cleared. Otherwise the slot array is doubled
 * and the queued slots are moved to the front of the new array.
 *
 * @throws Error if doubling the array would reach MAX_CAPACITY
 **/
protected void checkOverflow() {
  int t = top;
  int b = base;
  int mask = deq.length - 1;
  if (t - b < mask) { // check if just need an index reset
    int newBase = b & mask;
    int newTop = t & mask;
    // Keep newTop >= newBase when top has wrapped past the array end.
    if (newTop < newBase) newTop += deq.length;
    top = newTop;
    base = newBase;
    /*
      Null out refs to stolen tasks.
      This is the only time we can safely do it.

      Slots newBase .. newTop-1 (modulo the array length) still hold
      queued tasks and must not be cleared, so the sweep starts at the
      slot just below base and visits at most the slots outside that
      live range. It stops early at the first slot that is already
      empty.
    */
    int dead = deq.length - (newTop - newBase);
    int i = (newBase - 1) & mask;
    while (dead-- > 0 && deq[i].ref != null) {
      deq[i].ref = null;
      i = (i - 1) & mask;
    }
  }
  else { // grow by doubling array
    int newTop = t - b;
    int oldcap = deq.length;
    int newcap = oldcap * 2;
    if (newcap >= MAX_CAPACITY)
      throw new Error("FJTask queue maximum capacity exceeded");
    VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];
    // copy in bottom half of new deq with refs from old deq,
    // starting at the old base so that it lands in slot 0
    for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap-1)];
    // fill top half of new deq with new refs
    for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef();
    deq = newdeq;
    base = 0;
    top = newTop;
  }
}
/* ------------ Scheduling ------------------- */
/**
* Do all but the pop() part of yield or join, by
* traversing all DEQs in our group looking for a task to
* steal. If none, it checks the entry queue.
* <p>
* Since there are no good, portable alternatives,
* we rely here on a mixture of Thread.yield and priorities
* to reduce wasted spinning, even though these are
* not well defined. We are hoping here that the JVM
* does something sensible.
* @param waitingFor if non-null, the current task being joined
**/
protected void scan(final FJTask waitingFor) {
FJTask task = null;
// to delay lowering priority until first failure to steal
boolean lowered = false;
/*
Circularly traverse from a random start index.
This differs slightly from cilk version that uses a random index
for each attempted steal.
Exhaustive scanning might impede analytic tractablity of
the scheduling policy, but makes it much easier to deal with
startup and shutdown.
*/
FJTaskRunner[] ts = group.getArray();
int idx = victimRNG.nextInt(ts.length);
for (int i = 0; i < ts.length; ++i) {
FJTaskRunner t = ts[idx];
if (++idx >= ts.length) idx = 0; // circularly traverse
if (t != null && t != this) {
if (waitingFor != null && waitingFor.isDone()) {
break;
}
else {
if (COLLECT_STATS) ++scans;
task = t.take();
if (task != null) {
if (COLLECT_STATS) ++steals;
break;
}
else if (isInterrupted()) {
break;
}
else if (!lowered) { // if this is first fail, lower priority
lowered = true;
setPriority(scanPriority);
}
else { // otherwise we are at low priority; just yield
yield();
}
}
}
}
if (task == null) {
if (COLLECT_STATS) ++scans;
task = group.pollEntryQueue();
if (COLLECT_STATS) if (task != null) ++steals;
}
if (lowered) setPriority(runPriority);
if (task != null && !task.isDone()) {
if (COLLECT_STATS) ++runs;
task.run();
task.setDone();
}
}
/**
* Same as scan, but called when current thread is idling.
* It repeatedly scans other threads for tasks,
* sleeping while none are available.
* <p>
* This differs from scan mainly in that
* since there is no reason to return to recheck any
* condition, we iterate until a task is found, backing
* off via sleeps if necessary.
**/
protected void scanWhileIdling() {
FJTask task = null;
boolean lowered = false;
long iters = 0;
FJTaskRunner[] ts = group.getArray();
int idx = victimRNG.nextInt(ts.length);
do {
for (int i = 0; i < ts.length; ++i) {
FJTaskRunner t = ts[idx];
if (++idx >= ts.length) idx = 0; // circularly traverse
if (t != null && t != this) {
if (COLLECT_STATS) ++scans;
task = t.take();
if (task != null) {
if (COLLECT_STATS) ++steals;
if (lowered) setPriority(runPriority);
group.setActive(this);
break;
}
}
}
if (task == null) {
if (isInterrupted())
return;
if (COLLECT_STATS) ++scans;
task = group.pollEntryQueue();
if (task != null) {
if (COLLECT_STATS) ++steals;
if (lowered) setPriority(runPriority);
group.setActive(this);
}
else {
++iters;
// Check here for yield vs sleep to avoid entering group synch lock
if (iters >= group.SCANS_PER_SLEEP) {
group.checkActive(this, iters);
if (isInterrupted())
return;
}
else if (!lowered) {
lowered = true;
setPriority(scanPriority);
}
else {
yield();
}
}
}
} while (task == null);
if (!task.isDone()) {
if (COLLECT_STATS) ++runs;
task.run();
task.setDone();
}
}
/* ------------ composite operations ------------------- */
/**
 * Main run loop: until interrupted, run the next popped task, or
 * go idle-scanning when the local DEQ yields nothing. On the way
 * out, for any reason, this runner is reported to its group as
 * inactive.
 **/
public void run() {
  try {
    while (!interrupted()) {
      FJTask next = pop();
      if (next == null) {
        scanWhileIdling();
      }
      else if (!next.isDone()) {
        // inline FJTask.invoke
        if (COLLECT_STATS) ++runs;
        next.run();
        next.setDone();
      }
    }
  }
  finally {
    group.setInactive(this);
  }
}
/**
 * Executes one task in this thread. Generally called when the current
 * task cannot otherwise continue. A popped task is run unless it is
 * already done; if nothing can be popped, a single scan is made
 * instead.
 **/
protected final void taskYield() {
  FJTask next = pop();
  if (next == null) {
    scan(null);
  }
  else if (!next.isDone()) {
    if (COLLECT_STATS) ++runs;
    next.run();
    next.setDone();
  }
}
/**
 * Processes tasks until w is done.
 * Equivalent to <code>while(!w.isDone()) taskYield(); </code>
 *
 * @param w the task to wait for
 **/
protected final void taskJoin(final FJTask w) {
  while (!w.isDone()) {
    FJTask next = pop();
    if (next == null) {
      // Nothing local: try to steal, giving up early if w completes.
      scan(w);
    }
    else if (!next.isDone()) {
      if (COLLECT_STATS) ++runs;
      next.run();
      next.setDone();
      if (next == w) return; // fast exit if we just ran w
    }
  }
}
/**
 * A specialized expansion of
 * <code> w.fork(); invoke(v); w.join(); </code>
 *
 * @param w the task to push onto this runner's DEQ and then join
 * @param v the task to run directly in this thread
 **/
protected final void coInvoke(final FJTask w, final FJTask v) {
  int t = top;
  if (t >= (base & (deq.length-1)) + deq.length) {
    // handle non-inlinable cases
    slowCoInvoke(w, v);
    return;
  }
  // inline push
  deq[t & (deq.length-1)].put(w);
  top = t + 1;
  // inline invoke
  if (!v.isDone()) {
    if (COLLECT_STATS) ++runs;
    v.run();
    v.setDone();
  }
  // inline taskJoin
  while (!w.isDone()) {
    FJTask next = pop();
    if (next == null) {
      scan(w);
    }
    else if (!next.isDone()) {
      if (COLLECT_STATS) ++runs;
      next.run();
      next.setDone();
      if (next == w) return; // fast exit if we just ran w
    }
  }
}
/**
 * Fallback for the cases coInvoke(w, v) cannot handle inline:
 * pushes w through the regular push path, invokes v, then joins w.
 *
 * @param w the task to push and later join
 * @param v the task to invoke in this thread
 **/
protected void slowCoInvoke(final FJTask w, final FJTask v) {
  // let push deal with overflow
  push(w);
  FJTask.invoke(v);
  taskJoin(w);
}
/**
 * Array-based version of coInvoke: all tasks but the last are pushed
 * onto this runner's DEQ, the last one is run directly, and then each
 * pushed task is joined in array order.
 *
 * @param tasks the tasks to execute; an empty array, or one that does
 *        not fit the inline push test, is handed to slowCoInvoke
 **/
protected final void coInvoke(FJTask[] tasks) {
  int nforks = tasks.length - 1;
  int t = top;
  if (nforks < 0 || t + nforks >= (base & (deq.length-1)) + deq.length) {
    // handle non-inlinable cases
    slowCoInvoke(tasks);
    return;
  }
  // inline bulk push of all but one task
  for (int k = 0; k < nforks; ++k) {
    deq[t++ & (deq.length-1)].put(tasks[k]);
    top = t; // top is advanced after every single put
  }
  // inline invoke of one task
  FJTask last = tasks[nforks];
  if (!last.isDone()) {
    if (COLLECT_STATS) ++runs;
    last.run();
    last.setDone();
  }
  // inline taskJoins
  for (int k = 0; k < nforks; ++k) {
    FJTask w = tasks[k];
    while (!w.isDone()) {
      FJTask next = pop();
      if (next == null) {
        scan(w);
      }
      else if (!next.isDone()) {
        if (COLLECT_STATS) ++runs;
        next.run();
        next.setDone();
      }
    }
  }
}
/**
 * Fallback for atypical or non-inlinable cases of the array-based
 * coInvoke: pushes every task, then joins every task, both in
 * array order.
 *
 * @param tasks the tasks to push and join
 **/
protected void slowCoInvoke(FJTask[] tasks) {
  final int count = tasks.length;
  int k;
  for (k = 0; k < count; ++k) {
    push(tasks[k]);
  }
  for (k = 0; k < count; ++k) {
    taskJoin(tasks[k]);
  }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -