📄 abstractreplicatedmap.java
字号:
/* * Copyright 1999,2004-2006 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.tribes.tipis;import java.io.Externalizable;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.io.Serializable;import java.io.UnsupportedEncodingException;import java.util.ArrayList;import java.util.Arrays;import java.util.Iterator;import java.util.LinkedHashMap;import java.util.Map;import org.apache.catalina.tribes.Channel;import org.apache.catalina.tribes.ChannelException;import org.apache.catalina.tribes.ChannelListener;import org.apache.catalina.tribes.Member;import org.apache.catalina.tribes.MembershipListener;import org.apache.catalina.tribes.io.XByteBuffer;import org.apache.catalina.tribes.membership.MemberImpl;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.catalina.tribes.group.*;/** * <p>Title: </p> * * <p>Description: </p> * * <p>Copyright: Copyright (c) 2005</p> * * <p>Company: </p> * * @author not attributable * @version 1.0 */public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener { protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class); /** * The default initial capacity - MUST be a power of two. */ public static final int DEFAULT_INITIAL_CAPACITY = 16; /** * The load factor used when none specified in constructor. **/ public static final float DEFAULT_LOAD_FACTOR = 0.75f; /** * Used to identify the map */ final String chset = "ISO-8859-1";//------------------------------------------------------------------------------// INSTANCE VARIABLES//------------------------------------------------------------------------------ private transient long rpcTimeout = 5000; private transient Channel channel; private transient RpcChannel rpcChannel; private transient byte[] mapContextName; private transient boolean stateTransferred = false; private transient Object stateMutex = new Object(); private transient ArrayList mapMembers = new ArrayList(); private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT; private transient Object mapOwner; private transient ClassLoader[] externalLoaders; //------------------------------------------------------------------------------// CONSTRUCTORS//------------------------------------------------------------------------------ /** * Creates a new map * @param channel The channel to use for communication * @param timeout long - timeout for RPC messags * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap * @param loadFactor float - load factor, see HashMap * @param cls - a list of classloaders to be used for deserialization of objects. */ public AbstractReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor, int channelSendOptions, ClassLoader[] cls) { super(initialCapacity, loadFactor); init(owner, channel, mapContextName, timeout, channelSendOptions, cls); } protected Member[] wrap(Member m) { return new Member[] {m}; } private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) { this.mapOwner = owner; this.externalLoaders = cls; this.channelSendOptions = channelSendOptions; this.channel = channel; this.rpcTimeout = timeout; try { //unique context is more efficient if it is stored as bytes this.mapContextName = mapContextName.getBytes(chset); } catch (UnsupportedEncodingException x) { log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +") using default getBytes()", x); this.mapContextName = mapContextName.getBytes(); } //create an rpc channel and add the map as a listener this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); this.channel.addChannelListener(this); this.channel.addMembershipListener(this); try { broadcast(MapMessage.MSG_INIT, true); //transfer state from another map transferState(); broadcast(MapMessage.MSG_START, true); } catch (ChannelException x) { log.warn("Unable to send map start message."); throw new RuntimeException("Unable to start replicated map.",x); } } private void broadcast(int msgtype, boolean rpc) throws ChannelException { //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, msgtype, false, null, null, null, wrap(channel.getLocalMember(false))); if ( rpc) { Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout); for (int i = 0; i < resp.length; i++) { mapMemberAdded(resp[i].getSource()); messageReceived(resp[i].getMessage(), resp[i].getSource()); } } else { channel.send(channel.getMembers(),msg,channelSendOptions); } } public void breakdown() { finalize(); } public void finalize() { try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){} //cleanup if (this.rpcChannel != null) { this.rpcChannel.breakdown(); } if (this.channel != null) { this.channel.removeChannelListener(this); this.channel.removeMembershipListener(this); } this.rpcChannel = null; this.channel = null; this.mapMembers.clear(); super.clear(); this.stateTransferred = false; this.externalLoaders = null; }//------------------------------------------------------------------------------// GROUP COM INTERFACES//------------------------------------------------------------------------------ public Member[] getMapMembers() { synchronized (mapMembers) { Member[] result = new Member[mapMembers.size()]; mapMembers.toArray(result); return result; } } public Member[] getMapMembersExcl(Member[] exclude) { synchronized (mapMembers) { ArrayList list = (ArrayList)mapMembers.clone(); for (int i=0; i<exclude.length;i++) list.remove(exclude[i]); Member[] result = new Member[list.size()]; list.toArray(result); return result; } } /** * Replicates any changes to the object since the last time * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br> * @param complete - if set to true, the object is replicated to its backup * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will * be replicated */ public void replicate(Object key, boolean complete) { MapEntry entry = (MapEntry)super.get(key); if (entry != null && entry.isPrimary()) { Object value = entry.getValue(); //check to see if we need to replicate this object isDirty()||complete boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty()); if (!repl)return; //check to see if the message is diffable boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable()); MapMessage msg = null; if (diff) { ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue(); try { rentry.lock(); //construct a diff message msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP, true, (Serializable) entry.getKey(), null, rentry.getDiff(), entry.getBackupNodes()); } catch (IOException x) { log.error("Unable to diff object. Will replicate the entire object instead.", x); } finally { rentry.unlock(); } } if (msg == null) { //construct a complete msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP, false, (Serializable) entry.getKey(), (Serializable) entry.getValue(), null, entry.getBackupNodes()); } try { if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) { channel.send(entry.getBackupNodes(), msg, channelSendOptions); } } catch (ChannelException x) { log.error("Unable to replicate data.", x); } } //end if } /** * This can be invoked by a periodic thread to replicate out any changes. * For maps that don't store objects that implement ReplicatedMapEntry, this * method should be used infrequently to avoid large amounts of data transfer * @param complete boolean */ public void replicate(boolean complete) { Iterator i = super.entrySet().iterator(); while (i.hasNext()) { Map.Entry e = (Map.Entry) i.next(); replicate(e.getKey(), complete); } //while } public void transferState() { try { Member[] members = getMapMembers(); Member backup = members.length > 0 ? (Member) members[0] : null; if (backup != null) { MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false, null, null, null, null); Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout); if (resp.length > 0) { synchronized (stateMutex) { msg = (MapMessage) resp[0].getMessage(); msg.deserialize(getExternalLoaders()); ArrayList list = (ArrayList) msg.getValue(); for (int i = 0; i < list.size(); i++) { messageReceived( (Serializable) list.get(i), resp[0].getSource()); } //for } } else { log.warn("Transfer state, 0 replies, probably a timeout."); } } } catch (ChannelException x) { log.error("Unable to transfer LazyReplicatedMap state.", x); } catch (IOException x) { log.error("Unable to transfer LazyReplicatedMap state.", x);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -