📄 testconcurrency2.java
字号:
admin.destroy();
Properties p = new Properties();
p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true");
admin = new GeneralCacheAdministrator(p);
assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking());
// Use a unique key in case these test entries are being persisted
String key = "blocking";
String NEW_VALUE = VALUE + " abc";
admin.putInCache(key, VALUE);
try {
// Force a NeedsRefreshException
admin.getFromCache(key, 0);
fail("NeedsRefreshException should have been thrown");
} catch (NeedsRefreshException nre) {
// Fire off another thread to get the same cache entry.
// Since blocking mode is enabled this thread should block
// until the entry has been updated.
GetEntry getEntry = new GetEntry(key, NEW_VALUE, 0, false);
Thread thread = new Thread(getEntry);
thread.start();
// Sleep for a bit to simulate the time taken to build the cache entry
try {
Thread.sleep(20);
} catch (InterruptedException ie) {
}
// Putting the entry in the cache should mean that threads now retrieve
// the updated entry
admin.putInCache(key, NEW_VALUE);
getEntry = new GetEntry(key, NEW_VALUE, -1, false);
thread = new Thread(getEntry);
thread.start();
try {
Object fromCache = admin.getFromCache(key, -1);
assertEquals(NEW_VALUE, fromCache);
} catch (NeedsRefreshException e) {
admin.cancelUpdate(key);
fail("Should not have received a NeedsRefreshException");
}
}
}
private static final int RETRY_BY_THREADS = 100000;
private static final int NB_THREADS = 4;
/**
* Checks whether the cache handles simultaneous attempts to access a
* stable cache entry correctly when the blocking mode is enabled.
*
* Basically N threads are concurrently trying to access a same stale cache entry and each is cancelling its update. Each thread repeat this operation M times.
* The test is sucessfull if after some time, all threads are properly released
*/
public void testConcurrentStaleGets() {
GeneralCacheAdministrator staticAdmin = admin;
//admin = new GeneralCacheAdministrator(); //avoid poluting other test cases
try {
// A test for the case where oscache.blocking = true
//admin.destroy();
Properties p = new Properties();
p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true");
admin = new GeneralCacheAdministrator(p);
assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking());
String key = "new";
//First put a value
admin.putInCache(key, VALUE);
try {
//Then test without concurrency that it is reported as stale when time-to-live is zero
admin.getFromCache(key, 0);
fail("NeedsRefreshException should have been thrown");
} catch (NeedsRefreshException nre) {
//Ok this is was is excpected, we can release the update
admin.cancelUpdate(key);
}
//Then ask N threads to concurrently try to access this stale resource and each should receive a NeedsRefreshException, and cancel the update
TestRunnable[] spawnedThreads = new TestRunnable[NB_THREADS];
for (int threadIndex = 0; threadIndex < NB_THREADS; threadIndex++) {
spawnedThreads[threadIndex] = new GetStaleEntryAndCancelUpdate(key, 0, RETRY_BY_THREADS);
}
MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(spawnedThreads);
//kickstarts the MTTR & fires off threads
try {
mttr.runTestRunnables(120 * 1000);
} catch (Throwable e) {
fail("at least one thread did not complete");
e.printStackTrace();
}
} finally {
// avoid poluting other test cases
admin = staticAdmin;
}
}
private class GetEntry extends TestRunnable {
String key;
String value;
boolean expectNRE;
int time;
GetEntry(String key, String value, int time, boolean expectNRE) {
this.key = key;
this.value = value;
this.time = time;
this.expectNRE = expectNRE;
}
public void runTest() {
try {
// Get from the cache
Object fromCache = admin.getFromCache(key, time);
assertEquals(value, fromCache);
} catch (NeedsRefreshException nre) {
if (!expectNRE) {
admin.cancelUpdate(key);
fail("Thread should have blocked until a new cache entry was ready");
} else {
// Put a new piece of content into the cache
admin.putInCache(key, value);
}
}
}
}
private class GetEntrySimple extends GetEntry {
GetEntrySimple(String key, String value, int time, boolean expectNRE) {
super(key, value, time, expectNRE);
}
public void run() {
runTest();
}
}
private class PutInCache extends TestRunnable {
String key;
String value;
long wait;
PutInCache(String key, String value, long wait) {
this.key = key;
this.value = value;
this.wait = wait;
}
public void runTest() {
try {
Thread.sleep(wait);
} catch (InterruptedException ie) {
fail("PutInCache thread shouldn't be interrupted.");
}
admin.putInCache(key, value);
}
}
/**
* Basically requests a stale entry, expects to receive a NeedsRefreshException, and always cancels the update.
*/
private class GetStaleEntryAndCancelUpdate extends TestRunnable {
String key;
int retries;
int time;
GetStaleEntryAndCancelUpdate(String key, int time, int retries) {
this.key = key;
this.time = time;
this.retries = retries;
}
public void runTest() {
for (int retryIndex = 0; retryIndex < retries; retryIndex++) {
try {
// Get from the cache
Object fromCache = admin.getFromCache(key, time);
assertNull("Thread index [" + retryIndex + "] expected stale request [" + retryIndex + "] to be received, got [" + fromCache + "]", fromCache);
} catch (NeedsRefreshException nre) {
try {
admin.cancelUpdate(key);
} catch (Throwable t) {
log.error("Thread index [" + retryIndex + "]: Unexpectedly caught exception [" + t + "]", t);
fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]");
}
} catch (Throwable t) {
log.error("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]", t);
fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]");
}
}
}
}
private class OSGeneralTest extends TestRunnable {
public void doit(int i) {
int refreshPeriod = 500 /*millis*/;
String key = KEY + (i % UNIQUE_KEYS);
admin.putInCache(key, VALUE);
try {
// Get from the cache
admin.getFromCache(KEY, refreshPeriod);
} catch (NeedsRefreshException nre) {
// Get the value
// Store in the cache
admin.putInCache(KEY, VALUE);
}
// Flush occasionally
if ((i % (UNIQUE_KEYS + 1)) == 0) {
admin.getCache().flushEntry(key);
}
}
public void runTest() {
int start = (int) (Math.random() * UNIQUE_KEYS);
System.out.print(start + " ");
for (int i = start; i < (start + ITERATION_COUNT); i++) {
doit(i);
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -