⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fjtaskrunner.java

📁 一个很好的微工作流内核
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
  /**   * Check under synch lock if DEQ is really empty when doing pop.    * Return task if not empty, else null.   **/  protected final synchronized FJTask confirmPop(int provisionalTop) {    if (base <= provisionalTop)       return deq[provisionalTop & (deq.length-1)].take();    else {    // was empty      /*        Reset DEQ indices to zero whenever it is empty.        This both avoids unnecessary calls to checkOverflow        in push, and helps keep the DEQ from accumulating garbage      */      top = base = 0;      return null;    }  }  /**    * Take a task from the base of the DEQ.   * Always called by other threads via scan()   **/    protected final synchronized FJTask take() {    /*      Increment base in order to suppress a contending pop    */        int b = base++;             if (b < top)       return confirmTake(b);    else {      // back out      base = b;       return null;    }  }  /**   * double-check a potential take   **/    protected FJTask confirmTake(int oldBase) {    /*      Use a second (guaranteed uncontended) synch      to serve as a barrier in case JVM does not      properly process read-after-write of 2 volatiles    */    synchronized(barrier) {      if (oldBase < top) {        /*          We cannot call deq[oldBase].take here because of possible races when          nulling out versus concurrent push operations.  Resulting          accumulated garbage is swept out periodically in          checkOverflow, or more typically, just by keeping indices          zero-based when found to be empty in pop, which keeps active          region small and constantly overwritten.         */                return deq[oldBase & (deq.length-1)].get();      }      else {        base = oldBase;        return null;      }    }  }  /**   * Adjust top and base, and grow DEQ if necessary.   * Called only while DEQ synch lock being held.   * We don't expect this to be called very often. In most   * programs using FJTasks, it is never called.   **/  protected void checkOverflow() {     int t = top;    int b = base;        if (t - b < deq.length-1) { // check if just need an index reset            int newBase = b & (deq.length-1);      int newTop  = top & (deq.length-1);      if (newTop < newBase) newTop += deq.length;      top = newTop;      base = newBase;            /*          Null out refs to stolen tasks.          This is the only time we can safely do it.      */            int i = newBase;      while (i != newTop && deq[i].ref != null) {        deq[i].ref = null;        i = (i - 1) & (deq.length-1);      }          }    else { // grow by doubling array            int newTop = t - b;      int oldcap = deq.length;      int newcap = oldcap * 2;            if (newcap >= MAX_CAPACITY)        throw new Error("FJTask queue maximum capacity exceeded");            VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];            // copy in bottom half of new deq with refs from old deq      for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap-1)];            // fill top half of new deq with new refs      for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef();            deq = newdeq;      base = 0;      top = newTop;    }  }  /* ------------ Scheduling  ------------------- */  /**   * Do all but the pop() part of yield or join, by   * traversing all DEQs in our group looking for a task to   * steal. If none, it checks the entry queue.    * <p>   * Since there are no good, portable alternatives,   * we rely here on a mixture of Thread.yield and priorities   * to reduce wasted spinning, even though these are   * not well defined. We are hoping here that the JVM   * does something sensible.   * @param waitingFor if non-null, the current task being joined   **/  protected void scan(final FJTask waitingFor) {    FJTask task = null;    // to delay lowering priority until first failure to steal    boolean lowered = false;        /*      Circularly traverse from a random start index.             This differs slightly from cilk version that uses a random index      for each attempted steal.      Exhaustive scanning might impede analytic tractablity of       the scheduling policy, but makes it much easier to deal with      startup and shutdown.    */        FJTaskRunner[] ts = group.getArray();    int idx = victimRNG.nextInt(ts.length);        for (int i = 0; i < ts.length; ++i) {            FJTaskRunner t = ts[idx];      if (++idx >= ts.length) idx = 0; // circularly traverse            if (t != null && t != this) {                if (waitingFor != null && waitingFor.isDone()) {          break;        }        else {          if (COLLECT_STATS) ++scans;          task = t.take();          if (task != null) {            if (COLLECT_STATS) ++steals;            break;          }          else if (isInterrupted()) {            break;          }          else if (!lowered) { // if this is first fail, lower priority            lowered = true;            setPriority(scanPriority);          }          else {           // otherwise we are at low priority; just yield            yield();          }        }      }          }     if (task == null) {      if (COLLECT_STATS) ++scans;      task = group.pollEntryQueue();      if (COLLECT_STATS) if (task != null) ++steals;    }        if (lowered) setPriority(runPriority);        if (task != null && !task.isDone()) {      if (COLLECT_STATS) ++runs;      task.run();       task.setDone();     }  }  /**   * Same as scan, but called when current thread is idling.   * It repeatedly scans other threads for tasks,   * sleeping while none are available.    * <p>   * This differs from scan mainly in that   * since there is no reason to return to recheck any   * condition, we iterate until a task is found, backing   * off via sleeps if necessary.   **/  protected void scanWhileIdling() {    FJTask task = null;        boolean lowered = false;    long iters = 0;        FJTaskRunner[] ts = group.getArray();    int idx = victimRNG.nextInt(ts.length);        do {      for (int i = 0; i < ts.length; ++i) {                FJTaskRunner t = ts[idx];        if (++idx >= ts.length) idx = 0; // circularly traverse                if (t != null && t != this) {          if (COLLECT_STATS) ++scans;                    task = t.take();          if (task != null) {            if (COLLECT_STATS) ++steals;            if (lowered) setPriority(runPriority);            group.setActive(this);            break;          }        }      }             if (task == null) {        if (isInterrupted())           return;                if (COLLECT_STATS) ++scans;        task = group.pollEntryQueue();                if (task != null) {          if (COLLECT_STATS) ++steals;          if (lowered) setPriority(runPriority);          group.setActive(this);        }        else {          ++iters;          //  Check here for yield vs sleep to avoid entering group synch lock          if (iters >= group.SCANS_PER_SLEEP) {            group.checkActive(this, iters);            if (isInterrupted())              return;          }          else if (!lowered) {            lowered = true;            setPriority(scanPriority);          }          else {            yield();          }        }      }    } while (task == null);    if (!task.isDone()) {      if (COLLECT_STATS) ++runs;      task.run();       task.setDone();     }      }  /* ------------  composite operations ------------------- */      /**   * Main runloop   **/  public void run() {    try{       while (!interrupted()) {                FJTask task = pop();        if (task != null) {          if (!task.isDone()) {            // inline FJTask.invoke            if (COLLECT_STATS) ++runs;            task.run();             task.setDone();           }        }        else          scanWhileIdling();      }    }    finally {      group.setInactive(this);    }  }  /**   * Execute a task in this thread. Generally called when current task   * cannot otherwise continue.   **/      protected final void taskYield() {    FJTask task = pop();    if (task != null) {      if (!task.isDone()) {        if (COLLECT_STATS) ++runs;        task.run();         task.setDone();       }    }    else      scan(null);  }  /**   * Process tasks until w is done.   * Equivalent to <code>while(!w.isDone()) taskYield(); </code>   **/  protected final void taskJoin(final FJTask w) {    while (!w.isDone()) {       FJTask task = pop();      if (task != null) {        if (!task.isDone()) {          if (COLLECT_STATS) ++runs;          task.run();           task.setDone();           if (task == w) return; // fast exit if we just ran w        }      }      else        scan(w);    }  }  /**   * A specialized expansion of   * <code> w.fork(); invoke(v); w.join(); </code>   **/  protected final void coInvoke(final FJTask w, final FJTask v) {    // inline  push    int t = top;    if (t < (base & (deq.length-1)) + deq.length) {      deq[t & (deq.length-1)].put(w);      top = t + 1;      // inline  invoke      if (!v.isDone()) {         if (COLLECT_STATS) ++runs;         v.run();         v.setDone();       }            // inline  taskJoin            while (!w.isDone()) {        FJTask task  = pop();        if (task != null) {          if (!task.isDone()) {            if (COLLECT_STATS) ++runs;            task.run();             task.setDone();             if (task == w) return; // fast exit if we just ran w          }        }        else          scan(w);      }    }    else      // handle non-inlinable cases      slowCoInvoke(w, v);  }  /**   * Backup to handle noninlinable cases of coInvoke   **/  protected void slowCoInvoke(final FJTask w, final FJTask v) {    push(w); // let push deal with overflow    FJTask.invoke(v);    taskJoin(w);  }  /**   * Array-based version of coInvoke   **/  protected final void coInvoke(FJTask[] tasks) {    int nforks = tasks.length - 1;    // inline bulk push of all but one task    int t = top;    if (nforks >= 0 && t + nforks < (base & (deq.length-1)) + deq.length) {      for (int i = 0; i < nforks; ++i) {        deq[t++ & (deq.length-1)].put(tasks[i]);        top = t;      }      // inline invoke of one task      FJTask v = tasks[nforks];      if (!v.isDone()) {         if (COLLECT_STATS) ++runs;         v.run();         v.setDone();       }            // inline  taskJoins            for (int i = 0; i < nforks; ++i) {         FJTask w = tasks[i];        while (!w.isDone()) {          FJTask task = pop();          if (task != null) {            if (!task.isDone()) {              if (COLLECT_STATS) ++runs;              task.run();               task.setDone();             }          }          else            scan(w);        }      }    }    else  // handle non-inlinable cases      slowCoInvoke(tasks);  }  /**   * Backup to handle atypical or noninlinable cases of coInvoke   **/  protected void slowCoInvoke(FJTask[] tasks) {    for (int i = 0; i < tasks.length; ++i) push(tasks[i]);    for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]);  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -