⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstractreplicatedmap.java

📁 业界著名的tomcat服务器的最新6.0的源代码。
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    }
    
    public Member[] getMapMembersExcl(Member[] exclude) {
        synchronized (mapMembers) {
            HashMap list = (HashMap)mapMembers.clone();
            for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
            return getMapMembers(list);
        }
    }


    /**
     * Replicates any changes to the object since the last time
     * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>
     * @param complete - if set to true, the object is replicated to its backup
     * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will
     * be replicated
     */
    public void replicate(Object key, boolean complete) {
        if ( log.isTraceEnabled() )
            log.trace("Replicate invoked on key:"+key);
        MapEntry entry = (MapEntry)super.get(key);
        if ( entry == null ) return;
        if ( !entry.isSerializable() ) return;
        if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
            Object value = entry.getValue();
            //check to see if we need to replicate this object isDirty()||complete
            boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());
            
            if (!repl) {
                if ( log.isTraceEnabled() )
                    log.trace("Not replicating:"+key+", no change made");
                
                return;
            }
            //check to see if the message is diffable
            boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());
            MapMessage msg = null;
            if (diff) {
                ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue();
                try {
                    rentry.lock();
                    //construct a diff message
                    msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
                                         true, (Serializable) entry.getKey(), null,
                                         rentry.getDiff(),
                                         entry.getBackupNodes());
                } catch (IOException x) {
                    log.error("Unable to diff object. Will replicate the entire object instead.", x);
                } finally {
                    rentry.unlock();
                }
                
            }
            if (msg == null) {
                //construct a complete
                msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
                                     false, (Serializable) entry.getKey(),
                                     (Serializable) entry.getValue(),
                                     null, entry.getBackupNodes());

            }
            try {
                if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) {
                    channel.send(entry.getBackupNodes(), msg, channelSendOptions);
                }
            } catch (ChannelException x) {
                log.error("Unable to replicate data.", x);
            }
        } //end if

    }

    /**
     * This can be invoked by a periodic thread to replicate out any changes.
     * For maps that don't store objects that implement ReplicatedMapEntry, this
     * method should be used infrequently to avoid large amounts of data transfer
     * @param complete boolean
     */
    public void replicate(boolean complete) {
        Iterator i = super.entrySet().iterator();
        while (i.hasNext()) {
            Map.Entry e = (Map.Entry) i.next();
            replicate(e.getKey(), complete);
        } //while

    }

    public void transferState() {
        try {
            Member[] members = getMapMembers();
            Member backup = members.length > 0 ? (Member) members[0] : null;
            if (backup != null) {
                MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false,
                                                null, null, null, null);
                Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
                if (resp.length > 0) {
                    synchronized (stateMutex) {
                        msg = (MapMessage) resp[0].getMessage();
                        msg.deserialize(getExternalLoaders());
                        ArrayList list = (ArrayList) msg.getValue();
                        for (int i = 0; i < list.size(); i++) {
                            messageReceived( (Serializable) list.get(i), resp[0].getSource());
                        } //for
                    }
                } else {
                    log.warn("Transfer state, 0 replies, probably a timeout.");
                }
            }
        } catch (ChannelException x) {
            log.error("Unable to transfer LazyReplicatedMap state.", x);
        } catch (IOException x) {
            log.error("Unable to transfer LazyReplicatedMap state.", x);
        } catch (ClassNotFoundException x) {
            log.error("Unable to transfer LazyReplicatedMap state.", x);
        }
        stateTransferred = true;
    }

    /**
     * @todo implement state transfer
     * @param msg Serializable
     * @return Serializable - null if no reply should be sent
     */
    public Serializable replyRequest(Serializable msg, final Member sender) {
        if (! (msg instanceof MapMessage))return null;
        MapMessage mapmsg = (MapMessage) msg;

        //map init request
        if (mapmsg.getMsgType() == mapmsg.MSG_INIT) {
            mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
            return mapmsg;
        }
        
        //map start request
        if (mapmsg.getMsgType() == mapmsg.MSG_START) {
            mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
            mapMemberAdded(sender);
            return mapmsg;
        }

        //backup request
        if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
            if (entry == null || (!entry.isSerializable()) )return null;
            mapmsg.setValue( (Serializable) entry.getValue());
            return mapmsg;
        }

        //state transfer request
        if (mapmsg.getMsgType() == mapmsg.MSG_STATE) {
            synchronized (stateMutex) { //make sure we dont do two things at the same time
                ArrayList list = new ArrayList();
                Iterator i = super.entrySet().iterator();
                while (i.hasNext()) {
                    Map.Entry e = (Map.Entry) i.next();
                    MapEntry entry = (MapEntry) e.getValue();
                    if ( entry.isSerializable() ) {
                        MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY,
                            false, (Serializable) entry.getKey(), null, null, entry.getBackupNodes());
                        list.add(me);
                    }
                }
                mapmsg.setValue(list);
                return mapmsg;
                
            } //synchronized
        }

        return null;

    }

    /**
     * If the reply has already been sent to the requesting thread,
     * the rpc callback can handle any data that comes in after the fact.
     * @param msg Serializable
     * @param sender Member
     */
    public void leftOver(Serializable msg, Member sender) {
        //left over membership messages
        if (! (msg instanceof MapMessage))return;

        MapMessage mapmsg = (MapMessage) msg;
        try {
            mapmsg.deserialize(getExternalLoaders());
            if (mapmsg.getMsgType() == MapMessage.MSG_START) {
                mapMemberAdded(mapmsg.getBackupNodes()[0]);
            } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
                memberAlive(mapmsg.getBackupNodes()[0]);
            }
        } catch (IOException x ) {
            log.error("Unable to deserialize MapMessage.",x);
        } catch (ClassNotFoundException x ) {
            log.error("Unable to deserialize MapMessage.",x);
        }
    }

    public void messageReceived(Serializable msg, Member sender) {
        if (! (msg instanceof MapMessage)) return;

        MapMessage mapmsg = (MapMessage) msg;
        if ( log.isTraceEnabled() ) {
            log.trace("Map["+mapname+"] received message:"+mapmsg);
        }
        
        try {
            mapmsg.deserialize(getExternalLoaders());
        } catch (IOException x) {
            log.error("Unable to deserialize MapMessage.", x);
            return;
        } catch (ClassNotFoundException x) {
            log.error("Unable to deserialize MapMessage.", x);
            return;
        }
        if ( log.isTraceEnabled() ) 
            log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
        if (mapmsg.getMsgType() == MapMessage.MSG_START) {
            mapMemberAdded(mapmsg.getBackupNodes()[0]);
        }

        if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
            memberDisappeared(mapmsg.getBackupNodes()[0]);
        }

        if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
            if ( entry==null ) {
                entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
                entry.setBackup(false);
                entry.setProxy(true);
                entry.setBackupNodes(mapmsg.getBackupNodes());
                super.put(entry.getKey(), entry);
            } else {
                entry.setProxy(true);
                entry.setBackup(false);
                entry.setBackupNodes(mapmsg.getBackupNodes());
            }
        }

        if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
            super.remove(mapmsg.getKey());
        }

        if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) {
            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
            if (entry == null) {
                entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
                entry.setBackup(true);
                entry.setProxy(false);
                entry.setBackupNodes(mapmsg.getBackupNodes());
                if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
                    ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
                }
            } else {
                entry.setBackup(true);
                entry.setProxy(false);
                entry.setBackupNodes(mapmsg.getBackupNodes());
                if (entry.getValue() instanceof ReplicatedMapEntry) {
                    ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue();
                    if (mapmsg.isDiff()) {
                        try {
                            diff.lock();
                            diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
                        } catch (Exception x) {
                            log.error("Unable to apply diff to key:" + entry.getKey(), x);
                        } finally {
                            diff.unlock();
                        }
                    } else {
                        if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
                        ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
                    } //end if
                } else if  (mapmsg.getValue() instanceof ReplicatedMapEntry) {
                    ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
                    re.setOwner(getMapOwner());
                    entry.setValue(re);
                } else {
                    if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
                } //end if
            } //end if
            super.put(entry.getKey(), entry);
        } //end if
    }

    public boolean accept(Serializable msg, Member sender) {
        boolean result = false;
        if (msg instanceof MapMessage) {
            if ( log.isTraceEnabled() ) log.trace("Map["+mapname+"] accepting...."+msg);
            result = Arrays.equals(mapContextName, ( (MapMessage) msg).getMapId());
            if ( log.isTraceEnabled() ) log.trace("Msg["+mapname+"] accepted["+result+"]...."+msg);
        }
        return result;
    }

    public void mapMemberAdded(Member member) {
        if ( member.equals(getChannel().getLocalMember(false)) ) return;
        boolean memberAdded = false;
        //select a backup node if we don't have one
        synchronized (mapMembers) {
            if (!mapMembers.containsKey(member) ) {
                mapMembers.put(member, new Long(System.currentTimeMillis()));
                memberAdded = true;
            }
        }
        if ( memberAdded ) {
            synchronized (stateMutex) {
                Iterator i = super.entrySet().iterator();
                while (i.hasNext()) {
                    Map.Entry e = (Map.Entry) i.next();
                    MapEntry entry = (MapEntry) e.getValue();
                    if ( entry == null ) continue;
                    if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
                        try {
                            Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                            entry.setBackupNodes(backup);
                        } catch (ChannelException x) {
                            log.error("Unable to select backup node.", x);
                        } //catch
                    } //end if
                } //while
            } //synchronized
        }//end if
    }
    
    public boolean inSet(Member m, Member[] set) {
        if ( set == null ) return false;
        boolean result = false;
        for (int i=0; i<set.length && (!result); i++ )
            if ( m.equals(set[i]) ) result = true;
        return result;
    }

    public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
        ArrayList result = new ArrayList();
        for (int i=0; i<set.length; i++ ) {
            boolean include = true;
            for (int j=0; j<mbrs.length; j++ ) 
                if ( mbrs[j].equals(set[i]) ) include = false;
            if ( include ) result.add(set[i]);
        }
        return (Member[])result.toArray(new Member[result.size()]);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -