📄 orderedthreadpoolexecutor.java
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */package org.apache.mina.filter.executor;import java.util.ArrayList;import java.util.HashSet;import java.util.List;import java.util.Queue;import java.util.Set;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import org.apache.mina.core.session.AttributeKey;import org.apache.mina.core.session.DummySession;import org.apache.mina.core.session.IoEvent;import org.apache.mina.core.session.IoSession;/** * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s. * <p> * If you don't need to maintain the order of events per session, please use * {@link UnorderedThreadPoolExecutor}. 
* @author The Apache MINA Project (dev@mina.apache.org) * @version $Rev: 762167 $, $Date: 2009-04-05 23:57:44 +0200 (Sun, 05 Apr 2009) $ * @org.apache.xbean.XBean */public class OrderedThreadPoolExecutor extends ThreadPoolExecutor { /** A default value for the initial pool size */ private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0; /** A default value for the maximum pool size */ private static final int DEFAULT_MAX_THREAD_POOL = 16; /** A default value for the KeepAlive delay */ private static final int DEFAULT_KEEP_ALIVE = 30; private static final IoSession EXIT_SIGNAL = new DummySession(); /** A key stored into the session's attribute for the event tasks being queued */ private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue"); private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>(); private final Set<Worker> workers = new HashSet<Worker>(); private volatile int largestPoolSize; private final AtomicInteger idleWorkers = new AtomicInteger(); private long completedTaskCount; private volatile boolean shutdown; private final IoEventQueueHandler eventQueueHandler; /** * Creates a default ThreadPool, with default values : * - minimum pool size is 0 * - maximum pool size is 16 * - keepAlive set to 30 seconds * - A default ThreadFactory * - All events are accepted */ public OrderedThreadPoolExecutor() { this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null); } /** * Creates a default ThreadPool, with default values : * - minimum pool size is 0 * - keepAlive set to 30 seconds * - A default ThreadFactory * - All events are accepted * * @param maximumPoolSize The maximum pool size */ public OrderedThreadPoolExecutor(int maximumPoolSize) { this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null); } /** * Creates a default ThreadPool, with 
default values : * - keepAlive set to 30 seconds * - A default ThreadFactory * - All events are accepted * * @param corePoolSize The initial pool sizePoolSize * @param maximumPoolSize The maximum pool size */ public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) { this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null); } /** * Creates a default ThreadPool, with default values : * - A default ThreadFactory * - All events are accepted * * @param corePoolSize The initial pool sizePoolSize * @param maximumPoolSize The maximum pool size * @param keepAliveTime Default duration for a thread * @param unit Time unit used for the keepAlive value */ public OrderedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null); } /** * Creates a default ThreadPool, with default values : * - A default ThreadFactory * * @param corePoolSize The initial pool sizePoolSize * @param maximumPoolSize The maximum pool size * @param keepAliveTime Default duration for a thread * @param unit Time unit used for the keepAlive value * @param queueHandler The queue used to store events */ public OrderedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, IoEventQueueHandler queueHandler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler); } /** * Creates a default ThreadPool, with default values : * - A default ThreadFactory * * @param corePoolSize The initial pool sizePoolSize * @param maximumPoolSize The maximum pool size * @param keepAliveTime Default duration for a thread * @param unit Time unit used for the keepAlive value * @param threadFactory The factory used to create threads */ public OrderedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null); } /** * Creates a new instance of a OrderedThreadPoolExecutor. * * @param corePoolSize The initial pool sizePoolSize * @param maximumPoolSize The maximum pool size * @param keepAliveTime Default duration for a thread * @param unit Time unit used for the keepAlive value * @param threadFactory The factory used to create threads * @param queueHandler The queue used to store events */ public OrderedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) { // We have to initialize the pool with default values (0 and 1) in order to // handle the exception in a better way. We can't add a try {} catch() {} // around the super() call. super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy()); if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) { throw new IllegalArgumentException("corePoolSize: " + corePoolSize); } if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) { throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); } // Now, we can setup the pool sizes super.setCorePoolSize( corePoolSize ); super.setMaximumPoolSize( maximumPoolSize ); // The queueHandler might be null. this.eventQueueHandler = queueHandler; } /** * @return The associated queue handler. */ public IoEventQueueHandler getQueueHandler() { return eventQueueHandler; } /** * {@inheritDoc} */ @Override public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { // Ignore the request. It must always be AbortPolicy. } /** * Add a new thread to execute a task, if needed and possible. * It depends on the current pool size. If it's full, we do nothing. 
*/
    private void addWorker() {
        synchronized (workers) {
            // The pool is already at its maximum size: nothing to do.
            if (workers.size() >= super.getMaximumPoolSize()) {
                return;
            }

            // Create a new worker, and add it to the thread pool
            Worker worker = new Worker();
            Thread thread = getThreadFactory().newThread(worker);

            // As we have added a new thread, it's considered as idle.
            idleWorkers.incrementAndGet();

            // Now, we can start it.
            thread.start();
            workers.add(worker);

            // Keep track of the largest size the pool ever reached.
            if (workers.size() > largestPoolSize) {
                largestPoolSize = workers.size();
            }
        }
    }

    /**
     * Add a new Worker only if there are no idle worker.
     */
    private void addWorkerIfNecessary() {
        if (idleWorkers.get() == 0) {
            synchronized (workers) {
                // Check again now that we hold the workers lock.
                if (workers.isEmpty() || (idleWorkers.get() == 0)) {
                    addWorker();
                }
            }
        }
    }

    /**
     * Asks one worker to exit by queueing an EXIT_SIGNAL, unless the pool is
     * already at (or below) its core size.
     */
    private void removeWorker() {
        synchronized (workers) {
            if (workers.size() <= super.getCorePoolSize()) {
                return;
            }

            waitingSessions.offer(EXIT_SIGNAL);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public int getMaximumPoolSize() {
        return super.getMaximumPoolSize();
    }

    /**
     * {@inheritDoc}
     * <p>
     * When the new maximum is smaller than the current number of workers,
     * removeWorker() is invoked once per excess worker.
     *
     * @throws IllegalArgumentException if maximumPoolSize is not positive or is
     *         smaller than the core pool size
     */
    @Override
    public void setMaximumPoolSize(int maximumPoolSize) {
        if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
            throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
        }

        synchronized (workers) {
            super.setMaximumPoolSize(maximumPoolSize);
            int difference = workers.size() - maximumPoolSize;

            while (difference > 0) {
                removeWorker();
                --difference;
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Waits on the workers monitor until the pool is terminated or the timeout
     * has elapsed (presumably the workers notify that monitor when they exit -
     * that code is not part of this section).
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long timeoutMillis = unit.toMillis(timeout);
        long now = System.currentTimeMillis();

        // Saturate instead of overflowing: a very large timeout (for instance
        // Long.MAX_VALUE seconds) used to wrap around to a deadline in the past,
        // which made this method return immediately instead of waiting.
        long deadline;

        if (timeoutMillis > Long.MAX_VALUE - now) {
            deadline = Long.MAX_VALUE;
        } else {
            deadline = now + timeoutMillis;
        }

        synchronized (workers) {
            while (!isTerminated()) {
                long waitTime = deadline - System.currentTimeMillis();

                // wait(0) would block forever, so stop as soon as the time is up.
                if (waitTime <= 0) {
                    break;
                }

                workers.wait(waitTime);
            }
        }

        return isTerminated();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The pool is terminated once shutdown was requested and no worker is left.
     */
    @Override
    public boolean isTerminated() {
        if (!shutdown) {
            return false;
        }

        synchronized (workers) {
            return workers.isEmpty();
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Queues one EXIT_SIGNAL per existing worker. Calling it again once the
     * shutdown flag is set does nothing.
     */
    @Override
    public void shutdown() {
        if (shutdown) {
            return;
        }

        shutdown = true;

        synchronized (workers) {
            for (int i = workers.size(); i > 0; i --) {
                waitingSessions.offer(EXIT_SIGNAL);
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public List<Runnable> shutdownNow() {
        shutdown();

        List<Runnable> answer = new ArrayList<Runnable>();
        IoSession session;

        while ((session = waitingSessions.poll()) != null) {
            if (session == EXIT_SIGNAL) {
                waitingSessions.offer(EXIT_SIGNAL);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -