testconcurrency.java

来自「oscache-2.4.1-full」· Java 代码 · 共 490 行 · 第 1/2 页

JAVA
490
字号
        String key = "blocking";
        String NEW_VALUE = VALUE + " abc";
        admin.putInCache(key, VALUE);

        try {
            // Force a NeedsRefreshException
            admin.getFromCache(key, 0);
            fail("NeedsRefreshException should have been thrown");
        } catch (NeedsRefreshException nre) {
            // Fire off another thread to get the same cache entry.
            // Since blocking mode is enabled this thread should block
            // until the entry has been updated.
            GetEntry getEntry = new GetEntry(key, NEW_VALUE, 0, false);
            Thread thread = new Thread(getEntry);
            thread.start();

            // Sleep for a bit to simulate the time taken to build the cache entry
            try {
                Thread.sleep(200);
            } catch (InterruptedException ie) {
            }

            // Putting the entry in the cache should mean that threads now retrieve
            // the updated entry
            admin.putInCache(key, NEW_VALUE);

            getEntry = new GetEntry(key, NEW_VALUE, -1, false);
            thread = new Thread(getEntry);
            thread.start();

            try {
                Object fromCache = admin.getFromCache(key, -1);
                assertEquals(NEW_VALUE, fromCache);
            } catch (NeedsRefreshException e) {
                admin.cancelUpdate(key);
                fail("Should not have received a NeedsRefreshException");
            }
        }
    }

    /**
     * Checks whether the cache handles simultaneous attempts to access a
     * stable cache entry correctly when the blocking mode is enabled.
     *
     * Basically N threads are concurrently trying to access a same stale cache entry and each is cancelling its update. Each thread repeat this operation M times.
     * The test is sucessfull if after some time, all threads are properly released
     */
    public void testConcurrentStaleGets() {
        GeneralCacheAdministrator staticAdmin = admin;
        admin = new GeneralCacheAdministrator(); //avoid poluting other test cases

        try {
            // A test for the case where oscache.blocking = true
            //admin.destroy();
            Properties p = new Properties();
            p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true");
            admin = new GeneralCacheAdministrator(p);

            assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking());

            int nbThreads = 50;
            int retryByThreads = 10000;

            String key = "new";

            //First put a value
            admin.putInCache(key, VALUE);

            try {
                //Then test without concurrency that it is reported as stale when time-to-live is zero 
                admin.getFromCache(key, 0);
                fail("NeedsRefreshException should have been thrown");
            } catch (NeedsRefreshException nre) {
                //Ok this is was is excpected, we can release the update
                admin.cancelUpdate(key);
            }

            //Then ask N threads to concurrently try to access this stale resource and each should receive a NeedsRefreshException, and cancel the update
            Thread[] spawnedThreads = new Thread[nbThreads];
            BitSet successfullThreadTerminations = new BitSet(nbThreads); //Track which thread successfully terminated

            for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
                GetStaleEntryAndCancelUpdate getEntry = new GetStaleEntryAndCancelUpdate(key, 0, retryByThreads, threadIndex, successfullThreadTerminations);
                Thread thread = new Thread(getEntry);
                spawnedThreads[threadIndex] = thread;
                thread.start();
            }

            // OK, those threads should now repeatidely be blocked waiting for the new cache
            // entry to appear. Wait for all of them to terminate
            long maxWaitingSeconds = 100;
            int maxWaitForEachThread = 5;
            long waitStartTime = System.currentTimeMillis();

            boolean atLeastOneThreadRunning = false;

            while ((System.currentTimeMillis() - waitStartTime) < (maxWaitingSeconds * 1000)) {
                atLeastOneThreadRunning = false;

                //Wait a bit between each step to avoid consumming all CPU and preventing other threads from running.
                try {
                    Thread.sleep(500);
                } catch (InterruptedException ie) {
                }

                //check whether all threads are done.
                for (int threadIndex = 0; threadIndex < nbThreads;
                        threadIndex++) {
                    Thread inspectedThread = spawnedThreads[threadIndex];

                    try {
                        inspectedThread.join(maxWaitForEachThread * 1000);
                    } catch (InterruptedException e) {
                        fail("Thread #" + threadIndex + " was interrupted");
                    }

                    if (inspectedThread.isAlive()) {
                        atLeastOneThreadRunning = true;
                        log.error("Thread #" + threadIndex + " did not complete within [" + ((System.currentTimeMillis() - waitStartTime) / 1000) + "] s ");
                    }
                }

                if (!atLeastOneThreadRunning) {
                    break; //while loop, test success.
                }
            }

            assertTrue("at least one thread did not complete within [" + ((System.currentTimeMillis() - waitStartTime) / 1000) + "] s ", !atLeastOneThreadRunning);

            for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
                assertTrue("thread [" + threadIndex + "] did not successfully complete. ", successfullThreadTerminations.get(threadIndex));
            }
        } finally {
            admin = staticAdmin;

            //Avoid po
        }
    }

    private class GetEntry implements Runnable {
        String key;
        String value;
        boolean expectNRE;
        int time;

        GetEntry(String key, String value, int time, boolean expectNRE) {
            this.key = key;
            this.value = value;
            this.time = time;
            this.expectNRE = expectNRE;
        }

        public void run() {
            try {
                // Get from the cache
                Object fromCache = admin.getFromCache(key, time);
                assertEquals(value, fromCache);
            } catch (NeedsRefreshException nre) {
                if (!expectNRE) {
                    admin.cancelUpdate(key);
                    fail("Thread should have blocked until a new cache entry was ready");
                } else {
                    // Put a new piece of content into the cache
                    admin.putInCache(key, value);
                }
            }
        }
    }

    /**
      * Basically requests a stale entry, expects to receive a NeedsRefreshException, and always cancels the update.
      */
    private class GetStaleEntryAndCancelUpdate implements Runnable {
        String key;
        int retries;
        int time;
        private final BitSet successfullThreadTerminations;
        private final int threadIndex;

        GetStaleEntryAndCancelUpdate(String key, int time, int retries, int threadIndex, BitSet successfullThreadTerminations) {
            this.key = key;
            this.time = time;
            this.retries = retries;
            this.threadIndex = threadIndex;
            this.successfullThreadTerminations = successfullThreadTerminations;
        }

        public void run() {
            for (int retryIndex = 0; retryIndex < retries; retryIndex++) {
                try {
                    // Get from the cache
                    Object fromCache = admin.getFromCache(key, time);
                    assertNull("Thread index [" + retryIndex + "] expected stale request [" + retryIndex + "] to be received, got [" + fromCache + "]", fromCache);
                } catch (NeedsRefreshException nre) {
                    try {
                        admin.cancelUpdate(key);
                    } catch (Throwable t) {
                        log.error("Thread index [" + retryIndex + "]: Unexpectedly caught exception [" + t + "]", t);
                        fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]");
                    }
                } catch (Throwable t) {
                    log.error("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]", t);
                    fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]");
                }
            }

            //Once we successfully terminate, we update the corresponding bit to let the Junit know we succeeded.
            synchronized (successfullThreadTerminations) {
                successfullThreadTerminations.set(threadIndex);
            }
        }
    }

    private class OSGeneralTest implements Runnable {
        public void doit(int i) {
            int refreshPeriod = 500 /*millis*/;
            String key = KEY + (i % UNIQUE_KEYS);
            admin.putInCache(key, VALUE);

            try {
                // Get from the cache
                admin.getFromCache(KEY, refreshPeriod);
            } catch (NeedsRefreshException nre) {
                // Get the value
                // Store in the cache
                admin.putInCache(KEY, VALUE);
            }

            // Flush occasionally
            if ((i % (UNIQUE_KEYS + 1)) == 0) {
                admin.getCache().flushEntry(key);
            }
        }

        public void run() {
            int start = (int) (Math.random() * UNIQUE_KEYS);
            System.out.print(start + " ");

            for (int i = start; i < (start + ITERATION_COUNT); i++) {
                doit(i);
            }
        }
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?