📄 fjtaskrunnergroup.java
字号:
* <li> <em>Run</em> The total number of tasks that have been run.
* <li> <em>New</em> The number of these tasks that were
* taken from either the entry queue or from other
* thread queues; that is, the number of tasks run
* that were <em>not</em> forked by the thread itself.
* <li> <em>Scan</em> The number of times other task
* queues or the entry queue were polled for tasks.
* </ul>
* <li> <em>Execute</em> The total number of tasks entered
* (but not necessarily yet run) via execute or invoke.
* <li> <em>Time</em> Time in seconds since construction of this
* FJTaskRunnerGroup.
* <li> <em>Rate</em> The total number of tasks processed
* per second across all threads. This
* may be useful as a simple throughput indicator
* if all processed tasks take approximately the
* same time to run.
* </ul>
* <p>
* Cautions: Some statistics are updated and gathered
* without synchronization,
* so may not be accurate. However, reported counts may be considered
* as lower bounds of actual values.
* Some values may be zero if classes are compiled
* with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
* classes can be independently compiled with different values of
* COLLECT_STATS.) Also, the counts are maintained as ints so could
* overflow in exceptionally long-lived applications.
* <p>
* These statistics can be useful when tuning algorithms or diagnosing
* problems. For example:
* <ul>
* <li> High numbers of scans may mean that there is insufficient
* parallelism to keep threads busy. However, high scan rates
* are expected if the number
* of Executes is also high or there is a lot of global
* synchronization in the application, and the system is not otherwise
* busy. Threads may scan
* for work hundreds of times upon startup, shutdown, and
* global synch points of task sets.
* <li> Large imbalances in tasks run across different threads might
* just reflect contention with unrelated threads on a system
* (possibly including JVM threads such as GC), but may also
* indicate some systematic bias in how you generate tasks.
* <li> Large task queue capacities may mean that too many tasks are being
* generated before they can be run.
* Capacities are reported rather than current numbers of tasks
* in queues because they are better indicators of the existence
* of these kinds of possibly-transient problems.
* Queue capacities are
* resized on demand from their initial value of 4096 elements,
* which is much more than sufficient for the kinds of
* applications that this framework is intended to best support.
* </ul>
**/
public void stats() {
long time = System.currentTimeMillis() - initTime;
double secs = ((double)time) / 1000.0;
long totalRuns = 0;
long totalScans = 0;
long totalSteals = 0;
System.out.print("Thread" +
"\tQ Cap" +
"\tScans" +
"\tNew" +
"\tRuns" +
"\n");
for (int i = 0; i < threads.length; ++i) {
FJTaskRunner t = threads[i];
int truns = t.runs;
totalRuns += truns;
int tscans = t.scans;
totalScans += tscans;
int tsteals = t.steals;
totalSteals += tsteals;
String star = (getActive(t))? "*" : " ";
System.out.print("T" + i + star +
"\t" + t.deqSize() +
"\t" + tscans +
"\t" + tsteals +
"\t" + truns +
"\n");
}
System.out.print("Total" +
"\t " +
"\t" + totalScans +
"\t" + totalSteals +
"\t" + totalRuns +
"\n");
System.out.print("Execute: " + entries);
System.out.print("\tTime: " + secs);
long rps = 0;
if (secs != 0) rps = Math.round((double)(totalRuns) / secs);
System.out.println("\tRate: " + rps);
}
/* ------------ Methods called only by FJTaskRunners ------------- */
/**
* Return the array of threads in this group.
* Called only by FJTaskRunner.scan().
**/
protected FJTaskRunner[] getArray() { return threads; }
/**
* Return a task from entry queue, or null if empty.
* Called only by FJTaskRunner.scan().
**/
protected FJTask pollEntryQueue() {
try {
FJTask t = (FJTask)(entryQueue.poll(0));
return t;
}
catch(InterruptedException ex) { // ignore interrupts
Thread.currentThread().interrupt();
return null;
}
}
/**
* Return active status of t.
* Per-thread active status can only be accessed and
* modified via synchronized method here in the group class.
**/
protected synchronized boolean getActive(FJTaskRunner t) {
return t.active;
}
/**
* Set active status of thread t to true, and notify others
* that might be waiting for work.
**/
protected synchronized void setActive(FJTaskRunner t) {
if (!t.active) {
t.active = true;
++activeCount;
if (nstarted < threads.length)
threads[nstarted++].start();
else
notifyAll();
}
}
/**
* Set active status of thread t to false.
**/
protected synchronized void setInactive(FJTaskRunner t) {
if (t.active) {
t.active = false;
--activeCount;
}
}
/**
* The number of times to scan other threads for tasks
* before transitioning to a mode where scans are
* interleaved with sleeps (actually timed waits).
* Upon transition, sleeps are for duration of
* scans / SCANS_PER_SLEEP milliseconds.
* <p>
* This is not treated as a user-tunable parameter because
* good values do not appear to vary much across JVMs or
* applications. Its main role is to help avoid some
* useless spinning and contention during task startup.
**/
static final long SCANS_PER_SLEEP = 15;
/**
* The maximum time (in msecs) to sleep when a thread is idle,
* yet others are not, so may eventually generate work that
* the current thread can steal. This value reflects the maximum time
* that a thread may sleep when it possibly should not, because there
* are other active threads that might generate work. In practice,
* designs in which some threads become stalled because others
* are running yet not generating tasks are not likely to work
* well in this framework anyway, so the exact value does not matter
* too much. However, keeping it in the sub-second range does
* help smooth out startup and shutdown effects.
**/
static final long MAX_SLEEP_TIME = 100;
/**
* Set active status of thread t to false, and
* then wait until: (a) there is a task in the entry
* queue, or (b) other threads are active, or (c) the current
* thread is interrupted. Upon return, it
* is not certain that there will be work available.
* The thread must itself check.
* <p>
* The main underlying reason
* for these mechanics is that threads do not
* signal each other when they add elements to their queues.
* (This would add to task overhead, reduce locality.
* and increase contention.)
* So we must rely on a tamed form of polling. However, tasks
* inserted into the entry queue do result in signals, so
* tasks can wait on these if all of them are otherwise idle.
**/
protected synchronized void checkActive(FJTaskRunner t, long scans) {
setInactive(t);
try {
// if nothing available, do a hard wait
if (activeCount == 0 && entryQueue.peek() == null) {
wait();
}
else {
// If there is possibly some work,
// sleep for a while before rechecking
long msecs = scans / SCANS_PER_SLEEP;
if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;
int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
wait(msecs, nsecs);
}
}
catch (InterruptedException ex) {
notify(); // avoid lost notifies on interrupts
Thread.currentThread().interrupt();
}
}
/* ------------ Utility methods ------------- */
/**
* Start or wake up any threads waiting for work
**/
protected synchronized void signalNewTask() {
if (COLLECT_STATS) ++entries;
if (nstarted < threads.length)
threads[nstarted++].start();
else
notify();
}
/**
* Create all FJTaskRunner threads in this group.
**/
protected void initializeThreads() {
for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);
}
/**
* Wrap wait/notify mechanics around a task so that
* invoke() can wait it out
**/
protected static final class InvokableFJTask extends FJTask {
protected final Runnable wrapped;
protected boolean terminated = false;
protected InvokableFJTask(Runnable r) { wrapped = r; }
public void run() {
try {
if (wrapped instanceof FJTask)
FJTask.invoke((FJTask)(wrapped));
else
wrapped.run();
}
finally {
setTerminated();
}
}
protected synchronized void setTerminated() {
terminated = true;
notifyAll();
}
protected synchronized void awaitTermination() throws InterruptedException {
while (!terminated) wait();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -