📄 abstractreplicatedmap.java
字号:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.tipis;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Heartbeat;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.group.Response;
import org.apache.catalina.tribes.group.RpcCallback;
import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.catalina.tribes.util.Arrays;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import java.util.concurrent.ConcurrentHashMap;
/**
*
* @author Filip Hanik
* @version 1.0
*/
public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat {
protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
/**
* The default initial capacity - MUST be a power of two.
*/
public static final int DEFAULT_INITIAL_CAPACITY = 16;
/**
* The load factor used when none specified in constructor.
**/
public static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**
* Used to identify the map
*/
final String chset = "ISO-8859-1";
//------------------------------------------------------------------------------
// INSTANCE VARIABLES
//------------------------------------------------------------------------------
/**
* Timeout for RPC messages, how long we will wait for a reply
*/
protected transient long rpcTimeout = 5000;
/**
* Reference to the channel for sending messages
*/
protected transient Channel channel;
/**
* The RpcChannel to send RPC messages through
*/
protected transient RpcChannel rpcChannel;
/**
* The Map context name makes this map unique, this
* allows us to have more than one map shared
* through one channel
*/
protected transient byte[] mapContextName;
/**
* Has the state been transferred
*/
protected transient boolean stateTransferred = false;
/**
* Simple lock object for transfers
*/
protected transient Object stateMutex = new Object();
/**
* A list of members in our map
*/
protected transient HashMap mapMembers = new HashMap();
/**
* Our default send options
*/
protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
/**
* The owner of this map, ala a SessionManager for example
*/
protected transient Object mapOwner;
/**
* External class loaders if serialization and deserialization is to be performed successfully.
*/
protected transient ClassLoader[] externalLoaders;
/**
* The node we are currently backing up data to, this index will rotate
* on a round robin basis
*/
protected transient int currentNode = 0;
/**
* Since the map keeps internal membership
* this is the timeout for a ping message to be responded to
* If a remote map doesn't respond within this timeframe,
* its considered dead.
*/
protected transient long accessTimeout = 5000;
/**
* Readable string of the mapContextName value
*/
protected transient String mapname = "";
//------------------------------------------------------------------------------
// CONSTRUCTORS
//------------------------------------------------------------------------------
/**
* Creates a new map
* @param channel The channel to use for communication
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
* @param cls - a list of classloaders to be used for deserialization of objects.
*/
public AbstractReplicatedMap(Object owner,
Channel channel,
long timeout,
String mapContextName,
int initialCapacity,
float loadFactor,
int channelSendOptions,
ClassLoader[] cls) {
super(initialCapacity, loadFactor, 15);
init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
}
/**
* Helper methods, wraps a single member in an array
* @param m Member
* @return Member[]
*/
protected Member[] wrap(Member m) {
if ( m == null ) return new Member[0];
else return new Member[] {m};
}
/**
* Initializes the map by creating the RPC channel, registering itself as a channel listener
* This method is also responsible for initiating the state transfer
* @param owner Object
* @param channel Channel
* @param mapContextName String
* @param timeout long
* @param channelSendOptions int
* @param cls ClassLoader[]
*/
protected void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {
log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
this.mapOwner = owner;
this.externalLoaders = cls;
this.channelSendOptions = channelSendOptions;
this.channel = channel;
this.rpcTimeout = timeout;
try {
this.mapname = mapContextName;
//unique context is more efficient if it is stored as bytes
this.mapContextName = mapContextName.getBytes(chset);
} catch (UnsupportedEncodingException x) {
log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +") using default getBytes()", x);
this.mapContextName = mapContextName.getBytes();
}
if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName));
//create an rpc channel and add the map as a listener
this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
//add this map as a message listener
this.channel.addChannelListener(this);
//listen for membership notifications
this.channel.addMembershipListener(this);
try {
//broadcast our map, this just notifies other members of our existence
broadcast(MapMessage.MSG_INIT, true);
//transfer state from another map
transferState();
//state is transferred, we are ready for messaging
broadcast(MapMessage.MSG_START, true);
} catch (ChannelException x) {
log.warn("Unable to send map start message.");
throw new RuntimeException("Unable to start replicated map.",x);
}
}
/**
* Sends a ping out to all the members in the cluster, not just map members
* that this map is alive.
* @param timeout long
* @throws ChannelException
*/
protected void ping(long timeout) throws ChannelException {
//send out a map membership message, only wait for the first reply
MapMessage msg = new MapMessage(this.mapContextName,
MapMessage.MSG_INIT,
false,
null,
null,
null,
wrap(channel.getLocalMember(false)));
if ( channel.getMembers().length > 0 ) {
//send a ping, wait for all nodes to reply
Response[] resp = rpcChannel.send(channel.getMembers(),
msg, rpcChannel.ALL_REPLY,
(channelSendOptions),
(int) accessTimeout);
for (int i = 0; i < resp.length; i++) {
memberAlive(resp[i].getSource());
} //for
}
//update our map of members, expire some if we didn't receive a ping back
synchronized (mapMembers) {
Iterator it = mapMembers.entrySet().iterator();
long now = System.currentTimeMillis();
while ( it.hasNext() ) {
Map.Entry entry = (Map.Entry)it.next();
long access = ((Long)entry.getValue()).longValue();
if ( (now - access) > timeout ) memberDisappeared((Member)entry.getKey());
}
}//synch
}
/**
* We have received a member alive notification
* @param member Member
*/
protected void memberAlive(Member member) {
synchronized (mapMembers) {
if (!mapMembers.containsKey(member)) {
mapMemberAdded(member);
} //end if
mapMembers.put(member, new Long(System.currentTimeMillis()));
}
}
/**
* Helper method to broadcast a message to all members in a channel
* @param msgtype int
* @param rpc boolean
* @throws ChannelException
*/
protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
//send out a map membership message, only wait for the first reply
MapMessage msg = new MapMessage(this.mapContextName, msgtype,
false, null, null, null, wrap(channel.getLocalMember(false)));
if ( rpc) {
Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
for (int i = 0; i < resp.length; i++) {
mapMemberAdded(resp[i].getSource());
messageReceived(resp[i].getMessage(), resp[i].getSource());
}
} else {
channel.send(channel.getMembers(),msg,channelSendOptions);
}
}
public void breakdown() {
finalize();
}
public void finalize() {
try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){}
//cleanup
if (this.rpcChannel != null) {
this.rpcChannel.breakdown();
}
if (this.channel != null) {
this.channel.removeChannelListener(this);
this.channel.removeMembershipListener(this);
}
this.rpcChannel = null;
this.channel = null;
this.mapMembers.clear();
super.clear();
this.stateTransferred = false;
this.externalLoaders = null;
}
public int hashCode() {
return Arrays.hashCode(this.mapContextName);
}
public boolean equals(Object o) {
if ( o == null ) return false;
if ( !(o instanceof AbstractReplicatedMap)) return false;
if ( !(o.getClass().equals(this.getClass())) ) return false;
AbstractReplicatedMap other = (AbstractReplicatedMap)o;
return Arrays.equals(mapContextName,other.mapContextName);
}
//------------------------------------------------------------------------------
// GROUP COM INTERFACES
//------------------------------------------------------------------------------
public Member[] getMapMembers(HashMap members) {
synchronized (members) {
Member[] result = new Member[members.size()];
members.keySet().toArray(result);
return result;
}
}
public Member[] getMapMembers() {
return getMapMembers(this.mapMembers);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -