📄 fjtaskrunnergroup.java
字号:
* <li> <em>Run</em> The total number of tasks that have been run. * <li> <em>New</em> The number of these tasks that were * taken from either the entry queue or from other * thread queues; that is, the number of tasks run * that were <em>not</em> forked by the thread itself. * <li> <em>Scan</em> The number of times other task * queues or the entry queue were polled for tasks. * </ul> * <li> <em>Execute</em> The total number of tasks entered * (but not necessarily yet run) via execute or invoke. * <li> <em>Time</em> Time in seconds since construction of this * FJTaskRunnerGroup. * <li> <em>Rate</em> The total number of tasks processed * per second across all threads. This * may be useful as a simple throughput indicator * if all processed tasks take approximately the * same time to run. * </ul> * <p> * Cautions: Some statistics are updated and gathered * without synchronization, * so may not be accurate. However, reported counts may be considered * as lower bounds of actual values. * Some values may be zero if classes are compiled * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup * classes can be independently compiled with different values of * COLLECT_STATS.) Also, the counts are maintained as ints so could * overflow in exceptionally long-lived applications. * <p> * These statistics can be useful when tuning algorithms or diagnosing * problems. For example: * <ul> * <li> High numbers of scans may mean that there is insufficient * parallelism to keep threads busy. However, high scan rates * are expected if the number * of Executes is also high or there is a lot of global * synchronization in the application, and the system is not otherwise * busy. Threads may scan * for work hundreds of times upon startup, shutdown, and * global synch points of task sets. * <li> Large imbalances in tasks run across different threads might * just reflect contention with unrelated threads on a system * (possibly including JVM threads such as GC), but may also * indicate some systematic bias in how you generate tasks. * <li> Large task queue capacities may mean that too many tasks are being * generated before they can be run. * Capacities are reported rather than current numbers of tasks * in queues because they are better indicators of the existence * of these kinds of possibly-transient problems. * Queue capacities are * resized on demand from their initial value of 4096 elements, * which is much more than sufficient for the kinds of * applications that this framework is intended to best support. * </ul> **/ public void stats() { long time = System.currentTimeMillis() - initTime; double secs = ((double)time) / 1000.0; long totalRuns = 0; long totalScans = 0; long totalSteals = 0; System.out.print("Thread" + "\tQ Cap" + "\tScans" + "\tNew" + "\tRuns" + "\n"); for (int i = 0; i < threads.length; ++i) { FJTaskRunner t = threads[i]; int truns = t.runs; totalRuns += truns; int tscans = t.scans; totalScans += tscans; int tsteals = t.steals; totalSteals += tsteals; String star = (getActive(t))? "*" : " "; System.out.print("T" + i + star + "\t" + t.deqSize() + "\t" + tscans + "\t" + tsteals + "\t" + truns + "\n"); } System.out.print("Total" + "\t " + "\t" + totalScans + "\t" + totalSteals + "\t" + totalRuns + "\n"); System.out.print("Execute: " + entries); System.out.print("\tTime: " + secs); long rps = 0; if (secs != 0) rps = Math.round((double)(totalRuns) / secs); System.out.println("\tRate: " + rps); } /* ------------ Methods called only by FJTaskRunners ------------- */ /** * Return the array of threads in this group. * Called only by FJTaskRunner.scan(). **/ protected FJTaskRunner[] getArray() { return threads; } /** * Return a task from entry queue, or null if empty. * Called only by FJTaskRunner.scan(). **/ protected FJTask pollEntryQueue() { try { FJTask t = (FJTask)(entryQueue.poll(0)); return t; } catch(InterruptedException ex) { // ignore interrupts Thread.currentThread().interrupt(); return null; } } /** * Return active status of t. * Per-thread active status can only be accessed and * modified via synchronized method here in the group class. **/ protected synchronized boolean getActive(FJTaskRunner t) { return t.active; } /** * Set active status of thread t to true, and notify others * that might be waiting for work. **/ protected synchronized void setActive(FJTaskRunner t) { if (!t.active) { t.active = true; ++activeCount; if (nstarted < threads.length) threads[nstarted++].start(); else notifyAll(); } } /** * Set active status of thread t to false. **/ protected synchronized void setInactive(FJTaskRunner t) { if (t.active) { t.active = false; --activeCount; } } /** * The number of times to scan other threads for tasks * before transitioning to a mode where scans are * interleaved with sleeps (actually timed waits). * Upon transition, sleeps are for duration of * scans / SCANS_PER_SLEEP milliseconds. * <p> * This is not treated as a user-tunable parameter because * good values do not appear to vary much across JVMs or * applications. Its main role is to help avoid some * useless spinning and contention during task startup. **/ static final long SCANS_PER_SLEEP = 15; /** * The maximum time (in msecs) to sleep when a thread is idle, * yet others are not, so may eventually generate work that * the current thread can steal. This value reflects the maximum time * that a thread may sleep when it possibly should not, because there * are other active threads that might generate work. In practice, * designs in which some threads become stalled because others * are running yet not generating tasks are not likely to work * well in this framework anyway, so the exact value does not matter * too much. However, keeping it in the sub-second range does * help smooth out startup and shutdown effects. **/ static final long MAX_SLEEP_TIME = 100; /** * Set active status of thread t to false, and * then wait until: (a) there is a task in the entry * queue, or (b) other threads are active, or (c) the current * thread is interrupted. Upon return, it * is not certain that there will be work available. * The thread must itself check. * <p> * The main underlying reason * for these mechanics is that threads do not * signal each other when they add elements to their queues. * (This would add to task overhead, reduce locality. * and increase contention.) * So we must rely on a tamed form of polling. However, tasks * inserted into the entry queue do result in signals, so * tasks can wait on these if all of them are otherwise idle. **/ protected synchronized void checkActive(FJTaskRunner t, long scans) { setInactive(t); try { // if nothing available, do a hard wait if (activeCount == 0 && entryQueue.peek() == null) { wait(); } else { // If there is possibly some work, // sleep for a while before rechecking long msecs = scans / SCANS_PER_SLEEP; if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME; int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep wait(msecs, nsecs); } } catch (InterruptedException ex) { notify(); // avoid lost notifies on interrupts Thread.currentThread().interrupt(); } } /* ------------ Utility methods ------------- */ /** * Start or wake up any threads waiting for work **/ protected synchronized void signalNewTask() { if (COLLECT_STATS) ++entries; if (nstarted < threads.length) threads[nstarted++].start(); else notify(); } /** * Create all FJTaskRunner threads in this group. **/ protected void initializeThreads() { for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this); } /** * Wrap wait/notify mechanics around a task so that * invoke() can wait it out **/ protected static final class InvokableFJTask extends FJTask { protected final Runnable wrapped; protected boolean terminated = false; protected InvokableFJTask(Runnable r) { wrapped = r; } public void run() { try { if (wrapped instanceof FJTask) FJTask.invoke((FJTask)(wrapped)); else wrapped.run(); } finally { setTerminated(); } } protected synchronized void setTerminated() { terminated = true; notifyAll(); } protected synchronized void awaitTermination() throws InterruptedException { while (!terminated) wait(); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -