⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 testconcurrency.java

📁 一个不错的cache
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        String key = "blocking";
        String NEW_VALUE = VALUE + " abc";
        admin.putInCache(key, VALUE);

        try {
            // Force a NeedsRefreshException
            admin.getFromCache(key, 0);
            fail("NeedsRefreshException should have been thrown");
        } catch (NeedsRefreshException nre) {
            // Fire off another thread to get the same cache entry.
            // Since blocking mode is enabled this thread should block
            // until the entry has been updated.
            GetEntry getEntry = new GetEntry(key, NEW_VALUE, 0, false);
            Thread thread = new Thread(getEntry);
            thread.start();

            // Sleep for a bit to simulate the time taken to build the cache entry
            try {
                Thread.sleep(200);
            } catch (InterruptedException ie) {
            }

            // Putting the entry in the cache should mean that threads now retrieve
            // the updated entry
            admin.putInCache(key, NEW_VALUE);
            getEntry = new GetEntry(key, NEW_VALUE, -1, false);
            thread = new Thread(getEntry);
            thread.start();

            try {
                Object fromCache = admin.getFromCache(key, -1);
                assertEquals(NEW_VALUE, fromCache);
            } catch (NeedsRefreshException e) {
                admin.cancelUpdate(key);
                fail("Should not have received a NeedsRefreshException");
            }
        }
    }

    /**
     * Checks whether the cache handles simultaneous attempts to access a
     * stale cache entry correctly when the blocking mode is enabled.
     *
     * Basically N threads concurrently try to access the same stale cache entry
     * and each one cancels its update. Each thread repeats this operation M times.
     * The test is successful if, after some time, all threads are properly released
     * and every one of them has flagged its own successful termination.
     */
    public void testConcurrentStaleGets() {
        GeneralCacheAdministrator staticAdmin = admin;

        try {
            // A test for the case where oscache.blocking = true.
            // A dedicated administrator is used to avoid polluting other test cases;
            // the shared one is restored in the finally block below.
            //admin.destroy();
            Properties p = new Properties();
            p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true");
            admin = new GeneralCacheAdministrator(p);

            assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking());

            int nbThreads = 50;
            int retryByThreads = 10000;

            String key = "new";

            // First put a value
            admin.putInCache(key, VALUE);

            try {
                // Then test without concurrency that it is reported as stale when time-to-live is zero
                admin.getFromCache(key, 0);
                fail("NeedsRefreshException should have been thrown");
            } catch (NeedsRefreshException nre) {
                // OK, this is what is expected; we can release the update
                admin.cancelUpdate(key);
            }

            // Then ask N threads to concurrently try to access this stale resource; each should
            // receive a NeedsRefreshException and cancel the update
            Thread[] spawnedThreads = new Thread[nbThreads];
            BitSet successfullThreadTerminations = new BitSet(nbThreads); // Track which thread successfully terminated

            for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
                GetStaleEntryAndCancelUpdate getEntry = new GetStaleEntryAndCancelUpdate(key, 0, retryByThreads, threadIndex, successfullThreadTerminations);
                Thread thread = new Thread(getEntry);
                spawnedThreads[threadIndex] = thread;
                thread.start();
            }

            // OK, those threads should now repeatedly be blocked waiting for the new cache
            // entry to appear. Wait for all of them to terminate
            int maxWaitingSeconds = 100;
            int maxWaitForEachThread = 5;
            long waitStartTime = System.currentTimeMillis();

            boolean atLeastOneThreadRunning = false;

            while ((System.currentTimeMillis() - waitStartTime) < (maxWaitingSeconds * 1000)) {
                atLeastOneThreadRunning = false;

                // Wait a bit between each step to avoid consuming all CPU and preventing other threads from running.
                try {
                    Thread.sleep(500);
                } catch (InterruptedException ie) {
                    // Deliberately ignored: the loop simply re-checks the threads earlier than planned.
                }

                // Check whether all threads are done.
                for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
                    Thread inspectedThread = spawnedThreads[threadIndex];

                    try {
                        inspectedThread.join(maxWaitForEachThread * 1000);
                    } catch (InterruptedException e) {
                        fail("Thread #" + threadIndex + " was interrupted");
                    }

                    if (inspectedThread.isAlive()) {
                        atLeastOneThreadRunning = true;
                        log.error("Thread #" + threadIndex + " did not complete within [" + ((System.currentTimeMillis() - waitStartTime) / 1000) + "] s ");
                    }
                }

                if (!atLeastOneThreadRunning) {
                    break; // while loop, test success.
                }
            }

            assertTrue("at least one thread did not complete within [" + ((System.currentTimeMillis() - waitStartTime) / 1000) + "] s ", !atLeastOneThreadRunning);

            // A thread only sets its bit after finishing all of its retries, so a cleared bit
            // means that thread ended early.
            for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
                assertTrue("thread [" + threadIndex + "] did not successfully complete. ", successfullThreadTerminations.get(threadIndex));
            }
        } finally {
            // Restore the shared administrator to avoid polluting other test cases.
            admin = staticAdmin;
        }
    }

    /**
     * Requests a single cache entry and checks the outcome against what the
     * creator of this task expects.
     *
     * The entry identified by <code>key</code> is fetched with <code>time</code> as the
     * second argument of <code>getFromCache</code>. When a value comes back it must equal
     * <code>value</code>. When a NeedsRefreshException is raised instead, the task
     * either stores <code>value</code> under <code>key</code> (if <code>expectNRE</code>
     * is true) or cancels the update and fails.
     */
    private class GetEntry implements Runnable {
        String key;
        String value;
        boolean expectNRE;
        int time;

        GetEntry(String key, String value, int time, boolean expectNRE) {
            this.key = key;
            this.value = value;
            this.time = time;
            this.expectNRE = expectNRE;
        }

        public void run() {
            try {
                // Get from the cache
                Object fromCache = admin.getFromCache(key, time);
                assertEquals(value, fromCache);
            } catch (NeedsRefreshException nre) {
                if (!expectNRE) {
                    // Release the update before reporting the failure
                    admin.cancelUpdate(key);
                    fail("Thread should have blocked until a new cache entry was ready");
                } else {
                    // Put a new piece of content into the cache
                    admin.putInCache(key, value);
                }
            }
        }
    }

    /**
     * Basically requests a stale entry, expects to receive a NeedsRefreshException,
     * and always cancels the update.
     *
     * The request is repeated <code>retries</code> times. Only when every iteration
     * has completed does the task set bit <code>threadIndex</code> in the shared
     * BitSet; anything thrown out of the loop ends <code>run()</code> before that
     * point, leaving the bit cleared.
     */
    private class GetStaleEntryAndCancelUpdate implements Runnable {
        String key;
        int retries;
        int time;
        private final BitSet successfullThreadTerminations;
        private final int threadIndex;

        GetStaleEntryAndCancelUpdate(String key, int time, int retries, int threadIndex, BitSet successfullThreadTerminations) {
            this.key = key;
            this.time = time;
            this.retries = retries;
            this.threadIndex = threadIndex;
            this.successfullThreadTerminations = successfullThreadTerminations;
        }

        public void run() {
            for (int retryIndex = 0; retryIndex < retries; retryIndex++) {
                try {
                    // Get from the cache
                    Object fromCache = admin.getFromCache(key, time);
                    assertNull("Thread index [" + threadIndex + "] expected stale request [" + retryIndex + "] to be received, got [" + fromCache + "]", fromCache);
                } catch (NeedsRefreshException nre) {
                    // Expected path: the entry is stale, so release the update
                    try {
                        admin.cancelUpdate(key);
                    } catch (Throwable t) {
                        reportUnexpected(retryIndex, t);
                    }
                } catch (Throwable t) {
                    // Also reached when the assertNull above fails
                    reportUnexpected(retryIndex, t);
                }
            }

            // Once we successfully terminate, we update the corresponding bit to let the JUnit thread know we succeeded.
            // The BitSet is shared by all spawned threads, hence the lock.
            synchronized (successfullThreadTerminations) {
                successfullThreadTerminations.set(threadIndex);
            }
        }

        /**
         * Logs an unexpected throwable together with the thread and retry it
         * occurred in, then fails with the same message.
         */
        private void reportUnexpected(int retryIndex, Throwable t) {
            String message = "Thread index [" + threadIndex + "], retry [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]";
            log.error(message, t);
            fail(message);
        }
    }

    /**
     * Runnable that performs ITERATION_COUNT put/get cycles against the cache,
     * starting from a random offset below UNIQUE_KEYS.
     */
    private class OSGeneralTest implements Runnable {
        /**
         * Performs one put/get cycle for iteration <code>i</code> and flushes the
         * iteration's entry whenever <code>i</code> is a multiple of UNIQUE_KEYS + 1.
         */
        public void doit(int i) {
            int refreshPeriod = 500 /*millis*/;
            String key = KEY + (i % UNIQUE_KEYS);
            admin.putInCache(key, VALUE);

            // NOTE(review): the entry is stored under the per-iteration 'key' but read back
            // (and refreshed) under the shared 'KEY' - confirm whether this is intentional.
            try {
                // Get from the cache
                admin.getFromCache(KEY, refreshPeriod);
            } catch (NeedsRefreshException nre) {
                // Get the value
                // Store in the cache
                admin.putInCache(KEY, VALUE);
            }

            // Flush occasionally
            if ((i % (UNIQUE_KEYS + 1)) == 0) {
                admin.getCache().flushEntry(key);
            }
        }

        public void run() {
            int start = (int) (Math.random() * UNIQUE_KEYS);
            System.out.print(start + " ");

            for (int i = start; i < (start + ITERATION_COUNT); i++) {
                doit(i);
            }
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -