distributedhashtable.java

来自「JGroups源码」· Java 代码 · 共 646 行 · 第 1/2 页

JAVA
646
字号
    }

    /**
     * Clears this hashtable so that it contains no keys.
     * <p>
     * When {@code send_message} is set, the {@code _clear} callback is invoked
     * through the dispatcher ({@code GroupRequest.GET_ALL}, timeout argument 0);
     * otherwise {@link #_clear()} is called directly on this instance.
     * A failure of the remote invocation is logged, not rethrown.
     */
    public synchronized void clear() {
        // Changes done by <aos>
        // if true, propagate action to the group
        if(send_message) {
            try {
                disp.callRemoteMethods(
                        null, "_clear", null,
                        clear_signature,
                        GroupRequest.GET_ALL,
                        0);
            }
            catch(Exception e) {
                if(log.isErrorEnabled()) log.error("exception=" + e);
            }
        }
        else {
            _clear();
        }
    }

    /**
     * Removes the key (and its corresponding value) from the Hashtable.
     * <p>
     * The current value is read with {@code get(key)} before the removal is
     * carried out, and that value is what gets returned. When
     * {@code send_message} is set, the {@code _remove} callback is invoked
     * through the dispatcher; otherwise {@link #_remove(Object)} is called
     * directly. A failure of the remote invocation is logged, not rethrown.
     *
     * @param key - the key to be removed.
     * @return the value to which the key had been mapped in this hashtable,
     *         or null if the key did not have a mapping.
     */
    public Object remove(Object key) {
        Object retval = get(key);

        // Changes done by <aos>
        // if true, propagate action to the group
        if(send_message) {
            try {
                disp.callRemoteMethods(
                        null, "_remove", new Object[]{key},
                        remove_signature,
                        GroupRequest.GET_ALL,
                        0);
            }
            catch(Exception e) {
                // Report the failure the same way clear() does instead of
                // swallowing it; the previously read value is still returned.
                if(log.isErrorEnabled()) log.error("exception=" + e);
            }
        }
        else {
            _remove(key);
            // no need for retval = super.remove(..): the value was already
            // fetched at the beginning of this method
        }
        return retval;
    }

    /*------------------------ Callbacks -----------------------*/

    /**
     * Callback that stores a mapping in the local table via {@code super.put},
     * saves it through the persistence manager when {@code persistent} is set
     * (failures are logged only), and then calls {@code entrySet(key, value)}
     * on every registered notification listener.
     *
     * @param key   the key of the mapping
     * @param value the value of the mapping
     * @return whatever {@code super.put(key, value)} returned
     */
    public Object _put(Object key, Object value) {
        Object retval=super.put(key, value);
        if(persistent) {
            try {
                persistence_mgr.save((Serializable)key, (Serializable)value);
            }
            catch(CannotPersistException cannot_persist_ex) {
                if(log.isErrorEnabled()) log.error("failed persisting " + key + " + " +
                            value + ", exception=" + cannot_persist_ex);
            }
            catch(Throwable t) {
                if(log.isErrorEnabled()) log.error("failed persisting " + key + " + " +
                            value + ", exception=" + Util.printStackTrace(t));
            }
        }
        for(int i=0; i < notifs.size(); i++)
            ((Notification)notifs.elementAt(i)).entrySet(key, value);
        return retval;
    }

    /**
     * Callback that copies all mappings of {@code m} into the local table,
     * saves them through the persistence manager when {@code persistent} is
     * set (failures are logged only), and then calls {@code contentsSet(m)}
     * on every registered notification listener. A null map is ignored.
     *
     * @param m the mappings to add; may be null
     * @see java.util.Map#putAll(java.util.Map)
     */
    public void _putAll(Map m) {
        if (m == null)
            return;

        // Calling super.putAll(m) seems okay, but would result in a deadlock:
        // Map.putAll() calls put(), which we override, which results in
        // lock contention for the map.
        // So the entries are copied one by one with super.put() instead.
        Map.Entry entry;
        for(Iterator it=m.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            super.put(entry.getKey(), entry.getValue());
        }

        if (persistent) {
            try {
                persistence_mgr.saveAll(m);
            }
            catch (CannotPersistException persist_ex) {
                if(log.isErrorEnabled()) log.error("failed persisting contents: " + persist_ex);
            }
            catch (Throwable t) {
                if(log.isErrorEnabled()) log.error("failed persisting contents: " + t);
            }
        }
        for(int i=0; i < notifs.size(); i++)
            ((Notification)notifs.elementAt(i)).contentsSet(m);
    }

    /**
     * Callback that empties the local table via {@code super.clear()}, clears
     * the persistence manager when {@code persistent} is set (failures are
     * logged only), and then calls {@code contentsCleared()} on every
     * registered notification listener.
     */
    public void _clear() {
        super.clear();
        if(persistent) {
            try {
                persistence_mgr.clear();
            }
            catch(CannotRemoveException cannot_remove_ex) {
                if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + cannot_remove_ex);
            }
            catch(Throwable t) {
                if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + t);
            }
        }
        for(int i=0; i < notifs.size(); i++)
            ((Notification)notifs.elementAt(i)).contentsCleared();
    }

    /**
     * Callback that removes a mapping from the local table via
     * {@code super.remove}, removes it from the persistence manager when
     * {@code persistent} is set (failures are logged only), and then calls
     * {@code entryRemoved(key)} on every registered notification listener.
     *
     * @param key the key to remove
     * @return whatever {@code super.remove(key)} returned
     */
    public Object _remove(Object key) {
        Object retval=super.remove(key);
        if(persistent) {
            try {
                persistence_mgr.remove((Serializable)key);
            }
            catch(CannotRemoveException cannot_remove_ex) {
                // Message names the actual operation (it used to say "clearing contents")
                if(log.isErrorEnabled()) log.error("failed removing " + key + ", exception=" + cannot_remove_ex);
            }
            catch(Throwable t) {
                if(log.isErrorEnabled()) log.error("failed removing " + key + ", exception=" + t);
            }
        }
        for(int i=0; i < notifs.size(); i++)
            ((Notification)notifs.elementAt(i)).entryRemoved(key);
        return retval;
    }

    /*----------------------------------------------------------*/

    /*-------------------- State Exchange ----------------------*/

    /** Message callback; intentionally does nothing. */
    public void receive(Message msg) { }

    /**
     * Copies every key/value pair of this table into a fresh {@code Hashtable}
     * and serializes that copy with {@code Util.objectToByteBuffer}.
     *
     * @return the serialized copy, or null if marshalling failed (the failure
     *         is logged)
     */
    public byte[] getState() {
        Object    key, val;
        Hashtable copy=new Hashtable();

        for(Enumeration e=keys(); e.hasMoreElements();) {
            key=e.nextElement();
            val=get(key);
            copy.put(key, val);
        }
        try {
            return Util.objectToByteBuffer(copy);
        }
        catch(Throwable ex) {
            if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex);
            return null;
        }
    }

    /**
     * Deserializes {@code new_state} into a {@code Hashtable}, adds its
     * contents with {@link #_putAll(Map)} and then sets {@code state_promise}
     * to {@code Boolean.TRUE}.
     * <p>
     * If unmarshalling fails (logged) or yields null, the method returns
     * without touching the table or {@code state_promise}.
     *
     * @param new_state the serialized state
     */
    public void setState(byte[] new_state) {
        Hashtable new_copy;

        try {
            new_copy=(Hashtable)Util.objectFromByteBuffer(new_state);
            if(new_copy == null)
                return;
        }
        catch(Throwable ex) {
            if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex);
            return;
        }
        _putAll(new_copy);
        state_promise.setResult(Boolean.TRUE);
    }

    /*------------------- Membership Changes ----------------------*/

    /**
     * Handles a new view: when the view has a member list, observers are
     * notified of joined/left members and {@code members} is replaced by the
     * new list. Afterwards {@code send_message} is set to true only if more
     * than one member is known.
     *
     * @param new_view the newly installed view
     */
    public void viewAccepted(View new_view) {
        Vector new_mbrs=new_view.getMembers();

        if(new_mbrs != null) {
            sendViewChangeNotifications(new_mbrs, members); // notifies observers (joined, left)
            members.removeAllElements();
            for(int i=0; i < new_mbrs.size(); i++)
                members.addElement(new_mbrs.elementAt(i));
        }
        // if size is bigger than one, there are more peers in the group
        // otherwise there is only one server.
        send_message=members.size() > 1;
    }

    /** Called when a member is suspected; intentionally does nothing. */
    public void suspect(Address suspected_mbr) {
    }

    /** Block sending and receiving of messages until ViewAccepted is called */
    public void block() {}

    /**
     * Computes which members joined and which left between two member lists
     * and passes both sets to {@code viewChange(joined, left)} of every
     * registered notification listener.
     * <p>
     * Nothing is done when there are no listeners or when either list is null
     * or empty.
     *
     * @param new_mbrs the members of the new view
     * @param old_mbrs the members known before the new view
     */
    void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) {
        Vector        joined, left;
        Object        mbr;
        Notification  n;

        if(notifs.size() == 0 || old_mbrs == null || new_mbrs == null ||
           old_mbrs.size() == 0 || new_mbrs.size() == 0)
            return;

        // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
        joined=new Vector();
        for(int i=0; i < new_mbrs.size(); i++) {
            mbr=new_mbrs.elementAt(i);
            if(!old_mbrs.contains(mbr))
                joined.addElement(mbr);
        }

        // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
        left=new Vector();
        for(int i=0; i < old_mbrs.size(); i++) {
            mbr=old_mbrs.elementAt(i);
            if(!new_mbrs.contains(mbr)) {
                left.addElement(mbr);
            }
        }

        for(int i=0; i < notifs.size(); i++) {
            n=(Notification)notifs.elementAt(i);
            n.viewChange(joined, left);
        }
    }

    /**
     * Initializes the parameter-type arrays passed to the dispatcher for
     * {@code _put}, {@code _putAll}, {@code _clear} and {@code _remove}.
     * Each array is created only if it is still null; any failure is logged.
     */
    final void initSignatures() {
        try {
            if(put_signature == null) {
                put_signature=new Class[] {Object.class,Object.class};
            }

            if(putAll_signature == null) {
                putAll_signature=new Class[] {Map.class};
            }

            if(clear_signature == null)
                clear_signature=new Class[0];

            if(remove_signature == null) {
                remove_signature=new Class[] {Object.class};
            }
        }
        catch(Throwable ex) {
            if(log.isErrorEnabled()) log.error("exception=" + ex);
        }
    }

    /**
     * Small demo: creates a channel and a DistributedHashtable on it, connects,
     * starts the table and performs a few put/remove/putAll operations,
     * printing the results. Any failure is printed as a stack trace.
     */
    public static void main(String[] args) {
        try {
            // The setup here is kind of weird:
            // 1. Create a channel
            // 2. Create a DistributedHashtable (on the channel)
            // 3. Connect the channel (so the HT gets a VIEW_CHANGE)
            // 4. Start the HT
            //
            // A simpler setup is
            // DistributedHashtable ht = new DistributedHashtable("demo", null,
            //         "file://c:/JGroups-2.0/conf/state_transfer.xml", 5000);

            JChannel c = new JChannel("file:/c:/JGroups-2.0/conf/state_transfer.xml");
            DistributedHashtable ht = new DistributedHashtable(c, false, 5000);
            c.connect("demo");
            ht.start(5000);

            ht.put("name", "Michelle Ban");
            // remove() returns the value that was mapped to the key, not the key
            Object old_val = ht.remove("name");
            System.out.println("old value was " + old_val);
            ht.put("newkey", "newvalue");

            Map m = new HashMap();
            m.put("k1", "v1");
            m.put("k2", "v2");
            ht.putAll(m);

            System.out.println("hashmap is " + ht);
        }
        catch (Throwable t) {
            t.printStackTrace();
        }
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?