semaphore.java

来自「SRI international 发布的OAA框架软件」· Java 代码 · 共 809 行 · 第 1/3 页

JAVA
809
字号
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package edu.emory.mathcs.backport.java.util.concurrent;

import java.util.*;
import edu.emory.mathcs.backport.java.util.concurrent.helpers.WaitQueue.*;
import edu.emory.mathcs.backport.java.util.concurrent.helpers.*;

/**
 * A counting semaphore.  Conceptually, a semaphore maintains a set of
 * permits.  Each {@link #acquire} blocks if necessary until a permit is
 * available, and then takes it.  Each {@link #release} adds a permit,
 * potentially releasing a blocking acquirer.
 * However, no actual permit objects are used; the <tt>Semaphore</tt> just
 * keeps a count of the number available and acts accordingly.
 *
 * <p>Semaphores are often used to restrict the number of threads that can
 * access some (physical or logical) resource. For example, here is
 * a class that uses a semaphore to control access to a pool of items:
 * <pre>
 * class Pool {
 *   private static final int MAX_AVAILABLE = 100;
 *   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
 *
 *   public Object getItem() throws InterruptedException {
 *     available.acquire();
 *     return getNextAvailableItem();
 *   }
 *
 *   public void putItem(Object x) {
 *     if (markAsUnused(x))
 *       available.release();
 *   }
 *
 *   // Not a particularly efficient data structure; just for demo
 *
 *   protected Object[] items = ... whatever kinds of items being managed
 *   protected boolean[] used = new boolean[MAX_AVAILABLE];
 *
 *   protected synchronized Object getNextAvailableItem() {
 *     for (int i = 0; i < MAX_AVAILABLE; ++i) {
 *       if (!used[i]) {
 *          used[i] = true;
 *          return items[i];
 *       }
 *     }
 *     return null; // not reached
 *   }
 *
 *   protected synchronized boolean markAsUnused(Object item) {
 *     for (int i = 0; i < MAX_AVAILABLE; ++i) {
 *       if (item == items[i]) {
 *          if (used[i]) {
 *            used[i] = false;
 *            return true;
 *          } else
 *            return false;
 *       }
 *     }
 *     return false;
 *   }
 *
 * }
 * </pre>
 *
 * <p>Before obtaining an item each thread must acquire a permit from
 * the semaphore, guaranteeing that an item is available for use. When
 * the thread has finished with the item it is returned back to the
 * pool and a permit is returned to the semaphore, allowing another
 * thread to acquire that item.  Note that no synchronization lock is
 * held when {@link #acquire} is called as that would prevent an item
 * from being returned to the pool.  The semaphore encapsulates the
 * synchronization needed to restrict access to the pool, separately
 * from any synchronization needed to maintain the consistency of the
 * pool itself.
 *
 * <p>A semaphore initialized to one, and which is used such that it
 * only has at most one permit available, can serve as a mutual
 * exclusion lock.  This is more commonly known as a <em>binary
 * semaphore</em>, because it only has two states: one permit
 * available, or zero permits available.  When used in this way, the
 * binary semaphore has the property (unlike many {@link Lock}
 * implementations), that the &quot;lock&quot; can be released by a
 * thread other than the owner (as semaphores have no notion of
 * ownership).  This can be useful in some specialized contexts, such
 * as deadlock recovery.
 *
 * <p> The constructor for this class optionally accepts a
 * <em>fairness</em> parameter. When set false, this class makes no
 * guarantees about the order in which threads acquire permits. In
 * particular, <em>barging</em> is permitted, that is, a thread
 * invoking {@link #acquire} can be allocated a permit ahead of a
 * thread that has been waiting - logically the new thread places itself at
 * the head of the queue of waiting threads. When fairness is set true, the
 * semaphore guarantees that threads invoking any of the {@link
 * #acquire() acquire} methods are selected to obtain permits in the order in
 * which their invocation of those methods was processed
 * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
 * applies to specific internal points of execution within these
 * methods.  So, it is possible for one thread to invoke
 * <tt>acquire</tt> before another, but reach the ordering point after
 * the other, and similarly upon return from the method.
 * Also note that the untimed {@link #tryAcquire() tryAcquire} methods do not
 * honor the fairness setting, but will take any permits that are
 * available.
 *
 * <p>Generally, semaphores used to control resource access should be
 * initialized as fair, to ensure that no thread is starved out from
 * accessing a resource. When using semaphores for other kinds of
 * synchronization control, the throughput advantages of non-fair
 * ordering often outweigh fairness considerations.
 *
 * <!--
 * <p>This class also provides convenience methods to {@link
 * #acquire(int) acquire} and {@link #release(int) release} multiple
 * permits at a time.  Beware of the increased risk of indefinite
 * postponement when these methods are used without fairness set true.-->
 *
 * @since 1.5
 * @author Doug Lea
 */

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;

    private final Impl impl;

    /**
     * Common base for the semaphore's synchronization strategies.
     * It owns the permit counter and the operations that never block;
     * the blocking acquire/release protocol and the queue inspection
     * methods are left to the subclasses (a fair and a nonfair one).
     */
    static abstract class Impl implements java.io.Serializable {
        /** Count of permits that can currently be handed out. */
        int permits_;

        /**
         * Creates the implementation with an initial permit count.
         *
         * @param permits the starting number of available permits
         */
        protected Impl(int permits) {
            permits_ = permits;
        }

        /**
         * Takes one permit, waiting for one if necessary.
         *
         * @throws InterruptedException if the calling thread is interrupted
         */
        abstract void acquire() throws InterruptedException;

        /**
         * Takes one permit only if one is available right now; never waits.
         *
         * @return <tt>true</tt> if a permit was taken, <tt>false</tt> if
         *         the count was zero or negative
         */
        public boolean attempt() {
            // Deliberately a synchronized block, not a synchronized method:
            // this class is Serializable without an explicit
            // serialVersionUID, and method modifiers are part of the
            // default UID computation.
            synchronized (this) {
                if (permits_ <= 0) {
                    return false;
                }
                permits_--;
                return true;
            }
        }

        /**
         * Timed variant of {@link #attempt()}.
         *
         * @param nanos the longest time to wait, in nanoseconds
         * @return <tt>true</tt> if a permit was taken
         * @throws InterruptedException if the calling thread is interrupted
         */
        abstract boolean attempt(long nanos) throws InterruptedException;

        /**
         * Hands back permits.
         *
         * @param requested the number of permits to return
         */
        abstract void release(int requested);

        /**
         * Reads the current permit count under this object's monitor.
         *
         * @return the number of available permits (may be negative after
         *         {@link #reduce})
         */
        public synchronized int getPermits() {
            return permits_;
        }

        /**
         * Resets the permit count to zero.
         *
         * @return the count held immediately before the reset
         */
        public synchronized int drain() {
            final int taken = permits_;
            permits_ = 0;
            return taken;
        }

        /**
         * Subtracts from the permit count without waiting; the argument
         * is not validated here and the count may go negative.
         *
         * @param reduction the amount to subtract
         */
        public synchronized void reduce(int reduction) {
            permits_ = permits_ - reduction;
        }

        /** Reports whether any threads are queued waiting for a permit. */
        abstract boolean hasQueuedThreads();

        /** Reports how many threads are queued waiting for a permit. */
        abstract int getQueueLength();

        /** Returns a collection of the threads queued waiting for a permit. */
        abstract Collection getQueuedThreads();
    }

    /**
     * Nonfair strategy: waiters park on this object's own monitor with
     * <tt>wait()</tt> and are woken with <tt>notify()</tt>. A caller that
     * finds a permit available takes it immediately, whether or not other
     * threads are already waiting. The queue inspection methods are not
     * supported by this version.
     */
    final static class NonfairImpl extends Impl implements java.io.Serializable {

        /**
         * @param initialPermits the starting number of available permits
         */
        protected NonfairImpl(int initialPermits) {
            super(initialPermits);
        }

        /**
         * Takes one permit, waiting on this monitor until the count is
         * positive.
         *
         * @throws InterruptedException if the thread was already
         *         interrupted on entry or is interrupted while waiting
         */
        public void acquire() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            synchronized (this) {
                try {
                    for (;;) {
                        if (permits_ > 0) {
                            permits_--;
                            return;
                        }
                        wait();
                    }
                }
                catch (InterruptedException interrupted) {
                    // Wake one other waiter before propagating; presumably
                    // this hands on a notification that this thread may
                    // have absorbed while being interrupted.
                    notify();
                    throw interrupted;
                }
            }
        }

        /**
         * Takes one permit, waiting at most the given time for one.
         *
         * @param nanos the longest time to wait, in nanoseconds; a value
         *        of zero or less means "do not wait"
         * @return <tt>true</tt> if a permit was taken, <tt>false</tt> if
         *         the time ran out first
         * @throws InterruptedException if the thread was already
         *         interrupted on entry or is interrupted while waiting
         */
        public boolean attempt(long nanos) throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }

            synchronized (this) {
                // Fast path: a permit is there for the taking.
                if (permits_ > 0) {
                    permits_--;
                    return true;
                }
                // Nothing available and the caller does not want to wait.
                if (nanos <= 0) {
                    return false;
                }

                try {
                    final long deadline = Utils.nanoTime() + nanos;
                    long remaining = nanos;
                    do {
                        TimeUnit.NANOSECONDS.timedWait(this, remaining);
                        if (permits_ > 0) {
                            permits_--;
                            return true;
                        }
                        // Woken without a permit: recompute what is left
                        // of the budget and go around again if any remains.
                        remaining = deadline - Utils.nanoTime();
                    } while (remaining > 0);
                    return false;
                }
                catch (InterruptedException interrupted) {
                    // Same hand-off as in acquire(): wake another waiter
                    // before propagating the interrupt.
                    notify();
                    throw interrupted;
                }
            }
        }

        /**
         * Adds permits and issues one <tt>notify()</tt> per permit added.
         *
         * @param n the number of permits to add
         * @throws IllegalArgumentException if <tt>n</tt> is negative
         */
        public synchronized void release(int n) {
            if (n < 0) {
                throw new IllegalArgumentException("Negative argument");
            }

            permits_ += n;
            for (int pending = n; pending > 0; pending--) {
                notify();
            }
        }

        /**
         * Not available in the nonfair version.
         *
         * @throws UnsupportedOperationException always
         */
        public boolean hasQueuedThreads() {
            throw new UnsupportedOperationException("Use FAIR version");
        }

        /**
         * Not available in the nonfair version.
         *
         * @throws UnsupportedOperationException always
         */
        public int getQueueLength() {
            throw new UnsupportedOperationException("Use FAIR version");
        }

        /**
         * Not available in the nonfair version.
         *
         * @throws UnsupportedOperationException always
         */
        public Collection getQueuedThreads() {
            throw new UnsupportedOperationException("Use FAIR version");
        }
    }

    /**
     * Fair version
     */
    final static class FairImpl extends Impl implements QueuedSync,
                                                        java.io.Serializable {

        WaitQueue wq_ = new FIFOWaitQueue();

        FairImpl(int initialPermits) {
            super(initialPermits);
        }

        public void acquire() throws InterruptedException {

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?