📄 testconcurrency.java
字号:
String key = "blocking";
        String NEW_VALUE = VALUE + " abc";
        admin.putInCache(key, VALUE);

        try {
            // Force a NeedsRefreshException
            admin.getFromCache(key, 0);
            fail("NeedsRefreshException should have been thrown");
        } catch (NeedsRefreshException nre) {
            // Fire off another thread to get the same cache entry.
            // Since blocking mode is enabled this thread should block
            // until the entry has been updated.
            GetEntry getEntry = new GetEntry(key, NEW_VALUE, 0, false);
            Thread thread = new Thread(getEntry);
            thread.start();

            // Sleep for a bit to simulate the time taken to build the cache entry
            try {
                Thread.sleep(200);
            } catch (InterruptedException ie) {
            }

            // Putting the entry in the cache should mean that threads now retrieve
            // the updated entry
            admin.putInCache(key, NEW_VALUE);

            getEntry = new GetEntry(key, NEW_VALUE, -1, false);
            thread = new Thread(getEntry);
            thread.start();

            try {
                Object fromCache = admin.getFromCache(key, -1);
                assertEquals(NEW_VALUE, fromCache);
            } catch (NeedsRefreshException e) {
                admin.cancelUpdate(key);
                fail("Should not have received a NeedsRefreshException");
            }
        }
    }

    /**
     * Checks whether the cache handles simultaneous attempts to access a
     * stale cache entry correctly when the blocking mode is enabled.
     *
     * N threads concurrently request the same stale cache entry and each one
     * cancels its update; every thread repeats this operation M times.
     * The test is successful if, after some time, all threads have been
     * released and have terminated normally.
     */
    public void testConcurrentStaleGets() {
        // Remember the shared administrator so it can be restored in the finally
        // block; this avoids polluting other test cases.
        GeneralCacheAdministrator staticAdmin = admin;

        try {
            // A test for the case where oscache.blocking = true
            Properties p = new Properties();
            p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true");
            admin = new GeneralCacheAdministrator(p);

            assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking());

            int nbThreads = 50;
            int retryByThreads = 10000;

            String key = "new";

            // First put a value
            admin.putInCache(key, VALUE);

            try {
                // Then check, without concurrency, that the entry is reported as
                // stale when the refresh period is zero
                admin.getFromCache(key, 0);
                fail("NeedsRefreshException should have been thrown");
            } catch (NeedsRefreshException nre) {
                // This is what is expected; release the update
                admin.cancelUpdate(key);
            }

            // Then ask N threads to concurrently access this stale resource; each
            // should receive a NeedsRefreshException and cancel the update
            Thread[] spawnedThreads = new Thread[nbThreads];
            // Tracks which threads terminated successfully
            BitSet successfullThreadTerminations = new BitSet(nbThreads);

            for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
                GetStaleEntryAndCancelUpdate getEntry = new GetStaleEntryAndCancelUpdate(key, 0, retryByThreads, threadIndex, successfullThreadTerminations);
                Thread thread = new Thread(getEntry);
                spawnedThreads[threadIndex] = thread;
                thread.start();
            }

            // OK, those threads should now repeatedly be blocked waiting for the new cache
            // entry to appear. Wait for all of them to terminate
            int maxWaitingSeconds = 100;
            int maxWaitForEachThread = 5;
            long waitStartTime = System.currentTimeMillis();

            boolean atLeastOneThreadRunning = false;

            while ((System.currentTimeMillis() - waitStartTime) < (maxWaitingSeconds * 1000)) {
                atLeastOneThreadRunning = false;

                // Wait a bit between each step to avoid consuming all CPU and
                // preventing other threads from running.
                try {
                    Thread.sleep(500);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: a shortened pause only makes the next
                    // poll of the worker threads happen sooner.
                }

                // Check whether all threads are done.
                for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
                    Thread inspectedThread = spawnedThreads[threadIndex];

                    try {
                        inspectedThread.join(maxWaitForEachThread * 1000);
                    } catch (InterruptedException e) {
                        fail("Thread #" + threadIndex + " was interrupted");
                    }

                    if (inspectedThread.isAlive()) {
                        atLeastOneThreadRunning = true;
                        log.error("Thread #" + threadIndex + " did not complete within [" + ((System.currentTimeMillis() - waitStartTime) / 1000) + "] s ");
                    }
                }

                if (!atLeastOneThreadRunning) {
                    break; // leave the while loop: every thread has terminated
                }
            }

            assertTrue("at least one thread did not complete within [" + ((System.currentTimeMillis() - waitStartTime) / 1000) + "] s ", !atLeastOneThreadRunning);

            // A thread that died from an unexpected error never sets its bit, so
            // termination alone is not enough: check each completion flag too.
            for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
                assertTrue("thread [" + threadIndex + "] did not successfully complete. ", successfullThreadTerminations.get(threadIndex));
            }
        } finally {
            // Restore the shared administrator to avoid polluting other test cases
            admin = staticAdmin;
        }
    }

    /**
     * Runnable that reads one key from the cache and checks the returned value.
     * When a NeedsRefreshException is received it either stores the expected
     * value (if the exception was expected) or cancels the update and fails.
     *
     * NOTE(review): assertion failures raised in run() are thrown on the worker
     * thread, so they presumably do not fail the JUnit test directly - confirm.
     */
    private class GetEntry implements Runnable {
        String key;
        String value;
        boolean expectNRE;
        int time;

        /**
         * @param key       cache key to read
         * @param value     value expected from the cache (and stored on an expected refresh)
         * @param time      refresh period passed to getFromCache
         * @param expectNRE whether a NeedsRefreshException is the expected outcome
         */
        GetEntry(String key, String value, int time, boolean expectNRE) {
            this.key = key;
            this.value = value;
            this.time = time;
            this.expectNRE = expectNRE;
        }

        public void run() {
            try {
                // Get from the cache
                Object fromCache = admin.getFromCache(key, time);
                assertEquals(value, fromCache);
            } catch (NeedsRefreshException nre) {
                if (!expectNRE) {
                    // Release the update before failing
                    admin.cancelUpdate(key);
                    fail("Thread should have blocked until a new cache entry was ready");
                } else {
                    // Put a new piece of content into the cache
                    admin.putInCache(key, value);
                }
            }
        }
    }

    /**
     * Repeatedly requests a stale entry, expects to receive a
     * NeedsRefreshException each time, and always cancels the update.
     * After all retries complete without an unexpected error, the bit for this
     * thread is set in the shared BitSet.
     */
    private class GetStaleEntryAndCancelUpdate implements Runnable {
        String key;
        int retries;
        int time;
        private final BitSet successfullThreadTerminations;
        private final int threadIndex;

        /**
         * @param key                           cache key to read
         * @param time                          refresh period passed to getFromCache
         * @param retries                       number of get/cancel cycles to perform
         * @param threadIndex                   index of this worker, used as its bit position and in diagnostics
         * @param successfullThreadTerminations shared set in which successful completion is recorded
         */
        GetStaleEntryAndCancelUpdate(String key, int time, int retries, int threadIndex, BitSet successfullThreadTerminations) {
            this.key = key;
            this.time = time;
            this.retries = retries;
            this.threadIndex = threadIndex;
            this.successfullThreadTerminations = successfullThreadTerminations;
        }

        public void run() {
            for (int retryIndex = 0; retryIndex < retries; retryIndex++) {
                try {
                    // Get from the cache
                    Object fromCache = admin.getFromCache(key, time);
                    assertNull("Thread index [" + threadIndex + "] expected stale request [" + retryIndex + "] to be received, got [" + fromCache + "]", fromCache);
                } catch (NeedsRefreshException nre) {
                    // Expected path: the entry is stale, so release the update
                    try {
                        admin.cancelUpdate(key);
                    } catch (Throwable t) {
                        log.error("Thread index [" + threadIndex + "]: Unexpectedly caught exception [" + t + "]", t);
                        fail("Thread index [" + threadIndex + "] : Unexpectedly caught exception [" + t + "]");
                    }
                } catch (Throwable t) {
                    log.error("Thread index [" + threadIndex + "] : Unexpectedly caught exception [" + t + "]", t);
                    fail("Thread index [" + threadIndex + "] : Unexpectedly caught exception [" + t + "]");
                }
            }

            // Once all retries are done, set the corresponding bit to let the
            // JUnit thread know this worker succeeded. BitSet is shared between
            // workers, hence the lock.
            synchronized (successfullThreadTerminations) {
                successfullThreadTerminations.set(threadIndex);
            }
        }
    }

    /**
     * Runnable that performs ITERATION_COUNT put/get cycles against the cache,
     * starting from a random offset, and flushes an entry occasionally.
     */
    private class OSGeneralTest implements Runnable {
        /**
         * Performs one put/get cycle.
         *
         * @param i iteration number; selects the key suffix via i % UNIQUE_KEYS
         */
        public void doit(int i) {
            int refreshPeriod = 500 /*millis*/;
            String key = KEY + (i % UNIQUE_KEYS);
            admin.putInCache(key, VALUE);

            // NOTE(review): the read and the refresh below use the constant KEY,
            // not the per-iteration 'key' written above - confirm this is intended.
            try {
                // Get from the cache
                admin.getFromCache(KEY, refreshPeriod);
            } catch (NeedsRefreshException nre) {
                // Get the value
                // Store in the cache
                admin.putInCache(KEY, VALUE);
            }

            // Flush occasionally
            if ((i % (UNIQUE_KEYS + 1)) == 0) {
                admin.getCache().flushEntry(key);
            }
        }

        public void run() {
            int start = (int) (Math.random() * UNIQUE_KEYS);
            System.out.print(start + " ");

            for (int i = start; i < (start + ITERATION_COUNT); i++) {
                doit(i);
            }
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -