⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 testconcurrency.java.rej

📁 一个不错的cache
💻 REJ
字号:
@@ -60,6 +66,101 @@
     }
 
     /**
+     * Checks whether the cache handles simultaneous attempts to access a
+     * stale cache entry correctly when the blocking mode is enabled.
+     * 
+     * N threads concurrently try to access the same stale cache entry and each one cancels its update. Each thread repeats this operation M times.
+     * The test is successful if, within the waiting period, all threads terminate and each one has set its bit in the shared BitSet.
+     */
+    public void testConcurrentStaleGets() {
+    	GeneralCacheAdministrator staticAdmin = admin;
+    	admin = new GeneralCacheAdministrator(); //avoid polluting other test cases
+    	
+    	try {
+    		// A test for the case where oscache.blocking = true
+    		//admin.destroy();
+    		
+    		Properties p = new Properties();
+    		p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true");
+    		admin = new GeneralCacheAdministrator(p);
+    		
+    		assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking());
+    		
+    		int nbThreads = 10; // N: number of concurrent threads
+    		int retryByThreads = 10000; // M: number of get/cancel attempts made by each thread
+    		
+    		String key = "new";
+    		
+    		//First put a value
+    		admin.putInCache(key, VALUE);
+    		
+    		try {
+    			//Then test without concurrency that it is reported as stale when the refresh period passed is zero
+    			admin.getFromCache(key, 0);
+    			fail("NeedsRefreshException should have been thrown");
+    		} catch (NeedsRefreshException nre) {
+    			//OK, this is what is expected; we can release the update
+    			admin.cancelUpdate(key);
+    		}
+    		
+    		//Then ask N threads to concurrently try to access this stale resource; each should receive a NeedsRefreshException and cancel the update
+    		Thread spawnedThreads [] = new Thread[nbThreads];
+    		BitSet successfullThreadTerminations = new BitSet(nbThreads); //Tracks which threads terminated successfully (bit index == thread index)
+    		for(int threadIndex=0; threadIndex<nbThreads; threadIndex++) {
+    			GetStaleEntryAndCancelUpdate getEntry = new GetStaleEntryAndCancelUpdate(key, 0, retryByThreads, threadIndex, successfullThreadTerminations);
+    			Thread thread = new Thread(getEntry);
+    			spawnedThreads[threadIndex] = thread;
+    			thread.start();
+    		}
+    		
+    		// OK, those threads should now repeatedly be blocked waiting for the new cache
+    		// entry to appear. Wait for all of them to terminate
+    		int maxWaitingSeconds = 100; // overall deadline checked by the polling loop below
+    		int maxWaitForEachThread= 5; // seconds passed to each join() call
+    		long waitStartTime = System.currentTimeMillis();
+    		
+    		boolean atLeastOneThreadRunning = false;
+    		
+    		while (System.currentTimeMillis() - waitStartTime < maxWaitingSeconds *1000) {
+    			atLeastOneThreadRunning = false;
+    			
+    			//Wait a bit between each step to avoid consuming all CPU and preventing other threads from running.
+    			try {
+    				Thread.sleep(500);
+    			} catch (InterruptedException ie) { // NOTE(review): interruption of the sleep is silently ignored
+    			}
+    			
+    			//Check whether all threads are done.
+    			for(int threadIndex=0; threadIndex<nbThreads; threadIndex++) {
+    				Thread inspectedThread = spawnedThreads[threadIndex];
+    				try {
+    					inspectedThread.join(maxWaitForEachThread * 1000);
+    				} catch (InterruptedException e) {
+    					fail("Thread #" + threadIndex + " was interrupted");
+    				}
+    				if (inspectedThread.isAlive()) {
+    					atLeastOneThreadRunning = true;
+    					log.error("Thread #" + threadIndex + " did not complete within [" + (System.currentTimeMillis() - waitStartTime ) /1000 + "] s ");
+    				}
+    			}
+    			if (! atLeastOneThreadRunning) {
+    				break; //leave the while loop: every thread has terminated
+    			}
+    			
+    		}
+    		
+    		assertTrue("at least one thread did not complete within [" + (System.currentTimeMillis() - waitStartTime ) /1000 + "] s ", ! atLeastOneThreadRunning);
+    		
+			for(int threadIndex=0; threadIndex<nbThreads; threadIndex++) {
+				assertTrue("thread [" + threadIndex + "] did not successfully complete. ", successfullThreadTerminations.get(threadIndex));
+			}
+    	} finally {
+    		admin = staticAdmin;
+    		//Avoid po... (comment truncated in the original patch) NOTE(review): the admin created for this test is not destroyed here - confirm whether it should be
+    	}
+    }
+
+    /**
      * Check that the cache handles simultaneous attempts to access a
      * new cache entry correctly
      */
@@ -293,6 +394,51 @@
         }
     }
 
+    /**
+     * Runnable that requests a (presumably stale) entry "retries" times; on each NeedsRefreshException it cancels the update, and a non-null value returned without the exception is treated as a failure.
+     */
+    private class GetStaleEntryAndCancelUpdate implements Runnable {
+        private final String key; // cache key requested on every retry
+        private final int time; // refresh period passed to getFromCache
+        private final int retries; // number of get/cancel attempts performed by run()
+		private final int threadIndex; // index of this worker; also the bit it sets on success
+		private final BitSet successfullThreadTerminations; // shared with the spawning test; guarded by synchronizing on the BitSet itself
+
+        GetStaleEntryAndCancelUpdate(String key, int time, int retries, int threadIndex, BitSet successfullThreadTerminations) {
+            this.key = key;
+            this.time = time;
+            this.retries = retries;
+			this.threadIndex = threadIndex;
+			this.successfullThreadTerminations = successfullThreadTerminations;
+        }
+
+        public void run() {
+        	for (int retryIndex=0; retryIndex<retries; retryIndex++) {
+        		try {
+        			// Get from the cache
+        			Object fromCache = admin.getFromCache(key, time);
+        			assertNull("Thread index [" + threadIndex + "] expected stale request [" + retryIndex + "] to be received, got [" +  fromCache+ "]", fromCache);
+        		} catch (NeedsRefreshException nre) {
+        			try {
+        				admin.cancelUpdate(key);
+        			} catch(Throwable t) {
+        				log.error("Thread index [" + threadIndex + "]: Unexpectedly caught exception [" + t + "]", t);
+        				fail("Thread index [" + threadIndex + "] : Unexpectedly caught exception [" + t + "]"); // presumably JUnit's fail(): the error leaves run(), so the success bit below is never set
+        			}
+        		} catch(Throwable t) {
+        			log.error("Thread index [" + threadIndex + "] : Unexpectedly caught exception [" + t + "]", t);
+        			fail("Thread index [" + threadIndex + "] : Unexpectedly caught exception [" + t + "]");
+        		}
+        	}
+        	
+        	//Only reached when every retry completed without a failure: set this thread's bit so the JUnit test can see that it succeeded.
+        	synchronized(successfullThreadTerminations) {
+        		successfullThreadTerminations.set(threadIndex);
+        	}
+        }
+    }
+
+    
     private class OSGeneralTest implements Runnable {
         public void doit(int i) {
             int refreshPeriod = 500 /*millis*/;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -