replicatedhashtable.java

来自「JGroups源码」· Java 代码 · 共 511 行 · 第 1/2 页

JAVA
511
字号
        return retval;    }    /*------------------------ Callbacks -----------------------*/    Object _put(Object key, Object value) {        Object retval=super.put(key, value);        for(int i=0; i < notifs.size(); i++)            ((Notification)notifs.elementAt(i)).entrySet(key, value);        return retval;    }    void _clear() {        super.clear();    }    Object _remove(Object key) {        Object retval=super.remove(key);        for(int i=0; i < notifs.size(); i++)            ((Notification)notifs.elementAt(i)).entryRemoved(key);        return retval;    }    /**     * @see java.util.Map#putAll(java.util.Map)     */    public void _putAll(Map m) {        if(m == null)            return;        //######## The same way as in the DistributedHashtable        // Calling the method below seems okay, but would result in ... deadlock !        // The reason is that Map.putAll() calls put(), which we override, which results in        // lock contention for the map.        // ---> super.putAll(m); <--- CULPRIT !!!@#$%$        // That said let's do it the stupid way:        //######## The same way as in the DistributedHashtable        Map.Entry entry;        for(Iterator it=m.entrySet().iterator(); it.hasNext();) {            entry=(Map.Entry)it.next();            super.put(entry.getKey(), entry.getValue());        }        for(int i=0; i < notifs.size(); i++)            ((Notification)notifs.elementAt(i)).contentsSet(m);    }    /*----------------------------------------------------------*/    /*-------------------- MessageListener ----------------------*/    public void receive(Message msg) {        Request req=null;        if(msg == null)            return;        req=(Request)msg.getObject();        if(req == null)            return;        switch(req.req_type) {            case Request.PUT:                if(req.key != null && req.val != null)                    _put(req.key, req.val);                break;            case Request.REMOVE:                
if(req.key != null)                    _remove(req.key);                break;            case Request.CLEAR:                _clear();                break;            case Request.PUT_ALL:                if(req.val != null)                    _putAll((Map)req.val);                break;            default :                // error        }    }    public byte[] getState() {        Object key, val;        Hashtable copy=new Hashtable();        for(Enumeration e=keys(); e.hasMoreElements();) {            key=e.nextElement();            val=get(key);            copy.put(key, val);        }        try {            return Util.objectToByteBuffer(copy);        }        catch(Exception ex) {            if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex);            return null;        }    }    public void setState(byte[] new_state) {        Hashtable new_copy;        Object key;        try {            new_copy=(Hashtable)Util.objectFromByteBuffer(new_state);            if(new_copy == null) {                notifyStateTransferCompleted(true);                return;            }        }        catch(Throwable ex) {            if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex);            notifyStateTransferCompleted(false);            return;        }        _clear(); // remove all elements        for(Enumeration e=new_copy.keys(); e.hasMoreElements();) {            key=e.nextElement();            _put(key, new_copy.get(key));        }        notifyStateTransferCompleted(true);    }    /*-------------------- End of MessageListener ----------------------*/    /*----------------------- MembershipListener ------------------------*/    public void viewAccepted(View new_view) {        Vector new_mbrs=new_view.getMembers();        if(new_mbrs != null) {            sendViewChangeNotifications(new_mbrs, members);            // notifies observers (joined, left)            members.removeAllElements();            for(int i=0; i < 
new_mbrs.size(); i++)                members.addElement(new_mbrs.elementAt(i));        }        //if size is bigger than one, there are more peers in the group        //otherwise there is only one server.        if(members.size() > 1) {            send_message=true;        }        else {            send_message=false;        }    }    /** Called when a member is suspected */    public void suspect(Address suspected_mbr) {        ;    }    /** Block sending and receiving of messages until ViewAccepted is called */    public void block() {    }    /*------------------- End of MembershipListener ----------------------*/    void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) {        Vector joined, left;        Object mbr;        Notification n;        if(notifs.size() == 0 || old_mbrs == null || new_mbrs == null || old_mbrs.size() == 0 || new_mbrs.size() == 0)            return;        // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs        joined=new Vector();        for(int i=0; i < new_mbrs.size(); i++) {            mbr=new_mbrs.elementAt(i);            if(!old_mbrs.contains(mbr))                joined.addElement(mbr);        }        // 2. 
Compute set of members that left: all that were in old_mbrs, but not in new_mbrs        left=new Vector();        for(int i=0; i < old_mbrs.size(); i++) {            mbr=old_mbrs.elementAt(i);            if(!new_mbrs.contains(mbr)) {                left.addElement(mbr);            }        }        for(int i=0; i < notifs.size(); i++) {            n=(Notification)notifs.elementAt(i);            n.viewChange(joined, left);        }    }    void notifyStateTransferStarted() {        state_transfer_running=true;        for(Iterator it=state_transfer_listeners.iterator(); it.hasNext();) {            StateTransferListener listener=(StateTransferListener)it.next();            try {                listener.stateTransferStarted();            }            catch(Throwable t) {            }        }    }    void notifyStateTransferCompleted(boolean success) {        state_transfer_running=false;        for(Iterator it=state_transfer_listeners.iterator(); it.hasNext();) {            StateTransferListener listener=(StateTransferListener)it.next();            try {                listener.stateTransferCompleted(success);            }            catch(Throwable t) {            }        }    }    private static class Request implements Serializable {        static final int PUT=1;        static final int REMOVE=2;        static final int CLEAR=3;        static final int PUT_ALL=4;        int req_type=0;        Object key=null;        Object val=null;        Request(int req_type, Object key, Object val) {            this.req_type=req_type;            this.key=key;            this.val=val;        }        public String toString() {            StringBuffer sb=new StringBuffer();            sb.append(type2String(req_type));            if(key != null)                sb.append("\nkey=" + key);            if(val != null)                sb.append("\nval=" + val);            return sb.toString();        }        String type2String(int t) {            switch(t) {                case PUT:    
                return "PUT";                case REMOVE:                    return "REMOVE";                case CLEAR:                    return "CLEAR";                case PUT_ALL:                    return "PUT_ALL";                default :                    return "<unknown>";            }        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?