⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstractreplicatedmap.java

📁 精通tomcat书籍原代码,希望大家共同学习
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* * Copyright 1999,2004-2006 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *      http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.tribes.tipis;import java.io.Externalizable;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.io.Serializable;import java.io.UnsupportedEncodingException;import java.util.ArrayList;import java.util.Arrays;import java.util.Iterator;import java.util.LinkedHashMap;import java.util.Map;import org.apache.catalina.tribes.Channel;import org.apache.catalina.tribes.ChannelException;import org.apache.catalina.tribes.ChannelListener;import org.apache.catalina.tribes.Member;import org.apache.catalina.tribes.MembershipListener;import org.apache.catalina.tribes.io.XByteBuffer;import org.apache.catalina.tribes.membership.MemberImpl;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.catalina.tribes.group.*;/** * <p>Title: </p> * * <p>Description: </p> * * <p>Copyright: Copyright (c) 2005</p> * * <p>Company: </p> * * @author not attributable * @version 1.0 */public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener {    protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);    /**     * The default initial capacity - MUST be a power of two.     */    public static final int DEFAULT_INITIAL_CAPACITY = 16;    /**     * The load factor used when none specified in constructor.     **/    public static final float DEFAULT_LOAD_FACTOR = 0.75f;        /**     * Used to identify the map     */    final String chset = "ISO-8859-1";//------------------------------------------------------------------------------//              INSTANCE VARIABLES//------------------------------------------------------------------------------    private transient long rpcTimeout = 5000;    private transient Channel channel;    private transient RpcChannel rpcChannel;    private transient byte[] mapContextName;    private transient boolean stateTransferred = false;    private transient Object stateMutex = new Object();    private transient ArrayList mapMembers = new ArrayList();    private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;    private transient Object mapOwner;    private transient ClassLoader[] externalLoaders;    //------------------------------------------------------------------------------//              CONSTRUCTORS//------------------------------------------------------------------------------    /**     * Creates a new map     * @param channel The channel to use for communication     * @param timeout long - timeout for RPC messags     * @param mapContextName String - unique name for this map, to allow multiple maps per channel     * @param initialCapacity int - the size of this map, see HashMap     * @param loadFactor float - load factor, see HashMap     * @param cls - a list of classloaders to be used for deserialization of objects.     */    public AbstractReplicatedMap(Object owner,                                 Channel channel,                                  long timeout,                                  String mapContextName,                                  int initialCapacity,                                 float loadFactor,                                 int channelSendOptions,                                 ClassLoader[] cls) {        super(initialCapacity, loadFactor);        init(owner, channel, mapContextName, timeout, channelSendOptions, cls);            }    protected Member[] wrap(Member m) {        return new Member[] {m};    }    private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {        this.mapOwner = owner;        this.externalLoaders = cls;        this.channelSendOptions = channelSendOptions;        this.channel = channel;        this.rpcTimeout = timeout;        try {            //unique context is more efficient if it is stored as bytes            this.mapContextName = mapContextName.getBytes(chset);        } catch (UnsupportedEncodingException x) {            log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +") using default getBytes()", x);            this.mapContextName = mapContextName.getBytes();        }        //create an rpc channel and add the map as a listener        this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);        this.channel.addChannelListener(this);        this.channel.addMembershipListener(this);                        try {            broadcast(MapMessage.MSG_INIT, true);            //transfer state from another map            transferState();            broadcast(MapMessage.MSG_START, true);        } catch (ChannelException x) {            log.warn("Unable to send map start message.");            throw new RuntimeException("Unable to start replicated map.",x);        }    }            private void broadcast(int msgtype, boolean rpc) throws ChannelException {        //send out a map membership message, only wait for the first reply        MapMessage msg = new MapMessage(this.mapContextName, msgtype,                                        false, null, null, null, wrap(channel.getLocalMember(false)));        if ( rpc) {            Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);            for (int i = 0; i < resp.length; i++) {                mapMemberAdded(resp[i].getSource());                messageReceived(resp[i].getMessage(), resp[i].getSource());            }        } else {            channel.send(channel.getMembers(),msg,channelSendOptions);        }    }    public void breakdown() {        finalize();    }    public void finalize() {        try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){}        //cleanup        if (this.rpcChannel != null) {            this.rpcChannel.breakdown();        }        if (this.channel != null) {            this.channel.removeChannelListener(this);            this.channel.removeMembershipListener(this);        }        this.rpcChannel = null;        this.channel = null;        this.mapMembers.clear();        super.clear();        this.stateTransferred = false;        this.externalLoaders = null;    }//------------------------------------------------------------------------------//              GROUP COM INTERFACES//------------------------------------------------------------------------------    public Member[] getMapMembers() {        synchronized (mapMembers) {            Member[] result = new Member[mapMembers.size()];            mapMembers.toArray(result);            return result;        }    }        public Member[] getMapMembersExcl(Member[] exclude) {        synchronized (mapMembers) {            ArrayList list = (ArrayList)mapMembers.clone();            for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);            Member[] result = new Member[list.size()];            list.toArray(result);            return result;        }    }    /**     * Replicates any changes to the object since the last time     * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>     * @param complete - if set to true, the object is replicated to its backup     * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will     * be replicated     */    public void replicate(Object key, boolean complete) {        MapEntry entry = (MapEntry)super.get(key);        if (entry != null && entry.isPrimary()) {            Object value = entry.getValue();            //check to see if we need to replicate this object isDirty()||complete            boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());            if (!repl)return;            //check to see if the message is diffable            boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());            MapMessage msg = null;            if (diff) {                ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue();                try {                    rentry.lock();                    //construct a diff message                    msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,                                         true, (Serializable) entry.getKey(), null,                                         rentry.getDiff(),                                         entry.getBackupNodes());                } catch (IOException x) {                    log.error("Unable to diff object. Will replicate the entire object instead.", x);                } finally {                    rentry.unlock();                }                            }            if (msg == null) {                //construct a complete                msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,                                     false, (Serializable) entry.getKey(),                                     (Serializable) entry.getValue(),                                     null, entry.getBackupNodes());            }            try {                if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) {                    channel.send(entry.getBackupNodes(), msg, channelSendOptions);                }            } catch (ChannelException x) {                log.error("Unable to replicate data.", x);            }        } //end if    }    /**     * This can be invoked by a periodic thread to replicate out any changes.     * For maps that don't store objects that implement ReplicatedMapEntry, this     * method should be used infrequently to avoid large amounts of data transfer     * @param complete boolean     */    public void replicate(boolean complete) {        Iterator i = super.entrySet().iterator();        while (i.hasNext()) {            Map.Entry e = (Map.Entry) i.next();            replicate(e.getKey(), complete);        } //while    }    public void transferState() {        try {            Member[] members = getMapMembers();            Member backup = members.length > 0 ? (Member) members[0] : null;            if (backup != null) {                MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false,                                                null, null, null, null);                Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);                if (resp.length > 0) {                    synchronized (stateMutex) {                        msg = (MapMessage) resp[0].getMessage();                        msg.deserialize(getExternalLoaders());                        ArrayList list = (ArrayList) msg.getValue();                        for (int i = 0; i < list.size(); i++) {                            messageReceived( (Serializable) list.get(i), resp[0].getSource());                        } //for                    }                } else {                    log.warn("Transfer state, 0 replies, probably a timeout.");                }            }        } catch (ChannelException x) {            log.error("Unable to transfer LazyReplicatedMap state.", x);        } catch (IOException x) {            log.error("Unable to transfer LazyReplicatedMap state.", x);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -