📄 testconcurrency.java.rej
字号:
@@ -60,6 +66,101 @@
} /**+ * Checks whether the cache handles simultaneous attempts to access a+ * stable cache entry correctly when the blocking mode is enabled.+ * + * Basically N threads are concurrently trying to access a same stale cache entry and each is cancelling its update. Each thread repeat this operation M times.+ * The test is sucessfull if after some time, all threads are properly released+ */+ public void testConcurrentStaleGets() {+ GeneralCacheAdministrator staticAdmin = admin;+ admin = new GeneralCacheAdministrator(); //avoid poluting other test cases+ + try {+ // A test for the case where oscache.blocking = true+ //admin.destroy();+ + Properties p = new Properties();+ p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true");+ admin = new GeneralCacheAdministrator(p);+ + assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking());+ + int nbThreads = 10;+ int retryByThreads = 10000;+ + String key = "new";+ + //First put a value+ admin.putInCache(key, VALUE);+ + try {+ //Then test without concurrency that it is reported as stale when time-to-live is zero + admin.getFromCache(key, 0);+ fail("NeedsRefreshException should have been thrown");+ } catch (NeedsRefreshException nre) {+ //Ok this is was is excpected, we can release the update+ admin.cancelUpdate(key);+ }+ + //Then ask N threads to concurrently try to access this stale resource and each should receive a NeedsRefreshException, and cancel the update+ Thread spawnedThreads [] = new Thread[nbThreads];+ BitSet successfullThreadTerminations = new BitSet(nbThreads); //Track which thread successfully terminated+ for(int threadIndex=0; threadIndex<nbThreads; threadIndex++) {+ GetStaleEntryAndCancelUpdate getEntry = new GetStaleEntryAndCancelUpdate(key, 0, retryByThreads, threadIndex, successfullThreadTerminations);+ Thread thread = new Thread(getEntry);+ spawnedThreads[threadIndex] = thread;+ thread.start();+ }+ + // OK, those threads should now repeatidely be blocked waiting for the new cache+ // entry to appear. Wait for all of them to terminate+ int maxWaitingSeconds = 100;+ int maxWaitForEachThread= 5;+ long waitStartTime = System.currentTimeMillis();+ + boolean atLeastOneThreadRunning = false;+ + while (System.currentTimeMillis() - waitStartTime < maxWaitingSeconds *1000) {+ atLeastOneThreadRunning = false;+ + //Wait a bit between each step to avoid consumming all CPU and preventing other threads from running.+ try {+ Thread.sleep(500);+ } catch (InterruptedException ie) {+ }+ + //check whether all threads are done.+ for(int threadIndex=0; threadIndex<nbThreads; threadIndex++) {+ Thread inspectedThread = spawnedThreads[threadIndex];+ try {+ inspectedThread.join(maxWaitForEachThread * 1000);+ } catch (InterruptedException e) {+ fail("Thread #" + threadIndex + " was interrupted");+ }+ if (inspectedThread.isAlive()) {+ atLeastOneThreadRunning = true;+ log.error("Thread #" + threadIndex + " did not complete within [" + (System.currentTimeMillis() - waitStartTime ) /1000 + "] s ");+ }+ }+ if (! atLeastOneThreadRunning) {+ break; //while loop, test success.+ }+ + }+ + assertTrue("at least one thread did not complete within [" + (System.currentTimeMillis() - waitStartTime ) /1000 + "] s ", ! atLeastOneThreadRunning);+ + for(int threadIndex=0; threadIndex<nbThreads; threadIndex++) {+ assertTrue("thread [" + threadIndex + "] did not successfully complete. ", successfullThreadTerminations.get(threadIndex));+ }+ } finally {+ admin = staticAdmin;+ //Avoid po+ }+ }++ /** * Check that the cache handles simultaneous attempts to access a * new cache entry correctly */@@ -293,6 +394,51 @@
} } + /**+ * Basically requests a stale entry, expects to receive a NeedsRefreshException, and always cancels the update.+ */+ private class GetStaleEntryAndCancelUpdate implements Runnable {+ String key;+ int time;+ int retries;+ private final int threadIndex;+ private final BitSet successfullThreadTerminations;++ GetStaleEntryAndCancelUpdate(String key, int time, int retries, int threadIndex, BitSet successfullThreadTerminations) {+ this.key = key;+ this.time = time;+ this.retries = retries;+ this.threadIndex = threadIndex;+ this.successfullThreadTerminations = successfullThreadTerminations;+ }++ public void run() {+ for (int retryIndex=0; retryIndex<retries; retryIndex++) {+ try {+ // Get from the cache+ Object fromCache = admin.getFromCache(key, time);+ assertNull("Thread index [" + retryIndex + "] expected stale request [" + retryIndex + "] to be received, got [" + fromCache+ "]", fromCache);+ } catch (NeedsRefreshException nre) {+ try {+ admin.cancelUpdate(key);+ } catch(Throwable t) {+ log.error("Thread index [" + retryIndex + "]: Unexpectedly caught exception [" + t + "]", t);+ fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]");+ }+ } catch(Throwable t) {+ log.error("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]", t);+ fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]");+ }+ }+ + //Once we successfully terminate, we update the corresponding bit to let the Junit know we succeeded.+ synchronized(successfullThreadTerminations) {+ successfullThreadTerminations.set(threadIndex);+ }+ }+ }++ private class OSGeneralTest implements Runnable { public void doit(int i) { int refreshPeriod = 500 /*millis*/;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -