⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fjtaskrunnergroup.java

📁 一个很好的微工作流内核
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
   *       <li> <em>Run</em> The total number of tasks that have been run.   *       <li> <em>New</em> The number of these tasks that were   *               taken from either the entry queue or from other    *               thread queues; that is, the number of tasks run   *               that were <em>not</em> forked by the thread itself.   *       <li> <em>Scan</em> The number of times other task   *               queues or the entry queue were polled for tasks.   *     </ul>   *   <li> <em>Execute</em> The total number of tasks entered   *        (but not necessarily yet run) via execute or invoke.   *   <li> <em>Time</em> Time in seconds since construction of this   *         FJTaskRunnerGroup.   *   <li> <em>Rate</em> The total number of tasks processed   *          per second across all threads. This   *          may be useful as a simple throughput indicator   *          if all processed tasks take approximately the   *          same time to run.   * </ul>   * <p>   * Cautions: Some statistics are updated and gathered    * without synchronization,   * so may not be accurate. However, reported counts may be considered   * as lower bounds of actual values.    * Some values may be zero if classes are compiled   * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup   * classes can be independently compiled with different values of   * COLLECT_STATS.) Also, the counts are maintained as ints so could   * overflow in exceptionally long-lived applications.   * <p>   * These statistics can be useful when tuning algorithms or diagnosing   * problems. For example:   * <ul>   *  <li> High numbers of scans may mean that there is insufficient   *      parallelism to keep threads busy. However, high scan rates   *      are expected if the number   *      of Executes is also high or there is a lot of global   *      synchronization in the application, and the system is not otherwise   *      busy. Threads may scan   *      for work hundreds of times upon startup, shutdown, and   *      global synch points of task sets.   *  <li> Large imbalances in tasks run across different threads might   *      just reflect contention with unrelated threads on a system   *      (possibly including JVM threads such as GC), but may also   *      indicate some systematic bias in how you generate tasks.   *  <li> Large task queue capacities may mean that too many tasks are being   *     generated before they can be run.    *     Capacities are reported rather than current numbers of tasks   *     in queues because they are better indicators of the existence   *     of these kinds of possibly-transient problems.   *     Queue capacities are   *     resized on demand from their initial value of 4096 elements,   *     which is much more than sufficient for the kinds of    *     applications that this framework is intended to best support.   * </ul>   **/  public void stats() {    long time = System.currentTimeMillis() - initTime;    double secs = ((double)time) / 1000.0;    long totalRuns = 0;    long totalScans = 0;    long totalSteals = 0;    System.out.print("Thread" +                     "\tQ Cap" +                       "\tScans" +                       "\tNew" +                       "\tRuns" +                       "\n");    for (int i = 0; i < threads.length; ++i) {      FJTaskRunner t = threads[i];      int truns = t.runs;      totalRuns += truns;      int tscans = t.scans;      totalScans += tscans;      int tsteals = t.steals;      totalSteals += tsteals;      String star = (getActive(t))? "*" : " ";      System.out.print("T" + i + star +                       "\t" + t.deqSize() +                       "\t" + tscans +                       "\t" + tsteals +                       "\t" + truns +                       "\n");    }    System.out.print("Total" +                     "\t    " +                     "\t" + totalScans +                     "\t" + totalSteals +                     "\t" + totalRuns +                     "\n");    System.out.print("Execute: " + entries);         System.out.print("\tTime: " + secs);    long rps = 0;    if (secs != 0) rps = Math.round((double)(totalRuns) / secs);    System.out.println("\tRate: " + rps);  }  /* ------------ Methods called only by FJTaskRunners ------------- */  /**   * Return the array of threads in this group.    * Called only by FJTaskRunner.scan().   **/  protected FJTaskRunner[] getArray() { return threads; }  /**   * Return a task from entry queue, or null if empty.   * Called only by FJTaskRunner.scan().   **/  protected FJTask pollEntryQueue() {    try {      FJTask t = (FJTask)(entryQueue.poll(0));      return t;    }    catch(InterruptedException ex) { // ignore interrupts      Thread.currentThread().interrupt();      return null;    }  }  /**   * Return active status of t.   * Per-thread active status can only be accessed and   * modified via synchronized method here in the group class.   **/  protected synchronized boolean getActive(FJTaskRunner t) {    return t.active;  }  /**   * Set active status of thread t to true, and notify others   * that might be waiting for work.    **/  protected synchronized void setActive(FJTaskRunner t) {    if (!t.active) {       t.active = true;      ++activeCount;      if (nstarted < threads.length)         threads[nstarted++].start();      else        notifyAll();    }  }  /**   * Set active status of thread t to false.   **/  protected synchronized void setInactive(FJTaskRunner t) {    if (t.active) {       t.active = false;      --activeCount;    }  }  /**   * The number of times to scan other threads for tasks    * before transitioning to a mode where scans are   * interleaved with sleeps (actually timed waits).   * Upon transition, sleeps are for duration of   * scans / SCANS_PER_SLEEP milliseconds.   * <p>   * This is not treated as a user-tunable parameter because   * good values do not appear to vary much across JVMs or   * applications. Its main role is to help avoid some   * useless spinning and contention during task startup.   **/  static final long SCANS_PER_SLEEP = 15;  /**   * The maximum time (in msecs) to sleep when a thread is idle,   * yet others are not, so may eventually generate work that   * the current thread can steal. This value reflects the maximum time   * that a thread may sleep when it possibly should not, because there   * are other active threads that might generate work. In practice,   * designs in which some threads become stalled because others   * are running yet not generating tasks are not likely to work   * well in this framework anyway, so the exact value does not matter   * too much. However, keeping it in the sub-second range does   * help smooth out startup and shutdown effects.   **/  static final long MAX_SLEEP_TIME = 100;  /**   * Set active status of thread t to false, and   * then wait until: (a) there is a task in the entry    * queue, or (b) other threads are active, or (c) the current   * thread is interrupted. Upon return, it   * is not certain that there will be work available.   * The thread must itself check.    * <p>   * The main underlying reason   * for these mechanics is that threads do not   * signal each other when they add elements to their queues.   * (This would add to task overhead, reduce locality.   * and increase contention.)   * So we must rely on a tamed form of polling. However, tasks   * inserted into the entry queue do result in signals, so   * tasks can wait on these if all of them are otherwise idle.   **/  protected synchronized void checkActive(FJTaskRunner t, long scans) {    setInactive(t);    try {      // if nothing available, do a hard wait      if (activeCount == 0 && entryQueue.peek() == null) {         wait();      }      else {         // If there is possibly some work,        // sleep for a while before rechecking         long msecs = scans / SCANS_PER_SLEEP;        if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;        int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep        wait(msecs, nsecs);      }    }    catch (InterruptedException ex) {      notify(); // avoid lost notifies on interrupts      Thread.currentThread().interrupt();    }  }  /* ------------ Utility methods  ------------- */  /**   * Start or wake up any threads waiting for work   **/  protected synchronized void signalNewTask() {    if (COLLECT_STATS) ++entries;    if (nstarted < threads.length)        threads[nstarted++].start();    else      notify();  }  /**   * Create all FJTaskRunner threads in this group.   **/  protected void initializeThreads() {    for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);  }  /**   * Wrap wait/notify mechanics around a task so that   * invoke() can wait it out    **/  protected static final class InvokableFJTask extends FJTask {    protected final Runnable wrapped;    protected boolean terminated = false;    protected InvokableFJTask(Runnable r) { wrapped = r; }    public void run() {      try {        if (wrapped instanceof FJTask)          FJTask.invoke((FJTask)(wrapped));        else          wrapped.run();      }      finally {        setTerminated();      }    }    protected synchronized void setTerminated() {      terminated = true;      notifyAll();     }    protected synchronized void awaitTermination() throws InterruptedException {      while (!terminated) wait();    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -