⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstractreplicatedmap.java

📁 业界著名的tomcat服务器的最新6.0的源代码。
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.catalina.tribes.tipis;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Heartbeat;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.group.Response;
import org.apache.catalina.tribes.group.RpcCallback;
import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.catalina.tribes.util.Arrays;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import java.util.concurrent.ConcurrentHashMap;

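/**
 * Abstract base class for a map that communicates with peer maps over a
 * {@link Channel}. The map registers itself as a channel listener and a
 * membership listener, exchanges messages with its peers through an
 * {@link RpcChannel}, and keeps its own list of the members that host a map
 * with the same context name.
 *
 * @author Filip Hanik
 * @version 1.0
 */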
public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat {
    /**
     * Logger obtained for this class; protected, so subclasses can use it too.
     */
    protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);

    /**
     * The default initial capacity - MUST be a power of two.
     */
    public static final int DEFAULT_INITIAL_CAPACITY = 16;

    /**
     * The load factor used when none specified in constructor.
     **/
    public static final float DEFAULT_LOAD_FACTOR = 0.75f;
    
    /**
     * Name of the character set used to turn the map context name
     * into its byte representation.
     */
    final String chset = "ISO-8859-1";

//------------------------------------------------------------------------------
//              INSTANCE VARIABLES
//------------------------------------------------------------------------------
    
    /**
     * Timeout for RPC messages, how long we will wait for a reply
     */
    protected transient long rpcTimeout = 5000;
    /**
     * Reference to the channel for sending messages
     */
    protected transient Channel channel;
    /**
     * The RpcChannel to send RPC messages through
     */
    protected transient RpcChannel rpcChannel;
    /**
     * The Map context name makes this map unique, this
     * allows us to have more than one map shared
     * through one channel. Stored as bytes; the readable
     * form is kept in <code>mapname</code>.
     */
    protected transient byte[] mapContextName;
    /**
     * Has the state been transferred
     */
    protected transient boolean stateTransferred = false;
    /**
     * Simple lock object for transfers
     */
    protected transient Object stateMutex = new Object();
    /**
     * The members in our map. Keys are <code>Member</code> objects, values are
     * <code>Long</code> timestamps (<code>System.currentTimeMillis()</code>)
     * of the last alive notification. <code>ping</code> and
     * <code>memberAlive</code> synchronize on this object while using it.
     */
    protected transient HashMap mapMembers = new HashMap();
    /**
     * Our default send options
     */
    protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
    /**
     * The owner of this map, ala a SessionManager for example
     */
    protected transient Object mapOwner;
    /**
     * External class loaders if serialization and deserialization is to be performed successfully.
     */
    protected transient ClassLoader[] externalLoaders;
    
    /**
     * The node we are currently backing up data to, this index will rotate
     * on a round robin basis
     */
    protected transient int currentNode = 0;
    
    /**
     * Since the map keeps internal membership
     * this is the timeout for a ping message to be responded to.
     * If a remote map doesn't respond within this timeframe,
     * it is considered dead.
     */
    protected transient long accessTimeout = 5000;
    
    /**
     * Readable string of the mapContextName value
     */
    protected transient String mapname = "";
    

//------------------------------------------------------------------------------
//              CONSTRUCTORS
//------------------------------------------------------------------------------

    /**
     * Creates a new map and initializes it right away by calling
     * {@link #init(Object, Channel, String, long, int, ClassLoader[])}.
     * @param owner Object - the owner of this map
     * @param channel Channel - the channel to use for communication
     * @param timeout long - timeout for RPC messages
     * @param mapContextName String - unique name for this map, to allow multiple maps per channel
     * @param initialCapacity int - initial capacity passed to the ConcurrentHashMap superclass
     * @param loadFactor float - load factor passed to the ConcurrentHashMap superclass
     * @param channelSendOptions int - the send options used for messages sent by this map
     * @param cls ClassLoader[] - a list of classloaders to be used for deserialization of objects
     */
    public AbstractReplicatedMap(
            Object owner,
            Channel channel,
            long timeout,
            String mapContextName,
            int initialCapacity,
            float loadFactor,
            int channelSendOptions,
            ClassLoader[] cls) {
        // third argument is the concurrency level of the backing ConcurrentHashMap
        super(initialCapacity, loadFactor, 15);
        this.init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
    }

    /**
     * Helper method that packs a single member into an array.
     * @param m Member - the member to wrap, may be <code>null</code>
     * @return Member[] - a one element array holding <code>m</code>, or an
     * empty array when <code>m</code> is <code>null</code>
     */
    protected Member[] wrap(Member m) {
        return (m != null) ? new Member[] {m} : new Member[0];
    }

    /**
     * Initializes the map by creating the RPC channel and registering the map
     * as a channel listener and membership listener.
     * This method is also responsible for initiating the state transfer:
     * it broadcasts an init message, transfers state and finally broadcasts
     * a start message.
     * <p>
     * If any of those steps fails with a <code>ChannelException</code>, the RPC
     * channel is broken down and both listeners are removed again before the
     * failure is reported, so a map that never started does not stay attached
     * to the channel.
     * @param owner Object - the owner of this map
     * @param channel Channel - the channel to use for communication
     * @param mapContextName String - unique name for this map
     * @param timeout long - timeout for RPC messages
     * @param channelSendOptions int - the send options used for messages sent by this map
     * @param cls ClassLoader[] - class loaders to be used for deserialization
     * @throws RuntimeException wrapping the <code>ChannelException</code> if the map could not be started
     */
    protected void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {
        log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
        this.mapOwner = owner;
        this.externalLoaders = cls;
        this.channelSendOptions = channelSendOptions;
        this.channel = channel;
        this.rpcTimeout = timeout;

        try {
            this.mapname = mapContextName;
            //unique context is more efficient if it is stored as bytes
            this.mapContextName = mapContextName.getBytes(chset);
        } catch (UnsupportedEncodingException x) {
            log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +") using default getBytes()", x);
            this.mapContextName = mapContextName.getBytes();
        }
        if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName));

        //create an rpc channel and add the map as a listener
        this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
        //add this map as a message listener
        this.channel.addChannelListener(this);
        //listen for membership notifications
        this.channel.addMembershipListener(this);

        try {
            //broadcast our map, this just notifies other members of our existence
            broadcast(MapMessage.MSG_INIT, true);
            //transfer state from another map
            transferState();
            //state is transferred, we are ready for messaging
            broadcast(MapMessage.MSG_START, true);
        } catch (ChannelException x) {
            log.warn("Unable to send map start message.");
            //the map did not start: undo the registrations made above so the
            //channel does not keep delivering events to a half initialized map
            this.rpcChannel.breakdown();
            this.channel.removeChannelListener(this);
            this.channel.removeMembershipListener(this);
            throw new RuntimeException("Unable to start replicated map.",x);
        }
    }
    
    
    /**
     * Pings all the members of the channel, not just the map members, to tell
     * them that this map is alive. Every member that replies is recorded
     * through {@link #memberAlive(Member)}. Afterwards all known map members
     * whose last alive notification is older than <code>timeout</code> are
     * reported through <code>memberDisappeared</code>.
     * <p>
     * The wait for the replies is bounded by <code>accessTimeout</code>, the
     * <code>timeout</code> argument only controls the expiry of map members.
     * @param timeout long - maximum age, in milliseconds, of a member's last
     * alive notification before the member is expired
     * @throws ChannelException if the ping message could not be sent
     */
    protected void ping(long timeout) throws ChannelException {
        //send out a map membership message and wait for all nodes to reply
        MapMessage msg = new MapMessage(this.mapContextName, 
                                        MapMessage.MSG_INIT,
                                        false, 
                                        null, 
                                        null, 
                                        null, 
                                        wrap(channel.getLocalMember(false)));
        //read the membership once so the emptiness check and the send
        //operate on the same set of members
        Member[] targets = channel.getMembers();
        if ( targets.length > 0 ) {
            //send a ping, wait for all nodes to reply
            //accessTimeout is passed as a long; broadcast() passes a long to
            //the same parameter, so narrowing it to int only risks truncation
            Response[] resp = rpcChannel.send(targets, 
                                              msg, rpcChannel.ALL_REPLY, 
                                              (channelSendOptions),
                                              accessTimeout);
            for (int i = 0; i < resp.length; i++) {
                memberAlive(resp[i].getSource());
            } //for
        }
        //update our map of members, expire some if we didn't receive a ping back
        synchronized (mapMembers) {
            //iterate over a snapshot of the keys: memberDisappeared is expected
            //to remove the member from mapMembers (its body is not part of this
            //section of the file), which would make a live HashMap iterator
            //throw a ConcurrentModificationException
            Member[] known = (Member[]) mapMembers.keySet().toArray(new Member[mapMembers.size()]);
            long now = System.currentTimeMillis();
            for (int i = 0; i < known.length; i++) {
                Long access = (Long) mapMembers.get(known[i]);
                //entry went away while an earlier member was being expired
                if ( access == null ) continue;
                if ( (now - access.longValue()) > timeout ) memberDisappeared(known[i]);
            }
        }//synch
    }

    /**
     * We have received a member alive notification.
     * A member that is not yet known is first announced through
     * <code>mapMemberAdded</code>; in every case the member's timestamp
     * in <code>mapMembers</code> is then set to the current time.
     * @param member Member - the member that is alive
     */
    protected void memberAlive(Member member) {
        synchronized (mapMembers) {
            if (!mapMembers.containsKey(member)) {
                mapMemberAdded(member);
            } //end if
            //Long.valueOf replaces the deprecated Long(long) constructor
            mapMembers.put(member, Long.valueOf(System.currentTimeMillis()));
        }
    }
    
    /**
     * Helper method to broadcast a message to all members in a channel.
     * Nothing is sent when the channel currently has no members.
     * <p>
     * With <code>rpc</code> set, the message is sent through the RPC channel
     * and the call waits up to <code>rpcTimeout</code> for the first reply;
     * the source of every response returned is registered through
     * <code>mapMemberAdded</code> and the response is handed to
     * <code>messageReceived</code>. Without <code>rpc</code> the message is
     * sent through the channel directly.
     * @param msgtype int - the message type to put in the map message
     * @param rpc boolean - <code>true</code> to send through the RPC channel and process replies
     * @throws ChannelException if the message could not be sent
     */
    protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
        //read the membership once so the check and the send agree
        Member[] members = channel.getMembers();
        //no destination, nothing to do (same guard as in ping)
        if ( members.length == 0 ) return;
        //send out a map membership message, only wait for the first reply
        MapMessage msg = new MapMessage(this.mapContextName, msgtype,
                                        false, null, null, null, wrap(channel.getLocalMember(false)));
        if ( rpc) {
            Response[] resp = rpcChannel.send(members, msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
            for (int i = 0; i < resp.length; i++) {
                mapMemberAdded(resp[i].getSource());
                messageReceived(resp[i].getMessage(), resp[i].getSource());
            }
        } else {
            channel.send(members,msg,channelSendOptions);
        }
    }

    /**
     * Shuts this map down; simply delegates to {@link #finalize()}.
     */
    public void breakdown() {
        this.finalize();
    }

    /**
     * Releases everything this map holds: broadcasts a stop message, breaks
     * down the RPC channel, removes the map as channel and membership
     * listener, and clears the member list and the map entries.
     * <p>
     * The method may run more than once (it is called by {@link #breakdown()}
     * and is public); after the first run the channel reference is
     * <code>null</code> and the stop message is skipped.
     */
    public void finalize() {
        //only announce the stop while we are still attached to a channel
        if (this.channel != null) {
            try {
                broadcast(MapMessage.MSG_STOP,false);
            } catch ( Exception x ) {
                //best effort: a failed stop notification must not prevent the cleanup below
                if ( log.isTraceEnabled() ) log.trace("Unable to broadcast map stop message:"+x);
            }
        }
        //cleanup
        if (this.rpcChannel != null) {
            this.rpcChannel.breakdown();
        }
        if (this.channel != null) {
            this.channel.removeChannelListener(this);
            this.channel.removeMembershipListener(this);
        }
        this.rpcChannel = null;
        this.channel = null;
        //mapMembers is guarded by its own monitor in ping and memberAlive
        synchronized (this.mapMembers) {
            this.mapMembers.clear();
        }
        super.clear();
        this.stateTransferred = false;
        this.externalLoaders = null;
    }
    
    /**
     * Hash code computed from the bytes of the map context name only.
     * @return int - the hash of <code>mapContextName</code>
     */
    public int hashCode() {
        final byte[] contextName = this.mapContextName;
        return Arrays.hashCode(contextName);
    }
    
    /**
     * Two maps are equal when they are of exactly the same class and
     * their map context names hold the same bytes.
     * @param o Object - the object to compare with, may be <code>null</code>
     * @return boolean - <code>true</code> if <code>o</code> is a map of the same class with the same context name
     */
    public boolean equals(Object o) {
        //an exact class match already implies o is an AbstractReplicatedMap
        if ( o == null || !this.getClass().equals(o.getClass()) ) return false;
        final byte[] mine = this.mapContextName;
        final byte[] theirs = ((AbstractReplicatedMap)o).mapContextName;
        return Arrays.equals(mine, theirs);
    }

//------------------------------------------------------------------------------
//              GROUP COM INTERFACES
//------------------------------------------------------------------------------
    /**
     * Copies the keys of the given member map into an array while holding
     * the monitor of that map.
     * @param members HashMap - the map whose keys are the members
     * @return Member[] - an array holding every key of <code>members</code>
     */
    public Member[] getMapMembers(HashMap members) {
        synchronized (members) {
            //the array is sized to fit, so toArray fills and returns this very array
            return (Member[]) members.keySet().toArray(new Member[members.size()]);
        }
    }
    public Member[] getMapMembers() {
        return getMapMembers(this.mapMembers);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -