📄 fjtaskrunner.java
字号:
/** * Check under synch lock if DEQ is really empty when doing pop. * Return task if not empty, else null. **/ protected final synchronized FJTask confirmPop(int provisionalTop) { if (base <= provisionalTop) return deq[provisionalTop & (deq.length-1)].take(); else { // was empty /* Reset DEQ indices to zero whenever it is empty. This both avoids unnecessary calls to checkOverflow in push, and helps keep the DEQ from accumulating garbage */ top = base = 0; return null; } } /** * Take a task from the base of the DEQ. * Always called by other threads via scan() **/ protected final synchronized FJTask take() { /* Increment base in order to suppress a contending pop */ int b = base++; if (b < top) return confirmTake(b); else { // back out base = b; return null; } } /** * double-check a potential take **/ protected FJTask confirmTake(int oldBase) { /* Use a second (guaranteed uncontended) synch to serve as a barrier in case JVM does not properly process read-after-write of 2 volatiles */ synchronized(barrier) { if (oldBase < top) { /* We cannot call deq[oldBase].take here because of possible races when nulling out versus concurrent push operations. Resulting accumulated garbage is swept out periodically in checkOverflow, or more typically, just by keeping indices zero-based when found to be empty in pop, which keeps active region small and constantly overwritten. */ return deq[oldBase & (deq.length-1)].get(); } else { base = oldBase; return null; } } } /** * Adjust top and base, and grow DEQ if necessary. * Called only while DEQ synch lock being held. * We don't expect this to be called very often. In most * programs using FJTasks, it is never called. **/ protected void checkOverflow() { int t = top; int b = base; if (t - b < deq.length-1) { // check if just need an index reset int newBase = b & (deq.length-1); int newTop = top & (deq.length-1); if (newTop < newBase) newTop += deq.length; top = newTop; base = newBase; /* Null out refs to stolen tasks. This is the only time we can safely do it. */ int i = newBase; while (i != newTop && deq[i].ref != null) { deq[i].ref = null; i = (i - 1) & (deq.length-1); } } else { // grow by doubling array int newTop = t - b; int oldcap = deq.length; int newcap = oldcap * 2; if (newcap >= MAX_CAPACITY) throw new Error("FJTask queue maximum capacity exceeded"); VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap]; // copy in bottom half of new deq with refs from old deq for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap-1)]; // fill top half of new deq with new refs for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef(); deq = newdeq; base = 0; top = newTop; } } /* ------------ Scheduling ------------------- */ /** * Do all but the pop() part of yield or join, by * traversing all DEQs in our group looking for a task to * steal. If none, it checks the entry queue. * <p> * Since there are no good, portable alternatives, * we rely here on a mixture of Thread.yield and priorities * to reduce wasted spinning, even though these are * not well defined. We are hoping here that the JVM * does something sensible. * @param waitingFor if non-null, the current task being joined **/ protected void scan(final FJTask waitingFor) { FJTask task = null; // to delay lowering priority until first failure to steal boolean lowered = false; /* Circularly traverse from a random start index. This differs slightly from cilk version that uses a random index for each attempted steal. Exhaustive scanning might impede analytic tractablity of the scheduling policy, but makes it much easier to deal with startup and shutdown. */ FJTaskRunner[] ts = group.getArray(); int idx = victimRNG.nextInt(ts.length); for (int i = 0; i < ts.length; ++i) { FJTaskRunner t = ts[idx]; if (++idx >= ts.length) idx = 0; // circularly traverse if (t != null && t != this) { if (waitingFor != null && waitingFor.isDone()) { break; } else { if (COLLECT_STATS) ++scans; task = t.take(); if (task != null) { if (COLLECT_STATS) ++steals; break; } else if (isInterrupted()) { break; } else if (!lowered) { // if this is first fail, lower priority lowered = true; setPriority(scanPriority); } else { // otherwise we are at low priority; just yield yield(); } } } } if (task == null) { if (COLLECT_STATS) ++scans; task = group.pollEntryQueue(); if (COLLECT_STATS) if (task != null) ++steals; } if (lowered) setPriority(runPriority); if (task != null && !task.isDone()) { if (COLLECT_STATS) ++runs; task.run(); task.setDone(); } } /** * Same as scan, but called when current thread is idling. * It repeatedly scans other threads for tasks, * sleeping while none are available. * <p> * This differs from scan mainly in that * since there is no reason to return to recheck any * condition, we iterate until a task is found, backing * off via sleeps if necessary. **/ protected void scanWhileIdling() { FJTask task = null; boolean lowered = false; long iters = 0; FJTaskRunner[] ts = group.getArray(); int idx = victimRNG.nextInt(ts.length); do { for (int i = 0; i < ts.length; ++i) { FJTaskRunner t = ts[idx]; if (++idx >= ts.length) idx = 0; // circularly traverse if (t != null && t != this) { if (COLLECT_STATS) ++scans; task = t.take(); if (task != null) { if (COLLECT_STATS) ++steals; if (lowered) setPriority(runPriority); group.setActive(this); break; } } } if (task == null) { if (isInterrupted()) return; if (COLLECT_STATS) ++scans; task = group.pollEntryQueue(); if (task != null) { if (COLLECT_STATS) ++steals; if (lowered) setPriority(runPriority); group.setActive(this); } else { ++iters; // Check here for yield vs sleep to avoid entering group synch lock if (iters >= group.SCANS_PER_SLEEP) { group.checkActive(this, iters); if (isInterrupted()) return; } else if (!lowered) { lowered = true; setPriority(scanPriority); } else { yield(); } } } } while (task == null); if (!task.isDone()) { if (COLLECT_STATS) ++runs; task.run(); task.setDone(); } } /* ------------ composite operations ------------------- */ /** * Main runloop **/ public void run() { try{ while (!interrupted()) { FJTask task = pop(); if (task != null) { if (!task.isDone()) { // inline FJTask.invoke if (COLLECT_STATS) ++runs; task.run(); task.setDone(); } } else scanWhileIdling(); } } finally { group.setInactive(this); } } /** * Execute a task in this thread. Generally called when current task * cannot otherwise continue. **/ protected final void taskYield() { FJTask task = pop(); if (task != null) { if (!task.isDone()) { if (COLLECT_STATS) ++runs; task.run(); task.setDone(); } } else scan(null); } /** * Process tasks until w is done. * Equivalent to <code>while(!w.isDone()) taskYield(); </code> **/ protected final void taskJoin(final FJTask w) { while (!w.isDone()) { FJTask task = pop(); if (task != null) { if (!task.isDone()) { if (COLLECT_STATS) ++runs; task.run(); task.setDone(); if (task == w) return; // fast exit if we just ran w } } else scan(w); } } /** * A specialized expansion of * <code> w.fork(); invoke(v); w.join(); </code> **/ protected final void coInvoke(final FJTask w, final FJTask v) { // inline push int t = top; if (t < (base & (deq.length-1)) + deq.length) { deq[t & (deq.length-1)].put(w); top = t + 1; // inline invoke if (!v.isDone()) { if (COLLECT_STATS) ++runs; v.run(); v.setDone(); } // inline taskJoin while (!w.isDone()) { FJTask task = pop(); if (task != null) { if (!task.isDone()) { if (COLLECT_STATS) ++runs; task.run(); task.setDone(); if (task == w) return; // fast exit if we just ran w } } else scan(w); } } else // handle non-inlinable cases slowCoInvoke(w, v); } /** * Backup to handle noninlinable cases of coInvoke **/ protected void slowCoInvoke(final FJTask w, final FJTask v) { push(w); // let push deal with overflow FJTask.invoke(v); taskJoin(w); } /** * Array-based version of coInvoke **/ protected final void coInvoke(FJTask[] tasks) { int nforks = tasks.length - 1; // inline bulk push of all but one task int t = top; if (nforks >= 0 && t + nforks < (base & (deq.length-1)) + deq.length) { for (int i = 0; i < nforks; ++i) { deq[t++ & (deq.length-1)].put(tasks[i]); top = t; } // inline invoke of one task FJTask v = tasks[nforks]; if (!v.isDone()) { if (COLLECT_STATS) ++runs; v.run(); v.setDone(); } // inline taskJoins for (int i = 0; i < nforks; ++i) { FJTask w = tasks[i]; while (!w.isDone()) { FJTask task = pop(); if (task != null) { if (!task.isDone()) { if (COLLECT_STATS) ++runs; task.run(); task.setDone(); } } else scan(w); } } } else // handle non-inlinable cases slowCoInvoke(tasks); } /** * Backup to handle atypical or noninlinable cases of coInvoke **/ protected void slowCoInvoke(FJTask[] tasks) { for (int i = 0; i < tasks.length; ++i) push(tasks[i]); for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -