⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 orderedthreadpoolexecutor.java

📁 mina是以Java实现的一个开源的网络程序框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* *  Licensed to the Apache Software Foundation (ASF) under one *  or more contributor license agreements.  See the NOTICE file *  distributed with this work for additional information *  regarding copyright ownership.  The ASF licenses this file *  to you under the Apache License, Version 2.0 (the *  "License"); you may not use this file except in compliance *  with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * *  Unless required by applicable law or agreed to in writing, *  software distributed under the License is distributed on an *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *  KIND, either express or implied.  See the License for the *  specific language governing permissions and limitations *  under the License. * */package org.apache.mina.filter.executor;import java.util.ArrayList;import java.util.HashSet;import java.util.List;import java.util.Queue;import java.util.Set;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import org.apache.mina.core.session.AttributeKey;import org.apache.mina.core.session.DummySession;import org.apache.mina.core.session.IoEvent;import org.apache.mina.core.session.IoSession;/** * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s. * <p> * If you don't need to maintain the order of events per session, please use * {@link UnorderedThreadPoolExecutor}. * @author The Apache MINA Project (dev@mina.apache.org) * @version $Rev: 762167 $, $Date: 2009-04-05 23:57:44 +0200 (Sun, 05 Apr 2009) $ * @org.apache.xbean.XBean */public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {    /** A default value for the initial pool size */    private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;        /** A default value for the maximum pool size */    private static final int DEFAULT_MAX_THREAD_POOL = 16;        /** A default value for the KeepAlive delay */    private static final int DEFAULT_KEEP_ALIVE = 30;        private static final IoSession EXIT_SIGNAL = new DummySession();    /** A key stored into the session's attribute for the event tasks being queued */     private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");        private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();    private final Set<Worker> workers = new HashSet<Worker>();    private volatile int largestPoolSize;    private final AtomicInteger idleWorkers = new AtomicInteger();    private long completedTaskCount;    private volatile boolean shutdown;    private final IoEventQueueHandler eventQueueHandler;    /**     * Creates a default ThreadPool, with default values :     * - minimum pool size is 0     * - maximum pool size is 16     * - keepAlive set to 30 seconds     * - A default ThreadFactory     * - All events are accepted     */    public OrderedThreadPoolExecutor() {        this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL,             DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);    }    /**     * Creates a default ThreadPool, with default values :     * - minimum pool size is 0     * - keepAlive set to 30 seconds     * - A default ThreadFactory     * - All events are accepted     *      * @param maximumPoolSize The maximum pool size     */    public OrderedThreadPoolExecutor(int maximumPoolSize) {        this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,             Executors.defaultThreadFactory(), null);    }    /**     * Creates a default ThreadPool, with default values :     * - keepAlive set to 30 seconds     * - A default ThreadFactory     * - All events are accepted     *     * @param corePoolSize The initial pool sizePoolSize     * @param maximumPoolSize The maximum pool size     */    public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {        this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,             Executors.defaultThreadFactory(), null);    }    /**     * Creates a default ThreadPool, with default values :     * - A default ThreadFactory     * - All events are accepted     *      * @param corePoolSize The initial pool sizePoolSize     * @param maximumPoolSize The maximum pool size     * @param keepAliveTime Default duration for a thread     * @param unit Time unit used for the keepAlive value     */    public OrderedThreadPoolExecutor(            int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit,             Executors.defaultThreadFactory(), null);    }    /**     * Creates a default ThreadPool, with default values :     * - A default ThreadFactory     *      * @param corePoolSize The initial pool sizePoolSize     * @param maximumPoolSize The maximum pool size     * @param keepAliveTime Default duration for a thread     * @param unit Time unit used for the keepAlive value     * @param queueHandler The queue used to store events     */    public OrderedThreadPoolExecutor(            int corePoolSize, int maximumPoolSize,            long keepAliveTime, TimeUnit unit,            IoEventQueueHandler queueHandler) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit,             Executors.defaultThreadFactory(), queueHandler);    }    /**     * Creates a default ThreadPool, with default values :     * - A default ThreadFactory     *      * @param corePoolSize The initial pool sizePoolSize     * @param maximumPoolSize The maximum pool size     * @param keepAliveTime Default duration for a thread     * @param unit Time unit used for the keepAlive value     * @param threadFactory The factory used to create threads     */    public OrderedThreadPoolExecutor(            int corePoolSize, int maximumPoolSize,            long keepAliveTime, TimeUnit unit,            ThreadFactory threadFactory) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);    }    /**     * Creates a new instance of a OrderedThreadPoolExecutor.     *      * @param corePoolSize The initial pool sizePoolSize     * @param maximumPoolSize The maximum pool size     * @param keepAliveTime Default duration for a thread     * @param unit Time unit used for the keepAlive value     * @param threadFactory The factory used to create threads     * @param queueHandler The queue used to store events     */    public OrderedThreadPoolExecutor(            int corePoolSize, int maximumPoolSize,            long keepAliveTime, TimeUnit unit,            ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {        // We have to initialize the pool with default values (0 and 1) in order to        // handle the exception in a better way. We can't add a try {} catch() {}        // around the super() call.        super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit,             new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());        if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {            throw new IllegalArgumentException("corePoolSize: " + corePoolSize);        }        if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {            throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);        }        // Now, we can setup the pool sizes        super.setCorePoolSize( corePoolSize );        super.setMaximumPoolSize( maximumPoolSize );                // The queueHandler might be null.        this.eventQueueHandler = queueHandler;    }        /**     * @return The associated queue handler.      */    public IoEventQueueHandler getQueueHandler() {        return eventQueueHandler;    }    /**     * {@inheritDoc}     */    @Override    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {        // Ignore the request.  It must always be AbortPolicy.    }    /**     * Add a new thread to execute a task, if needed and possible.     * It depends on the current pool size. If it's full, we do nothing.     */    private void addWorker() {        synchronized (workers) {            if (workers.size() >= super.getMaximumPoolSize()) {                return;            }            // Create a new worker, and add it to the thread pool            Worker worker = new Worker();            Thread thread = getThreadFactory().newThread(worker);                        // As we have added a new thread, it's considered as idle.            idleWorkers.incrementAndGet();                        // Now, we can start it.            thread.start();            workers.add(worker);            if (workers.size() > largestPoolSize) {                largestPoolSize = workers.size();            }        }    }    /**     * Add a new Worker only if there are no idle worker.     */    private void addWorkerIfNecessary() {        if (idleWorkers.get() == 0) {            synchronized (workers) {                if (workers.isEmpty() || (idleWorkers.get() == 0)) {                    addWorker();                }            }        }    }    private void removeWorker() {        synchronized (workers) {            if (workers.size() <= super.getCorePoolSize()) {                return;            }            waitingSessions.offer(EXIT_SIGNAL);        }    }    /**     * {@inheritDoc}     */    @Override    public int getMaximumPoolSize() {        return super.getMaximumPoolSize();    }    /**     * {@inheritDoc}     */    @Override    public void setMaximumPoolSize(int maximumPoolSize) {        if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {            throw new IllegalArgumentException("maximumPoolSize: "                    + maximumPoolSize);        }        synchronized (workers) {            super.setMaximumPoolSize( maximumPoolSize );            int difference = workers.size() - maximumPoolSize;            while (difference > 0) {                removeWorker();                --difference;            }        }    }    /**     * {@inheritDoc}     */    @Override    public boolean awaitTermination(long timeout, TimeUnit unit)            throws InterruptedException {        long deadline = System.currentTimeMillis() + unit.toMillis(timeout);        synchronized (workers) {            while (!isTerminated()) {                long waitTime = deadline - System.currentTimeMillis();                if (waitTime <= 0) {                    break;                }                workers.wait(waitTime);            }        }        return isTerminated();    }    /**     * {@inheritDoc}     */    @Override    public boolean isShutdown() {        return shutdown;    }    /**     * {@inheritDoc}     */    @Override    public boolean isTerminated() {        if (!shutdown) {            return false;        }        synchronized (workers) {            return workers.isEmpty();        }    }    /**     * {@inheritDoc}     */    @Override    public void shutdown() {        if (shutdown) {            return;        }        shutdown = true;        synchronized (workers) {            for (int i = workers.size(); i > 0; i --) {                waitingSessions.offer(EXIT_SIGNAL);            }        }    }    /**     * {@inheritDoc}     */    @Override    public List<Runnable> shutdownNow() {        shutdown();        List<Runnable> answer = new ArrayList<Runnable>();        IoSession session;                while ((session = waitingSessions.poll()) != null) {            if (session == EXIT_SIGNAL) {                waitingSessions.offer(EXIT_SIGNAL);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -