⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 testconcurrency2.java

📁 oscache-2.4.1-full
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        admin.destroy();

        Properties p = new Properties();
        p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true");
        admin = new GeneralCacheAdministrator(p);

        assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking());

        // Use a unique key in case these test entries are being persisted
        String key = "blocking";
        String NEW_VALUE = VALUE + " abc";
        admin.putInCache(key, VALUE);

        try {
            // Force a NeedsRefreshException
            admin.getFromCache(key, 0);
            fail("NeedsRefreshException should have been thrown");
        } catch (NeedsRefreshException nre) {
            // Fire off another thread to get the same cache entry.
            // Since blocking mode is enabled this thread should block
            // until the entry has been updated.
            GetEntry getEntry = new GetEntry(key, NEW_VALUE, 0, false);
            Thread thread = new Thread(getEntry);
            thread.start();

            // Sleep for a bit to simulate the time taken to build the cache entry
            try {
                Thread.sleep(20);
            } catch (InterruptedException ie) {
            }

            // Putting the entry in the cache should mean that threads now retrieve
            // the updated entry
            admin.putInCache(key, NEW_VALUE);

            getEntry = new GetEntry(key, NEW_VALUE, -1, false);
            thread = new Thread(getEntry);
            thread.start();

            try {
                Object fromCache = admin.getFromCache(key, -1);
                assertEquals(NEW_VALUE, fromCache);
            } catch (NeedsRefreshException e) {
                admin.cancelUpdate(key);
                fail("Should not have received a NeedsRefreshException");
            }
        }
    }

    /** Number of get-then-cancel attempts each spawned thread performs in {@link #testConcurrentStaleGets()}. */
    private static final int RETRY_BY_THREADS = 100000;
    /** Number of concurrent runnables created by {@link #testConcurrentStaleGets()}. */
    private static final int NB_THREADS = 4;

    /**
     * Checks that the cache correctly handles simultaneous attempts to access a
     * stale cache entry when blocking mode is enabled.
     *
     * <p>{@link #NB_THREADS} threads concurrently request the same stale cache
     * entry and each one cancels its update; every thread repeats this
     * {@link #RETRY_BY_THREADS} times. The test is successful if all threads
     * are released and complete within the time limit given to the runner.
     */
    public void testConcurrentStaleGets() {
        // Remember the shared administrator so it can be restored in the finally
        // block; it is deliberately not destroyed here.
        GeneralCacheAdministrator staticAdmin = admin;

        try {
            // A test for the case where oscache.blocking = true
            Properties p = new Properties();
            p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true");
            admin = new GeneralCacheAdministrator(p);

            assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking());

            String key = "new";

            // First put a value
            admin.putInCache(key, VALUE);

            try {
                // Then check, without concurrency, that the entry is reported as
                // stale when the refresh period is zero
                admin.getFromCache(key, 0);
                fail("NeedsRefreshException should have been thrown");
            } catch (NeedsRefreshException nre) {
                // This is what is expected; release the update
                admin.cancelUpdate(key);
            }

            // Then ask N threads to concurrently access this stale entry; each
            // should receive a NeedsRefreshException and cancel the update
            TestRunnable[] spawnedThreads = new TestRunnable[NB_THREADS];

            for (int threadIndex = 0; threadIndex < NB_THREADS; threadIndex++) {
                spawnedThreads[threadIndex] = new GetStaleEntryAndCancelUpdate(key, 0, RETRY_BY_THREADS);
            }
            MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(spawnedThreads);

            // Kick-start the runner, which fires off the threads
            try {
                mttr.runTestRunnables(120 * 1000);
            } catch (Throwable e) {
                // Report the cause BEFORE failing: fail() throws, so any
                // statement placed after it would never execute.
                e.printStackTrace();
                fail("at least one thread did not complete: " + e);
            }

        } finally {
            // Avoid polluting other test cases
            admin = staticAdmin;
        }
    }

    /**
     * Runnable that reads one entry from the enclosing test's cache
     * administrator and checks it against an expected value.
     *
     * <p>If the lookup raises a {@link NeedsRefreshException}, the reaction
     * depends on {@code expectNRE}: when it is {@code true} the expected value
     * is stored under the key; when it is {@code false} the update is cancelled
     * and the test fails.
     */
    private class GetEntry extends TestRunnable {
        String key;
        String value;
        boolean expectNRE;
        int time;

        /**
         * @param key       cache key to look up
         * @param value     value the lookup is expected to return (and the value
         *                  stored when a refresh is expected)
         * @param time      refresh period passed to {@code getFromCache}
         * @param expectNRE whether a {@link NeedsRefreshException} is an acceptable outcome
         */
        GetEntry(String key, String value, int time, boolean expectNRE) {
            this.key = key;
            this.value = value;
            this.time = time;
            this.expectNRE = expectNRE;
        }

        /** Performs the lookup and verifies (or repairs) the entry as described on the class. */
        public void runTest() {
            try {
                Object cached = admin.getFromCache(key, time);
                assertEquals(value, cached);
            } catch (NeedsRefreshException refreshNeeded) {
                if (expectNRE) {
                    // A refresh was anticipated: supply the new content
                    admin.putInCache(key, value);
                } else {
                    // Release the update before reporting the failure
                    admin.cancelUpdate(key);
                    fail("Thread should have blocked until a new cache entry was ready");
                }
            }
        }
    }
    
    /**
     * Variant of {@link GetEntry} whose {@code run()} calls {@code runTest()}
     * directly instead of using the inherited {@code run()} implementation.
     * NOTE(review): presumably this lets the entry be executed as a plain
     * {@code Runnable} without a MultiThreadedTestRunner - confirm.
     */
    private class GetEntrySimple extends GetEntry {
        GetEntrySimple(String key, String value, int time, boolean expectNRE) {
            super(key, value, time, expectNRE);
        }

        /** Runs the cache lookup on the calling thread. */
        public void run() {
            this.runTest();
        }

    }

    /**
     * Runnable that sleeps for a configured delay and then stores a value in
     * the enclosing test's cache administrator under the given key.
     */
    private class PutInCache extends TestRunnable {

        String key;
        String value;
        long wait;

        /**
         * @param key   cache key to store under
         * @param value content to store
         * @param wait  delay, as passed to {@link Thread#sleep(long)}, before storing
         */
        PutInCache(String key, String value, long wait) {
            this.key = key;
            this.value = value;
            this.wait = wait;
        }

        /** Waits for the configured delay, then writes the entry. */
        public void runTest() {
            pauseBeforePut();
            admin.putInCache(key, value);
        }

        /** Sleeps for {@code wait}; an interruption is treated as a test failure. */
        private void pauseBeforePut() {
            try {
                Thread.sleep(wait);
            } catch (InterruptedException interrupted) {
                fail("PutInCache thread shouldn't be interrupted.");
            }
        }
    }

    /**
     * Runnable that repeatedly requests a cache entry, expects each request to
     * raise a {@link NeedsRefreshException}, and always cancels the update.
     *
     * <p>Note: the diagnostic messages are labelled "Thread index" but the
     * number they print is the retry counter of the current attempt.
     */
    private class GetStaleEntryAndCancelUpdate extends TestRunnable {
        String key;
        int retries;
        int time;

        /**
         * @param key     cache key to request
         * @param time    refresh period passed to {@code getFromCache}
         * @param retries number of attempts to perform
         */
        GetStaleEntryAndCancelUpdate(String key, int time, int retries) {
            this.key = key;
            this.time = time;
            this.retries = retries;
        }

        /** Performs {@code retries} consecutive attempts. */
        public void runTest() {
            for (int attempt = 0; attempt < retries; attempt++) {
                requestAndCancel(attempt);
            }
        }

        /**
         * Performs one attempt: requests the entry and, when told it needs a
         * refresh, cancels the update. Any other throwable is logged and
         * reported as a failure.
         */
        private void requestAndCancel(int retryIndex) {
            try {
                Object fromCache = admin.getFromCache(key, time);
                // Reaching this point means no refresh was requested; only a null result is tolerated
                assertNull("Thread index [" + retryIndex + "] expected stale request [" + retryIndex + "] to be received, got [" + fromCache + "]", fromCache);
            } catch (NeedsRefreshException nre) {
                cancelPendingUpdate(retryIndex);
            } catch (Throwable t) {
                log.error("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]", t);
                fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]");
            }
        }

        /** Cancels the update for {@code key}; a failure to cancel is logged and fails the test. */
        private void cancelPendingUpdate(int retryIndex) {
            try {
                admin.cancelUpdate(key);
            } catch (Throwable t) {
                log.error("Thread index [" + retryIndex + "]: Unexpectedly caught exception [" + t + "]", t);
                fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]");
            }
        }
    }

    /**
     * Runnable that performs a sequence of put / get / occasional-flush
     * operations against the enclosing test's cache administrator, starting
     * from a random offset.
     */
    private class OSGeneralTest extends TestRunnable {
        /**
         * Executes one iteration: stores {@code VALUE} under a key derived from
         * {@code i}, looks up {@code KEY}, re-stores {@code KEY} if a refresh is
         * requested, and flushes the derived key on selected iterations.
         *
         * @param i iteration number; selects the key and decides whether to flush
         */
        public void doit(int i) {
            String entryKey = KEY + (i % UNIQUE_KEYS);
            admin.putInCache(entryKey, VALUE);

            // The original comment labels this value as milliseconds.
            // NOTE(review): the OSCache API may document this period in seconds - confirm.
            int refreshPeriod = 500;

            // NOTE(review): the lookup and the refresh below use the shared KEY,
            // not the per-iteration key stored above - confirm this is intended.
            try {
                admin.getFromCache(KEY, refreshPeriod);
            } catch (NeedsRefreshException nre) {
                // Entry needs refreshing: store the value again
                admin.putInCache(KEY, VALUE);
            }

            // Flush occasionally
            boolean flushDue = (i % (UNIQUE_KEYS + 1)) == 0;
            if (flushDue) {
                admin.getCache().flushEntry(entryKey);
            }
        }

        /** Picks a random starting iteration, prints it, then runs {@code ITERATION_COUNT} iterations. */
        public void runTest() {
            int start = (int) (Math.random() * UNIQUE_KEYS);
            System.out.print(start + " ");

            int end = start + ITERATION_COUNT;
            int i = start;
            while (i < end) {
                doit(i);
                i++;
            }
        }
    }
    
    
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -