⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fjtaskrunnergroup.java

📁 采用JAVA开发
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
   *       <li> <em>Run</em> The total number of tasks that have been run.
   *       <li> <em>New</em> The number of these tasks that were
   *               taken from either the entry queue or from other 
   *               thread queues; that is, the number of tasks run
   *               that were <em>not</em> forked by the thread itself.
   *       <li> <em>Scan</em> The number of times other task
   *               queues or the entry queue were polled for tasks.
   *     </ul>
   *   <li> <em>Execute</em> The total number of tasks entered
   *        (but not necessarily yet run) via execute or invoke.
   *   <li> <em>Time</em> Time in seconds since construction of this
   *         FJTaskRunnerGroup.
   *   <li> <em>Rate</em> The total number of tasks processed
   *          per second across all threads. This
   *          may be useful as a simple throughput indicator
   *          if all processed tasks take approximately the
   *          same time to run.
   * </ul>
   * <p>
   * Cautions: Some statistics are updated and gathered 
   * without synchronization,
   * so may not be accurate. However, reported counts may be considered
   * as lower bounds of actual values. 
   * Some values may be zero if classes are compiled
   * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
   * classes can be independently compiled with different values of
   * COLLECT_STATS.) Also, the counts are maintained as ints so could
   * overflow in exceptionally long-lived applications.
   * <p>
   * These statistics can be useful when tuning algorithms or diagnosing
   * problems. For example:
   * <ul>
   *  <li> High numbers of scans may mean that there is insufficient
   *      parallelism to keep threads busy. However, high scan rates
   *      are expected if the number
   *      of Executes is also high or there is a lot of global
   *      synchronization in the application, and the system is not otherwise
   *      busy. Threads may scan
   *      for work hundreds of times upon startup, shutdown, and
   *      global synch points of task sets.
   *  <li> Large imbalances in tasks run across different threads might
   *      just reflect contention with unrelated threads on a system
   *      (possibly including JVM threads such as GC), but may also
   *      indicate some systematic bias in how you generate tasks.
   *  <li> Large task queue capacities may mean that too many tasks are being
   *     generated before they can be run. 
   *     Capacities are reported rather than current numbers of tasks
   *     in queues because they are better indicators of the existence
   *     of these kinds of possibly-transient problems.
   *     Queue capacities are
   *     resized on demand from their initial value of 4096 elements,
   *     which is much more than sufficient for the kinds of 
   *     applications that this framework is intended to best support.
   * </ul>
   **/

  public void stats() {
    long time = System.currentTimeMillis() - initTime;
    double secs = ((double)time) / 1000.0;
    long totalRuns = 0;
    long totalScans = 0;
    long totalSteals = 0;

    System.out.print("Thread" +
                     "\tQ Cap" +
                       "\tScans" +
                       "\tNew" +
                       "\tRuns" +
                       "\n");

    for (int i = 0; i < threads.length; ++i) {
      FJTaskRunner t = threads[i];
      int truns = t.runs;
      totalRuns += truns;

      int tscans = t.scans;
      totalScans += tscans;

      int tsteals = t.steals;
      totalSteals += tsteals;

      String star = (getActive(t))? "*" : " ";


      System.out.print("T" + i + star +
                       "\t" + t.deqSize() +
                       "\t" + tscans +
                       "\t" + tsteals +
                       "\t" + truns +
                       "\n");
    }

    System.out.print("Total" +
                     "\t    " +
                     "\t" + totalScans +
                     "\t" + totalSteals +
                     "\t" + totalRuns +
                     "\n");

    System.out.print("Execute: " + entries); 
    
    System.out.print("\tTime: " + secs);

    long rps = 0;
    if (secs != 0) rps = Math.round((double)(totalRuns) / secs);

    System.out.println("\tRate: " + rps);
  }


  /* ------------ Methods called only by FJTaskRunners ------------- */


  /**
   * Return the array of threads in this group. 
   * Called only by FJTaskRunner.scan().
   **/

  protected FJTaskRunner[] getArray() { return threads; }


  /**
   * Return a task from entry queue, or null if empty.
   * Called only by FJTaskRunner.scan().
   **/

  protected FJTask pollEntryQueue() {
    try {
      FJTask t = (FJTask)(entryQueue.poll(0));
      return t;
    }
    catch(InterruptedException ex) { // ignore interrupts
      Thread.currentThread().interrupt();
      return null;
    }
  }


  /**
   * Return active status of t.
   * Per-thread active status can only be accessed and
   * modified via synchronized method here in the group class.
   **/

  protected synchronized boolean getActive(FJTaskRunner t) {
    return t.active;
  }


  /**
   * Set active status of thread t to true, and notify others
   * that might be waiting for work. 
   **/

  protected synchronized void setActive(FJTaskRunner t) {
    if (!t.active) { 
      t.active = true;
      ++activeCount;
      if (nstarted < threads.length) 
        threads[nstarted++].start();
      else
        notifyAll();
    }
  }

  /**
   * Set active status of thread t to false.
   **/

  protected synchronized void setInactive(FJTaskRunner t) {
    if (t.active) { 
      t.active = false;
      --activeCount;
    }
  }

  /**
   * The number of times to scan other threads for tasks 
   * before transitioning to a mode where scans are
   * interleaved with sleeps (actually timed waits).
   * Upon transition, sleeps are for duration of
   * scans / SCANS_PER_SLEEP milliseconds.
   * <p>
   * This is not treated as a user-tunable parameter because
   * good values do not appear to vary much across JVMs or
   * applications. Its main role is to help avoid some
   * useless spinning and contention during task startup.
   **/
  static final long SCANS_PER_SLEEP = 15;

  /**
   * The maximum time (in msecs) to sleep when a thread is idle,
   * yet others are not, so may eventually generate work that
   * the current thread can steal. This value reflects the maximum time
   * that a thread may sleep when it possibly should not, because there
   * are other active threads that might generate work. In practice,
   * designs in which some threads become stalled because others
   * are running yet not generating tasks are not likely to work
   * well in this framework anyway, so the exact value does not matter
   * too much. However, keeping it in the sub-second range does
   * help smooth out startup and shutdown effects.
   **/

  static final long MAX_SLEEP_TIME = 100;

  /**
   * Set active status of thread t to false, and
   * then wait until: (a) there is a task in the entry 
   * queue, or (b) other threads are active, or (c) the current
   * thread is interrupted. Upon return, it
   * is not certain that there will be work available.
   * The thread must itself check. 
   * <p>
   * The main underlying reason
   * for these mechanics is that threads do not
   * signal each other when they add elements to their queues.
   * (This would add to task overhead, reduce locality.
   * and increase contention.)
   * So we must rely on a tamed form of polling. However, tasks
   * inserted into the entry queue do result in signals, so
   * tasks can wait on these if all of them are otherwise idle.
   **/

  protected synchronized void checkActive(FJTaskRunner t, long scans) {

    setInactive(t);

    try {
      // if nothing available, do a hard wait
      if (activeCount == 0 && entryQueue.peek() == null) { 
        wait();
      }
      else { 
        // If there is possibly some work,
        // sleep for a while before rechecking 

        long msecs = scans / SCANS_PER_SLEEP;
        if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;
        int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
        wait(msecs, nsecs);
      }
    }
    catch (InterruptedException ex) {
      notify(); // avoid lost notifies on interrupts
      Thread.currentThread().interrupt();
    }
  }

  /* ------------ Utility methods  ------------- */

  /**
   * Start or wake up any threads waiting for work
   **/

  protected synchronized void signalNewTask() {
    if (COLLECT_STATS) ++entries;
    if (nstarted < threads.length) 
       threads[nstarted++].start();
    else
      notify();
  }

  /**
   * Create all FJTaskRunner threads in this group.
   **/

  protected void initializeThreads() {
    for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);
  }




  /**
   * Wrap wait/notify mechanics around a task so that
   * invoke() can wait it out 
   **/
  protected static final class InvokableFJTask extends FJTask {
    protected final Runnable wrapped;
    protected boolean terminated = false;

    protected InvokableFJTask(Runnable r) { wrapped = r; }

    public void run() {
      try {
        if (wrapped instanceof FJTask)
          FJTask.invoke((FJTask)(wrapped));
        else
          wrapped.run();
      }
      finally {
        setTerminated();
      }
    }

    protected synchronized void setTerminated() {
      terminated = true;
      notifyAll(); 
    }

    protected synchronized void awaitTermination() throws InterruptedException {
      while (!terminated) wait();
    }
  }


}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -