⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fjtaskrunner.java

📁 采用JAVA开发
💻 JAVA
📖 第 1 页 / 共 2 页
字号:

  /**
   * Check under synch lock if DEQ is really empty when doing pop. 
   * Return task if not empty, else null.
   **/

  protected final synchronized FJTask confirmPop(int provisionalTop) {
    // Runs while holding this runner's monitor, so the comparison with
    // base below cannot interleave with the synchronized take().
    if (provisionalTop < base) {
      /*
        The DEQ was empty after all. Rewind both indices to zero:
        this spares push from needless checkOverflow calls and keeps
        the DEQ from piling up garbage references.
      */
      top = base = 0;
      return null;
    }

    // Still occupied: take the task out of the provisional top slot.
    return deq[provisionalTop & (deq.length-1)].take();
  }


  /** 
   * Take a task from the base of the DEQ.
   * Always called by other threads via scan()
   **/

  
  protected final synchronized FJTask take() {

    // Advance base before looking at top; the early increment is what
    // suppresses a pop that is contending for the same slot.
    int claimed = base++;

    if (claimed >= top) {
      // Nothing there to steal: back the increment out again.
      base = claimed;
      return null;
    }

    // Looks stealable; confirmTake re-checks before handing it over.
    return confirmTake(claimed);
  }


  /**
   * double-check a potential take
   **/
  
  protected FJTask confirmTake(int oldBase) {

    /*
      A second synch, guaranteed to be uncontended, stands in as a
      barrier in case the JVM does not properly process a
      read-after-write of two volatiles.
    */
    synchronized(barrier) {

      if (oldBase >= top) {
        // The slot is not available after all: restore base, report nothing.
        base = oldBase;
        return null;
      }

      /*
        The slot is read with get() rather than take(): nulling it out
        here could race with a concurrent push. The stale reference
        left behind is swept out periodically in checkOverflow, or
        more typically disappears because pop keeps the indices
        zero-based whenever it finds the DEQ empty, so the active
        region stays small and is constantly overwritten.
      */
      return deq[oldBase & (deq.length-1)].get();
    }
  }


  /**
   * Adjust top and base, and grow DEQ if necessary.
   * Called only while the DEQ synch lock is being held.
   * We don't expect this to be called very often. In most
   * programs using FJTasks, it is never called.
   **/

  protected void checkOverflow() { 
    // Snapshot the indices once; live tasks occupy indices [b, t).
    final int t = top;
    final int b = base;
    final int mask = deq.length - 1; // the code masks with length-1 throughout

    if (t - b < mask) { // enough room: only the indices need resetting

      // Fold both indices back into array range, keeping top >= base.
      final int newBase = b & mask;
      int newTop = t & mask;
      if (newTop < newBase) newTop += deq.length;
      top = newTop;
      base = newBase;

      /*
         Null out refs to stolen tasks.
         This is the only time we can safely do it.

         Stolen tasks sit in the slots strictly below base, so the
         sweep starts one slot under newBase and walks downward
         (circularly). Slot newBase itself holds the next live task
         whenever the DEQ is non-empty and must not be cleared. The
         walk ends at the slot top maps to (the slot the next push
         overwrites anyway) or at the first slot that is already
         null. The stop slot is masked because newTop may have had
         deq.length added above and would otherwise never compare
         equal to a slot number.
      */

      final int stop = newTop & mask;
      int i = (newBase - 1) & mask;
      while (i != stop && deq[i].ref != null) {
        deq[i].ref = null;
        i = (i - 1) & mask;
      }

    }
    else { // grow by doubling array

      final int newTop = t - b; // live tasks end up at indices [0, t - b)
      final int oldcap = deq.length;
      final int newcap = oldcap * 2;

      if (newcap >= MAX_CAPACITY)
        throw new Error("FJTask queue maximum capacity exceeded");

      VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];

      // bottom half of the new deq reuses the old refs, starting from base
      int from = b;
      for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[from++ & (oldcap-1)];

      // top half of the new deq gets fresh refs
      for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef();

      deq = newdeq;
      base = 0;
      top = newTop;
    }
  }


  /* ------------ Scheduling  ------------------- */


  /**
   * Do all but the pop() part of yield or join, by
   * traversing all DEQs in our group looking for a task to
   * steal. If none, it checks the entry queue. 
   * <p>
   * Since there are no good, portable alternatives,
   * we rely here on a mixture of Thread.yield and priorities
   * to reduce wasted spinning, even though these are
   * not well defined. We are hoping here that the JVM
   * does something sensible.
   * @param waitingFor if non-null, the current task being joined
   **/

  protected void scan(final FJTask waitingFor) {

    FJTask task = null;

    // to delay lowering priority until first failure to steal
    boolean lowered = false;
    
    /*
      Circularly traverse from a random start index. 
      
      This differs slightly from cilk version that uses a random index
      for each attempted steal.
      Exhaustive scanning might impede analytic tractablity of 
      the scheduling policy, but makes it much easier to deal with
      startup and shutdown.
    */
    
    FJTaskRunner[] ts = group.getArray();
    int idx = victimRNG.nextInt(ts.length);
    
    for (int i = 0; i < ts.length; ++i) {
      
      FJTaskRunner t = ts[idx];
      if (++idx >= ts.length) idx = 0; // circularly traverse
      
      if (t != null && t != this) {
        
        if (waitingFor != null && waitingFor.isDone()) {
          break;
        }
        else {
          if (COLLECT_STATS) ++scans;
          task = t.take();
          if (task != null) {
            if (COLLECT_STATS) ++steals;
            break;
          }
          else if (isInterrupted()) {
            break;
          }
          else if (!lowered) { // if this is first fail, lower priority
            lowered = true;
            setPriority(scanPriority);
          }
          else {           // otherwise we are at low priority; just yield
            yield();
          }
        }
      }
      
    } 

    if (task == null) {
      if (COLLECT_STATS) ++scans;
      task = group.pollEntryQueue();
      if (COLLECT_STATS) if (task != null) ++steals;
    }
    
    if (lowered) setPriority(runPriority);
    
    if (task != null && !task.isDone()) {
      if (COLLECT_STATS) ++runs;
      task.run(); 
      task.setDone(); 
    }

  }

  /**
   * Same as scan, but called when current thread is idling.
   * It repeatedly scans other threads for tasks,
   * sleeping while none are available. 
   * <p>
   * This differs from scan mainly in that
   * since there is no reason to return to recheck any
   * condition, we iterate until a task is found, backing
   * off via sleeps if necessary.
   **/

  protected void scanWhileIdling() {
    FJTask task = null;
    
    boolean lowered = false;
    long iters = 0;
    
    FJTaskRunner[] ts = group.getArray();
    int idx = victimRNG.nextInt(ts.length);
    
    do {
      for (int i = 0; i < ts.length; ++i) {
        
        FJTaskRunner t = ts[idx];
        if (++idx >= ts.length) idx = 0; // circularly traverse
        
        if (t != null && t != this) {
          if (COLLECT_STATS) ++scans;
          
          task = t.take();
          if (task != null) {
            if (COLLECT_STATS) ++steals;
            if (lowered) setPriority(runPriority);
            group.setActive(this);
            break;
          }
        }
      } 
      
      if (task == null) {
        if (isInterrupted()) 
          return;
        
        if (COLLECT_STATS) ++scans;
        task = group.pollEntryQueue();
        
        if (task != null) {
          if (COLLECT_STATS) ++steals;
          if (lowered) setPriority(runPriority);
          group.setActive(this);
        }
        else {
          ++iters;
          //  Check here for yield vs sleep to avoid entering group synch lock
          if (iters >= group.SCANS_PER_SLEEP) {
            group.checkActive(this, iters);
            if (isInterrupted())
              return;
          }
          else if (!lowered) {
            lowered = true;
            setPriority(scanPriority);
          }
          else {
            yield();
          }
        }
      }
    } while (task == null);


    if (!task.isDone()) {
      if (COLLECT_STATS) ++runs;
      task.run(); 
      task.setDone(); 
    }
    
  }

  /* ------------  composite operations ------------------- */

    
  /**
   * Main runloop
   **/

  public void run() {
    try {
      // Keep going until interrupted() reports an interrupt.
      while (!interrupted()) {

        final FJTask next = pop();

        if (next == null) {
          // pop() produced nothing: go look for work while idling.
          scanWhileIdling();
          continue;
        }

        if (next.isDone())
          continue;

        // inline expansion of FJTask.invoke
        if (COLLECT_STATS) ++runs;
        next.run();
        next.setDone();
      }
    }
    finally {
      // However the loop ends, tell the group this runner is inactive.
      group.setInactive(this);
    }
  }

  /**
   * Execute a task in this thread. Generally called when current task
   * cannot otherwise continue.
   **/

    
  protected final void taskYield() {
    final FJTask next = pop();

    if (next == null) {
      // pop() produced nothing: scan with no particular task to wait for.
      scan(null);
      return;
    }

    // Run the popped task unless it has already been completed.
    if (!next.isDone()) {
      if (COLLECT_STATS) ++runs;
      next.run();
      next.setDone();
    }
  }


  /**
   * Process tasks until w is done.
   * Equivalent to <code>while(!w.isDone()) taskYield(); </code>
   **/

  protected final void taskJoin(final FJTask w) {

    // Keep executing other work until w is reported done.
    while (!w.isDone()) {

      final FJTask next = pop();

      if (next == null) {
        // pop() produced nothing: scan, passing w as the task being joined.
        scan(w);
        continue;
      }

      if (next.isDone())
        continue;

      if (COLLECT_STATS) ++runs;
      next.run();
      next.setDone();

      // fast exit if the task we just ran was w itself
      if (next == w) return;
    }
  }

  /**
   * A specialized expansion of
   * <code> w.fork(); invoke(v); w.join(); </code>
   **/


  protected final void coInvoke(final FJTask w, final FJTask v) {

    final int t = top;

    // Non-inlinable case: no room at the current indices, use the backup path.
    if (t >= (base & (deq.length-1)) + deq.length) {
      slowCoInvoke(w, v);
      return;
    }

    // inlined push of w
    deq[t & (deq.length-1)].put(w);
    top = t + 1;

    // inlined invoke of v
    if (!v.isDone()) {
      if (COLLECT_STATS) ++runs;
      v.run();
      v.setDone();
    }

    // inlined taskJoin on w
    while (!w.isDone()) {
      final FJTask next = pop();

      if (next == null) {
        scan(w);
        continue;
      }

      if (next.isDone())
        continue;

      if (COLLECT_STATS) ++runs;
      next.run();
      next.setDone();

      // fast exit if the task we just ran was w itself
      if (next == w) return;
    }
  }


  /**
   * Backup to handle noninlinable cases of coInvoke
   **/

  protected void slowCoInvoke(final FJTask w, final FJTask v) {
    // Same fork / invoke / join sequence as coInvoke(w, v), but going
    // through the regular methods instead of their inlined expansions.
    push(w); // let push deal with overflow
    FJTask.invoke(v); // handle v via FJTask.invoke
    taskJoin(w); // process tasks until w is done
  }


  /**
   * Array-based version of coInvoke
   **/

  protected final void coInvoke(FJTask[] tasks) {
    // All tasks but the last are forked; the last one is run inline.
    final int nforks = tasks.length - 1;

    int t = top;

    // The inline path needs at least one task and room for every fork.
    final boolean inlinable =
      nforks >= 0 && t + nforks < (base & (deq.length-1)) + deq.length;

    if (!inlinable) {
      slowCoInvoke(tasks);
      return;
    }

    // inlined bulk push, publishing top after each put
    for (int i = 0; i < nforks; ++i) {
      deq[t & (deq.length-1)].put(tasks[i]);
      top = ++t;
    }

    // inlined invoke of the remaining task
    final FJTask last = tasks[nforks];
    if (!last.isDone()) {
      if (COLLECT_STATS) ++runs;
      last.run();
      last.setDone();
    }

    // inlined taskJoin for each forked task, in array order
    for (int i = 0; i < nforks; ++i) {
      final FJTask w = tasks[i];

      while (!w.isDone()) {
        final FJTask next = pop();

        if (next == null) {
          scan(w);
        }
        else if (!next.isDone()) {
          if (COLLECT_STATS) ++runs;
          next.run();
          next.setDone();
        }
      }
    }
  }

  /**
   * Backup to handle atypical or noninlinable cases of coInvoke
   **/

  protected void slowCoInvoke(FJTask[] tasks) {
    final int count = tasks.length;

    // First push every task, in array order ...
    for (int i = 0; i < count; ++i) {
      push(tasks[i]);
    }

    // ... then join each of them, again in array order.
    for (int i = 0; i < count; ++i) {
      taskJoin(tasks[i]);
    }
  }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -