threadpoolexecutortest.java

来自「SRI international 发布的OAA框架软件」· Java 代码 · 共 1,573 行 · 第 1/4 页

JAVA
1,573
字号
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 * Other contributors include Andrew Wright, Jeffrey Hayes,
 * Pat Fisher, Mike Judd.
 */

import edu.emory.mathcs.backport.java.util.concurrent.*;
import junit.framework.*;
import edu.emory.mathcs.backport.java.util.*;
import java.util.*;

public class ThreadPoolExecutorTest extends JSR166TestCase {
    public static void main(String[] args) {
	junit.textui.TestRunner.run (suite());
    }
    public static Test suite() {
        return new TestSuite(ThreadPoolExecutorTest.class);
    }

    static class ExtendedTPE extends ThreadPoolExecutor {
        volatile boolean beforeCalled = false;
        volatile boolean afterCalled = false;
        volatile boolean terminatedCalled = false;
        public ExtendedTPE() {
            super(1, 1, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new SynchronousQueue());
        }
        protected void beforeExecute(Thread t, Runnable r) {
            beforeCalled = true;
        }
        protected void afterExecute(Runnable r, Throwable t) {
            afterCalled = true;
        }
        protected void terminated() {
            terminatedCalled = true;
        }
    }

    static class FailingThreadFactory implements ThreadFactory{
        int calls = 0;
        public Thread newThread(Runnable r){
            if (++calls > 1) return null;
            return new Thread(r);
        }
    }


    /**
     *  execute successfully executes a runnable
     */
    public void testExecute() {
        ThreadPoolExecutor p1 = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        try {
            p1.execute(new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep(SHORT_DELAY_MS);
                        } catch(InterruptedException e){
                            threadUnexpectedException();
                        }
                    }
                });
	    Thread.sleep(SMALL_DELAY_MS);
        } catch(InterruptedException e){
            unexpectedException();
        }
        joinPool(p1);
    }

    /**
     *  getActiveCount increases but doesn't overestimate, when a
     *  thread becomes active
     */
    public void testGetActiveCount() {
        ThreadPoolExecutor p2 = new ThreadPoolExecutor(2, 2, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        assertEquals(0, p2.getActiveCount());
        p2.execute(new MediumRunnable());
        try {
            Thread.sleep(SHORT_DELAY_MS);
        } catch(Exception e){
            unexpectedException();
        }
        assertEquals(1, p2.getActiveCount());
        joinPool(p2);
    }

    /**
     *  prestartCoreThread starts a thread if under corePoolSize, else doesn't
     */
    public void testPrestartCoreThread() {
        ThreadPoolExecutor p2 = new ThreadPoolExecutor(2, 2, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        assertEquals(0, p2.getPoolSize());
        assertTrue(p2.prestartCoreThread());
        assertEquals(1, p2.getPoolSize());
        assertTrue(p2.prestartCoreThread());
        assertEquals(2, p2.getPoolSize());
        assertFalse(p2.prestartCoreThread());
        assertEquals(2, p2.getPoolSize());
        joinPool(p2);
    }

    /**
     *  prestartAllCoreThreads starts all corePoolSize threads
     */
    public void testPrestartAllCoreThreads() {
        ThreadPoolExecutor p2 = new ThreadPoolExecutor(2, 2, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        assertEquals(0, p2.getPoolSize());
        p2.prestartAllCoreThreads();
        assertEquals(2, p2.getPoolSize());
        p2.prestartAllCoreThreads();
        assertEquals(2, p2.getPoolSize());
        joinPool(p2);
    }

    /**
     *   getCompletedTaskCount increases, but doesn't overestimate,
     *   when tasks complete
     */
    public void testGetCompletedTaskCount() {
        ThreadPoolExecutor p2 = new ThreadPoolExecutor(2, 2, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        assertEquals(0, p2.getCompletedTaskCount());
        p2.execute(new ShortRunnable());
        try {
            Thread.sleep(SMALL_DELAY_MS);
        } catch(Exception e){
            unexpectedException();
        }
        assertEquals(1, p2.getCompletedTaskCount());
        try { p2.shutdown(); } catch(SecurityException ok) { return; }
        joinPool(p2);
    }

    /**
     *   getCorePoolSize returns size given in constructor if not otherwise set
     */
    public void testGetCorePoolSize() {
        ThreadPoolExecutor p1 = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        assertEquals(1, p1.getCorePoolSize());
        joinPool(p1);
    }

    /**
     *   getKeepAliveTime returns value given in constructor if not otherwise set
     */
    public void testGetKeepAliveTime() {
        ThreadPoolExecutor p2 = new ThreadPoolExecutor(2, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        assertEquals(1, p2.getKeepAliveTime(TimeUnit.SECONDS));
        joinPool(p2);
    }


    /**
     * getThreadFactory returns factory in constructor if not set
     */
    public void testGetThreadFactory() {
        ThreadFactory tf = new SimpleThreadFactory();
        ThreadPoolExecutor p = new ThreadPoolExecutor(1,2,LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10), tf, new NoOpREHandler());
        assertSame(tf, p.getThreadFactory());
        joinPool(p);
    }

    /**
     * setThreadFactory sets the thread factory returned by getThreadFactory
     */
    public void testSetThreadFactory() {
        ThreadPoolExecutor p = new ThreadPoolExecutor(1,2,LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        ThreadFactory tf = new SimpleThreadFactory();
        p.setThreadFactory(tf);
        assertSame(tf, p.getThreadFactory());
        joinPool(p);
    }


    /**
     * setThreadFactory(null) throws NPE
     */
    public void testSetThreadFactoryNull() {
        ThreadPoolExecutor p = new ThreadPoolExecutor(1,2,LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        try {
            p.setThreadFactory(null);
            shouldThrow();
        } catch (NullPointerException success) {
        } finally {
            joinPool(p);
        }
    }

    /**
     * getRejectedExecutionHandler returns handler in constructor if not set
     */
    public void testGetRejectedExecutionHandler() {
        RejectedExecutionHandler h = new NoOpREHandler();
        ThreadPoolExecutor p = new ThreadPoolExecutor(1,2,LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10), h);
        assertSame(h, p.getRejectedExecutionHandler());
        joinPool(p);
    }

    /**
     * setRejectedExecutionHandler sets the handler returned by
     * getRejectedExecutionHandler
     */
    public void testSetRejectedExecutionHandler() {
        ThreadPoolExecutor p = new ThreadPoolExecutor(1,2,LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        RejectedExecutionHandler h = new NoOpREHandler();
        p.setRejectedExecutionHandler(h);
        assertSame(h, p.getRejectedExecutionHandler());
        joinPool(p);
    }


    /**
     * setRejectedExecutionHandler(null) throws NPE
     */
    public void testSetRejectedExecutionHandlerNull() {
        ThreadPoolExecutor p = new ThreadPoolExecutor(1,2,LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        try {
            p.setRejectedExecutionHandler(null);
            shouldThrow();
        } catch (NullPointerException success) {
        } finally {
            joinPool(p);
        }
    }


    /**
     *   getLargestPoolSize increases, but doesn't overestimate, when
     *   multiple threads active
     */
    public void testGetLargestPoolSize() {
        ThreadPoolExecutor p2 = new ThreadPoolExecutor(2, 2, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        try {
            assertEquals(0, p2.getLargestPoolSize());
            p2.execute(new MediumRunnable());
            p2.execute(new MediumRunnable());
            Thread.sleep(SHORT_DELAY_MS);
            assertEquals(2, p2.getLargestPoolSize());
        } catch(Exception e){
            unexpectedException();
        }
        joinPool(p2);
    }

    /**
     *   getMaximumPoolSize returns value given in constructor if not
     *   otherwise set
     */
    public void testGetMaximumPoolSize() {
        ThreadPoolExecutor p2 = new ThreadPoolExecutor(2, 2, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        assertEquals(2, p2.getMaximumPoolSize());
        joinPool(p2);
    }

    /**
     *   getPoolSize increases, but doesn't overestimate, when threads
     *   become active
     */
    public void testGetPoolSize() {
        ThreadPoolExecutor p1 = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        assertEquals(0, p1.getPoolSize());
        p1.execute(new MediumRunnable());
        assertEquals(1, p1.getPoolSize());
        joinPool(p1);
    }

    /**
     *  getTaskCount increases, but doesn't overestimate, when tasks submitted
     */
    public void testGetTaskCount() {
        ThreadPoolExecutor p1 = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        try {
            assertEquals(0, p1.getTaskCount());
            p1.execute(new MediumRunnable());
            Thread.sleep(SHORT_DELAY_MS);
            assertEquals(1, p1.getTaskCount());
        } catch(Exception e){
            unexpectedException();
        }
        joinPool(p1);
    }

    /**
     *   isShutDown is false before shutdown, true after
     */
    public void testIsShutdown() {

	ThreadPoolExecutor p1 = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        assertFalse(p1.isShutdown());
        try { p1.shutdown(); } catch(SecurityException ok) { return; }
	assertTrue(p1.isShutdown());
        joinPool(p1);
    }


    /**
     *  isTerminated is false before termination, true after
     */
    public void testIsTerminated() {
	ThreadPoolExecutor p1 = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        assertFalse(p1.isTerminated());
        try {
            p1.execute(new MediumRunnable());
        } finally {
            try { p1.shutdown(); } catch(SecurityException ok) { return; }
        }
	try {
	    assertTrue(p1.awaitTermination(LONG_DELAY_MS, TimeUnit.MILLISECONDS));
            assertTrue(p1.isTerminated());
	} catch(Exception e){
            unexpectedException();
        }
    }

    /**
     *  isTerminating is not true when running or when terminated
     */
    public void testIsTerminating() {
	ThreadPoolExecutor p1 = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        assertFalse(p1.isTerminating());
        try {
            p1.execute(new SmallRunnable());
            assertFalse(p1.isTerminating());
        } finally {
            try { p1.shutdown(); } catch(SecurityException ok) { return; }
        }
        try {
	    assertTrue(p1.awaitTermination(LONG_DELAY_MS, TimeUnit.MILLISECONDS));
            assertTrue(p1.isTerminated());
            assertFalse(p1.isTerminating());
	} catch(Exception e){
            unexpectedException();
        }
    }

    /**
     * getQueue returns the work queue, which contains queued tasks
     */
    public void testGetQueue() {
        BlockingQueue q = new ArrayBlockingQueue(10);
        ThreadPoolExecutor p1 = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, TimeUnit.MILLISECONDS, q);
        FutureTask[] tasks = new FutureTask[5];
        for(int i = 0; i < 5; i++){
            tasks[i] = new FutureTask(new MediumPossiblyInterruptedRunnable(), Boolean.TRUE);
            p1.execute(tasks[i]);
        }
        try {
            Thread.sleep(SHORT_DELAY_MS);
            BlockingQueue wq = p1.getQueue();
            assertSame(q, wq);
            assertFalse(wq.contains(tasks[0]));
            assertTrue(wq.contains(tasks[4]));
        } catch(Exception e) {
            unexpectedException();
        } finally {
            joinPool(p1);
        }
    }

    /**
     * remove(task) removes queued task, and fails to remove active task
     */
    public void testRemove() {
        BlockingQueue q = new ArrayBlockingQueue(10);
        ThreadPoolExecutor p1 = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, TimeUnit.MILLISECONDS, q);
        FutureTask[] tasks = new FutureTask[5];
        for(int i = 0; i < 5; i++){
            tasks[i] = new FutureTask(new MediumPossiblyInterruptedRunnable(), Boolean.TRUE);
            p1.execute(tasks[i]);
        }
        try {
            Thread.sleep(SHORT_DELAY_MS);
            assertFalse(p1.remove(tasks[0]));
            assertTrue(q.contains(tasks[4]));
            assertTrue(q.contains(tasks[3]));
            assertTrue(p1.remove(tasks[4]));
            assertFalse(p1.remove(tasks[4]));
            assertFalse(q.contains(tasks[4]));
            assertTrue(q.contains(tasks[3]));
            assertTrue(p1.remove(tasks[3]));
            assertFalse(q.contains(tasks[3]));
        } catch(Exception e) {
            unexpectedException();
        } finally {
            joinPool(p1);
        }
    }

    /**
     *   purge removes cancelled tasks from the queue
     */
    public void testPurge() {
        ThreadPoolExecutor p1 = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10));
        FutureTask[] tasks = new FutureTask[5];

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?