📄 cachefactory.java
字号:
* Returns the maximum number of cluster members allowed. A value of 0 or 1 will
* be returned when clustering is not allowed.
*
* @return the maximum number of cluster members allowed or 0 or 1 if clustering is not allowed.
*/
public static int getMaxClusterNodes() {
    // Default answer when the clustered factory is unavailable or fails.
    int maxNodes = 0;
    try {
        // Load the clustered factory class through the loader chosen by
        // getClusteredCacheStrategyClassLoader() and ask a fresh instance for its limit.
        Class<?> clusteredClass = Class.forName(clusteredCacheFactoryClass, true,
                getClusteredCacheStrategyClassLoader());
        CacheFactoryStrategy clusteredStrategy = (CacheFactoryStrategy) clusteredClass.newInstance();
        maxNodes = clusteredStrategy.getMaxClusterNodes();
    }
    catch (ClassNotFoundException e) {
        // Intentionally silent: the clustered factory class is not available,
        // so the default of 0 is reported.
    }
    catch (Exception e) {
        Log.error("Error instantiating clustered cache factory", e);
    }
    return maxNodes;
}
/**
 * Asynchronously invokes a task on the other members of the cluster by delegating to
 * the currently active cache factory strategy. The local cluster member does not
 * execute the task. When clustering is not enabled this call has no effect.
 *
 * @param task the task to be invoked on all other cluster members.
 */
public static void doClusterTask(final ClusterTask task) {
    final CacheFactoryStrategy activeStrategy = cacheFactoryStrategy;
    activeStrategy.doClusterTask(task);
}
/**
 * Asynchronously invokes a task on one specific cluster member by delegating to the
 * currently active cache factory strategy. When clustering is not enabled this call
 * has no effect.
 *
 * @param task the task to be invoked on the specified cluster member.
 * @param nodeID the byte array that identifies the target cluster member.
 * @throws IllegalStateException if requested node was not found or not running in a cluster.
 */
public static void doClusterTask(final ClusterTask task, byte[] nodeID) {
    final CacheFactoryStrategy activeStrategy = cacheFactoryStrategy;
    activeStrategy.doClusterTask(task, nodeID);
}
/**
 * Synchronously invokes a task on the other cluster members and returns the results
 * as a Collection; the call does not return until the task has been executed on each
 * targeted member. Whether the local cluster member also runs the task is controlled
 * by <tt>includeLocalMember</tt>. When clustering is not enabled an empty collection
 * is returned. The work is delegated to the currently active cache factory strategy.
 *
 * @param task the ClusterTask object to be invoked on the cluster members.
 * @param includeLocalMember true to run the task on the local member, false otherwise
 * @return collection with the result of the execution.
 */
public static Collection<Object> doSynchronousClusterTask(ClusterTask task, boolean includeLocalMember) {
    final CacheFactoryStrategy activeStrategy = cacheFactoryStrategy;
    return activeStrategy.doSynchronousClusterTask(task, includeLocalMember);
}
/**
 * Synchronously invokes a task on one specific cluster member and returns the result
 * of that remote operation. When clustering is not enabled, null is returned. The
 * work is delegated to the currently active cache factory strategy.
 *
 * @param task the ClusterTask object to be invoked on a given cluster member.
 * @param nodeID the byte array that identifies the target cluster member.
 * @return result of remote operation or null if operation failed or operation returned null.
 * @throws IllegalStateException if requested node was not found or not running in a cluster.
 */
public static Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) {
    final CacheFactoryStrategy activeStrategy = cacheFactoryStrategy;
    return activeStrategy.doSynchronousClusterTask(task, nodeID);
}
/**
 * Installs the local cache factory as the active strategy by reflectively creating an
 * instance of the class named by <tt>localCacheFactoryClass</tt>.
 *
 * @throws InitializationException if the local cache factory class cannot be found,
 *         instantiated or accessed; the reflective exception is kept as the cause.
 */
public static synchronized void initialize() throws InitializationException {
    try {
        Class<?> localStrategyClass = Class.forName(localCacheFactoryClass);
        cacheFactoryStrategy = (CacheFactoryStrategy) localStrategyClass.newInstance();
    }
    catch (ClassNotFoundException e) {
        throw new InitializationException(e);
    }
    catch (InstantiationException e) {
        throw new InitializationException(e);
    }
    catch (IllegalAccessException e) {
        throw new InitializationException(e);
    }
}
/**
 * Picks the class loader used to load the clustered cache factory class.
 * The "clustering" plugin is looked up first, then the "enterprise" plugin; if the
 * plugin manager has a class loader for the plugin found, that loader is returned.
 * Otherwise the current thread's context class loader is used.
 *
 * @return the plugin class loader, or the thread context class loader as a fallback.
 */
private static ClassLoader getClusteredCacheStrategyClassLoader() {
    PluginManager plugins = XMPPServer.getInstance().getPluginManager();
    Plugin clusteringPlugin = plugins.getPlugin("clustering");
    if (clusteringPlugin == null) {
        clusteringPlugin = plugins.getPlugin("enterprise");
    }
    // NOTE(review): clusteringPlugin may still be null here; presumably
    // getPluginClassloader tolerates a null argument and returns null - confirm.
    PluginClassLoader loader = plugins.getPluginClassloader(clusteringPlugin);
    if (loader == null) {
        Log.debug("CacheFactory - Unable to find a Plugin that provides clustering support.");
        return Thread.currentThread().getContextClassLoader();
    }
    return loader;
}
/**
 * Attempts to switch from the local cache factory to the clustered one.
 * <p>
 * A new instance of the clustered cache factory class is created (using the loader
 * returned by getClusteredCacheStrategyClassLoader()) and asked to start the cluster.
 * If that fails for any reason the local cache factory is re-installed. If it succeeds
 * and no stats thread is running, a daemon thread named "Cache Stats" is started that
 * publishes cache statistics every 10 seconds until the server is stopping, this node
 * leaves the cluster, or clustering is no longer enabled.
 * <p>
 * <tt>clusteringStarting</tt> is true for the duration of this call and is always
 * reset to false on exit, including when an unexpected error propagates out.
 */
public static void startClustering() {
    clusteringStarted = false;
    clusteringStarting = true;
    try {
        try {
            cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(clusteredCacheFactoryClass, true,
                    getClusteredCacheStrategyClassLoader())
                    .newInstance();
            clusteringStarted = cacheFactoryStrategy.startCluster();
        }
        catch (Exception e) {
            Log.error("Unable to start clustering - continuing in local mode", e);
        }
        if (!clusteringStarted) {
            // Revert to local cache factory if cluster fails to start
            try {
                cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(localCacheFactoryClass).newInstance();
            } catch (Exception e) {
                Log.error("Fatal error - Failed to join the cluster and failed to use local cache", e);
            }
        }
        else {
            if (statsThread == null) {
                // Start a background thread that publishes cache stats every 10 seconds.
                statsThread = new Thread("Cache Stats") {
                    // Set by the listeners below to ask the publishing loop to stop.
                    private volatile boolean destroyed = false;
                    public void run() {
                        // Stop publishing when the server is shutting down.
                        XMPPServer.getInstance().addServerListener(new XMPPServerListener() {
                            public void serverStarted() {}
                            public void serverStopping() {
                                destroyed = true;
                            }
                        });
                        // Stop publishing (and unregister this listener) when this node leaves the cluster.
                        ClusterManager.addListener(new ClusterEventListener() {
                            public void joinedCluster() {}
                            public void joinedCluster(byte[] nodeID) {}
                            public void leftCluster() {
                                destroyed = true;
                                ClusterManager.removeListener(this);
                            }
                            public void leftCluster(byte[] nodeID) {}
                            public void markedAsSeniorClusterMember() {}
                        });
                        // Keep publishing until asked to stop or clustering is disabled.
                        while (!destroyed && ClusterManager.isClusteringEnabled()) {
                            // Publish cache stats for this cluster node (assuming clustering is
                            // enabled and there are stats to publish).
                            try {
                                cacheFactoryStrategy.updateCacheStats(caches);
                            }
                            catch (Exception e) {
                                Log.error(e);
                            }
                            try {
                                // Sleep 10 seconds.
                                sleep(10000);
                            }
                            catch (InterruptedException ie) {
                                // Ignore; the loop condition decides whether to continue.
                            }
                        }
                        // Clear the reference so a later startClustering() can start a new thread.
                        statsThread = null;
                        Log.debug("Cache stats thread terminated.");
                    }
                };
                statsThread.setDaemon(true);
                statsThread.start();
            }
        }
    }
    finally {
        // Always clear the "starting" flag: errors that are not Exceptions (for example a
        // LinkageError while loading the factory class) would otherwise leave it stuck at true.
        clusteringStarting = false;
    }
}
/**
 * Stops the cluster on the active strategy and then switches back to a new instance
 * of the local cache factory, clearing <tt>clusteringStarted</tt>. If any step fails
 * the error is logged and the remaining steps are skipped, so the previously active
 * strategy and flag value are left in place.
 */
public static void stopClustering() {
    try {
        // Shut the cluster down first; only swap strategies if that succeeded.
        cacheFactoryStrategy.stopCluster();
        // Fall back to the local strategy.
        Class<?> localStrategyClass = Class.forName(localCacheFactoryClass);
        cacheFactoryStrategy = (CacheFactoryStrategy) localStrategyClass.newInstance();
        clusteringStarted = false;
    }
    catch (Exception e) {
        Log.error("Unable to stop clustering - continuing in clustered mode", e);
    }
}
/**
 * Notification message indicating that this JVM has joined a cluster.
 * <p>
 * Every cache returned by getAllCaches() is treated as a CacheWrapper and re-pointed
 * at a cache of the same name created by the currently active strategy.
 */
public static void joinedCluster() {
    // Swap the cache behind each wrapper for one built by the active (clustered) strategy.
    for (Cache cache : getAllCaches()) {
        CacheWrapper wrapper = (CacheWrapper) cache;
        Cache replacement = cacheFactoryStrategy.createCache(wrapper.getName());
        wrapper.setWrappedCache(replacement);
    }
}
/**
 * Notification message indicating that this JVM has left the cluster.
 * <p>
 * Installs a new instance of the local cache factory as the active strategy, then
 * re-points every cache returned by getAllCaches() (each treated as a CacheWrapper)
 * at a cache of the same name created by that strategy. Any exception is logged and
 * not propagated.
 */
public static void leftCluster() {
    try {
        // Switch the active strategy back to the local implementation first.
        Class<?> localStrategyClass = Class.forName(localCacheFactoryClass);
        cacheFactoryStrategy = (CacheFactoryStrategy) localStrategyClass.newInstance();
        // Swap the cache behind each wrapper for one built by the local strategy.
        for (Cache cache : getAllCaches()) {
            CacheWrapper wrapper = (CacheWrapper) cache;
            Cache replacement = cacheFactoryStrategy.createCache(wrapper.getName());
            wrapper.setWrappedCache(replacement);
        }
    }
    catch (Exception e) {
        Log.error("Error reverting caches to local caches", e);
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -