📄 lazyreplicatedmap.java
字号:
/* * Copyright 1999,2004-2006 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.tribes.tipis;import java.io.Serializable;import java.util.ArrayList;import java.util.Collection;import java.util.Collections;import java.util.Iterator;import java.util.LinkedHashSet;import java.util.Map;import java.util.Set;import org.apache.catalina.tribes.Channel;import org.apache.catalina.tribes.ChannelException;import org.apache.catalina.tribes.ChannelListener;import org.apache.catalina.tribes.Member;import org.apache.catalina.tribes.MembershipListener;import org.apache.catalina.tribes.group.*;/** * A smart implementation of a stateful replicated map. uses primary/secondary backup strategy. * One node is always the primary and one node is always the backup. * This map is synchronized across a cluster, and only has one backup member.<br/> * A perfect usage for this map would be a session map for a session manager in a clustered environment.<br/> * The only way to modify this list is to use the <code>put, putAll, remove</code> methods. * entrySet, entrySetFull, keySet, keySetFull, returns all non modifiable sets.<br><br> * If objects (values) in the map change without invoking <code>put()</code> or <code>remove()</code> * the data can be distributed using two different methods:<br> * <code>replicate(boolean)</code> and <code>replicate(Object, boolean)</code><br> * These two methods are very important two understand. The map can work with two set of value objects:<br> * 1. Serializable - the entire object gets serialized each time it is replicated<br> * 2. ReplicatedMapEntry - this interface allows for a isDirty() flag and to replicate diffs if desired.<br> * Implementing the <code>ReplicatedMapEntry</code> interface allows you to decide what objects * get replicated and how much data gets replicated each time.<br> * If you implement a smart AOP mechanism to detect changes in underlying objects, you can replicate * only those changes by implementing the ReplicatedMapEntry interface, and return true when isDiffable() * is invoked.<br><br> * * This map implementation doesn't have a background thread running to replicate changes. * If you do have changes without invoking put/remove then you need to invoke one of the following methods: * <ul> * <li><code>replicate(Object,boolean)</code> - replicates only the object that belongs to the key</li> * <li><code>replicate(boolean)</code> - Scans the entire map for changes and replicates data</li> * </ul> * the <code>boolean</code> value in the <code>replicate</code> method used to decide * whether to only replicate objects that implement the <code>ReplicatedMapEntry</code> interface * or to replicate all objects. If an object doesn't implement the <code>ReplicatedMapEntry</code> interface * each time the object gets replicated the entire object gets serialized, hence a call to <code>replicate(true)</code> * will replicate all objects in this map that are using this node as primary. * * <br><br><b>REMBER TO CALL <code>breakdown()</code> or <code>finalize()</code> when you are done with the map to * avoid memory leaks.<br><br> * @todo implement periodic sync/transfer thread * @author Filip Hanik * @version 1.0 */public class LazyReplicatedMap extends AbstractReplicatedMap implements RpcCallback, ChannelListener, MembershipListener { protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(LazyReplicatedMap.class); //------------------------------------------------------------------------------ // CONSTRUCTORS / DESTRUCTORS//------------------------------------------------------------------------------ /** * Creates a new map * @param channel The channel to use for communication * @param timeout long - timeout for RPC messags * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap * @param loadFactor float - load factor, see HashMap */ public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor, ClassLoader[] cls) { super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT,cls); } /** * Creates a new map * @param channel The channel to use for communication * @param timeout long - timeout for RPC messags * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap */ public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, ClassLoader[] cls) { super(owner, channel,timeout,mapContextName,initialCapacity, LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls); } /** * Creates a new map * @param channel The channel to use for communication * @param timeout long - timeout for RPC messags * @param mapContextName String - unique name for this map, to allow multiple maps per channel */ public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) { super(owner, channel,timeout,mapContextName, LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT, cls); } //------------------------------------------------------------------------------ // METHODS TO OVERRIDE //------------------------------------------------------------------------------ /** * publish info about a map pair (key/value) to other nodes in the cluster * @param key Object * @param value Object * @return Member - the backup node * @throws ChannelException */ protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { //select a backup node Member next = getNextBackupNode(); if ( next == null ) return null; Member[] backup = wrap(next); MapMessage msg = null; //publish the data out to all nodes Member[] proxies = excludeFromSet(backup,getMapMembers()); if ( proxies.length > 0 ) { msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable) key, null, null, backup); getChannel().send(proxies, msg, getChannelSendOptions()); } //publish the backup data to one node msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false, (Serializable) key, (Serializable) value, null, backup); getChannel().send(backup, msg, getChannelSendOptions()); return backup; } public Object get(Object key) { MapEntry entry = (MapEntry)super.get(key); if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry); if ( entry == null ) return null; if ( !entry.isPrimary() ) { //if the message is not primary, we need to retrieve the latest value try { Member[] backup = null; MapMessage msg = null; if ( !entry.isBackup() ) { //make sure we don't retrieve from ourselves msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable) key, null, null, null); Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout()); if (resp == null || resp.length == 0) { //no responses log.warn("Unable to retrieve remote object for key:" + key); return null; } msg = (MapMessage) resp[0].getMessage(); msg.deserialize(getExternalLoaders()); backup = entry.getBackupNodes(); if ( entry.getValue() instanceof ReplicatedMapEntry ) { ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); val.setOwner(getMapOwner()); } if ( msg.getValue()!=null ) entry.setValue(msg.getValue()); } if (entry.isBackup()) { //select a new backup node backup = publishEntryInfo(key, entry.getValue()); } else if ( entry.isProxy() ) { //invalidate the previous primary msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup); getChannel().send(getMapMembersExcl(backup),msg,getChannelSendOptions()); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -