distributedhashtable.java
来自「JGRoups源码」· Java 代码 · 共 646 行 · 第 1/2 页
JAVA
646 行
} // closes the member that begins before this excerpt

    /**
     * Clears this hashtable so that it contains no keys.
     * If {@code send_message} is set, the {@code _clear} callback is invoked through
     * {@code disp.callRemoteMethods()}; otherwise {@link #_clear()} is called directly.
     * A failure of the remote call is logged, not rethrown.
     */
    public synchronized void clear() {
        //Changes done by <aos>
        //if true, propagate action to the group
        if(send_message) {
            try {
                disp.callRemoteMethods(
                        null, "_clear", null, clear_signature,
                        GroupRequest.GET_ALL,
                        0);
            }
            catch(Exception e) {
                if(log.isErrorEnabled()) log.error("exception=" + e);
            }
        }
        else {
            _clear();
        }
    }

    /**
     * Removes the key (and its corresponding value) from the Hashtable.
     * If {@code send_message} is set, the {@code _remove} callback is invoked through
     * {@code disp.callRemoteMethods()}; otherwise {@link #_remove(Object)} is called directly.
     * A failure of the remote call is logged, not rethrown.
     *
     * @param key - the key to be removed.
     * @return the value to which the key had been mapped in this hashtable (looked up
     *         before the removal is dispatched), or null if the key did not have a mapping.
     */
    public Object remove(Object key) {
        // Looked up first: this is the return value regardless of which branch runs below.
        Object retval = get(key);

        //Changes done by <aos>
        //if true, propagate action to the group
        if(send_message) {
            try {
                disp.callRemoteMethods(
                        null, "_remove", new Object[]{key},
                        remove_signature,
                        GroupRequest.GET_ALL,
                        0);
            }
            catch(Exception e) {
                // Do not swallow the failure silently; report it the same way clear() does.
                if(log.isErrorEnabled()) log.error("exception=" + e);
            }
        }
        else {
            _remove(key);
            //don't have to do retval = super.remove(..) as is done at the beginning
        }
        return retval;
    }

    /*------------------------ Callbacks -----------------------*/

    /**
     * Callback that stores the mapping locally via {@code super.put()}, saves it through
     * {@code persistence_mgr} when {@code persistent} is set, and then calls
     * {@code entrySet(key, value)} on every registered notification listener.
     * Persistence failures are logged, not rethrown.
     *
     * @param key   the key to store
     * @param value the value to associate with the key
     * @return whatever {@code super.put(key, value)} returned
     */
    public Object _put(Object key, Object value) {
        Object retval=super.put(key, value);

        if(persistent) {
            try {
                persistence_mgr.save((Serializable)key, (Serializable)value);
            }
            catch(CannotPersistException cannot_persist_ex) {
                if(log.isErrorEnabled()) log.error("failed persisting " + key + " + " +
                        value + ", exception=" + cannot_persist_ex);
            }
            catch(Throwable t) {
                if(log.isErrorEnabled()) log.error("failed persisting " + key + " + " +
                        value + ", exception=" + Util.printStackTrace(t));
            }
        }

        for(int i=0; i < notifs.size(); i++)
            ((Notification)notifs.elementAt(i)).entrySet(key, value);
        return retval;
    }

    /**
     * Callback that copies every entry of {@code m} into the local table, saves the map
     * through {@code persistence_mgr} when {@code persistent} is set, and then calls
     * {@code contentsSet(m)} on every registered notification listener.
     * Does nothing when {@code m} is null. Persistence failures are logged, not rethrown.
     *
     * @param m the mappings to add; may be null
     * @see java.util.Map#putAll(java.util.Map)
     */
    public void _putAll(Map m) {
        if (m == null)
            return;

        // Calling the method below seems okay, but would result in ... deadlock !
        // The reason is that Map.putAll() calls put(), which we override, which results in
        // lock contention for the map.

        // ---> super.putAll(m); <--- CULPRIT !!!@#$%$

        // That said let's do it the stupid way:
        Map.Entry entry;
        for(Iterator it=m.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            super.put(entry.getKey(), entry.getValue());
        }

        if (persistent) {
            try {
                persistence_mgr.saveAll(m);
            }
            catch (CannotPersistException persist_ex) {
                if(log.isErrorEnabled()) log.error("failed persisting contents: " + persist_ex);
            }
            catch (Throwable t) {
                if(log.isErrorEnabled()) log.error("failed persisting contents: " + t);
            }
        }

        for(int i=0; i < notifs.size(); i++)
            ((Notification)notifs.elementAt(i)).contentsSet(m);
    }

    /**
     * Callback that empties the local table via {@code super.clear()}, clears
     * {@code persistence_mgr} when {@code persistent} is set, and then calls
     * {@code contentsCleared()} on every registered notification listener.
     * Persistence failures are logged, not rethrown.
     */
    public void _clear() {
        super.clear();

        if(persistent) {
            try {
                persistence_mgr.clear();
            }
            catch(CannotRemoveException cannot_remove_ex) {
                if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + cannot_remove_ex);
            }
            catch(Throwable t) {
                if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + t);
            }
        }

        for(int i=0; i < notifs.size(); i++)
            ((Notification)notifs.elementAt(i)).contentsCleared();
    }

    /**
     * Callback that removes the key locally via {@code super.remove()}, removes it from
     * {@code persistence_mgr} when {@code persistent} is set, and then calls
     * {@code entryRemoved(key)} on every registered notification listener.
     * Persistence failures are logged, not rethrown.
     *
     * @param key the key to remove
     * @return whatever {@code super.remove(key)} returned
     */
    public Object _remove(Object key) {
        Object retval=super.remove(key);

        if(persistent) {
            try {
                persistence_mgr.remove((Serializable)key);
            }
            catch(CannotRemoveException cannot_remove_ex) {
                // Message names the failed removal and its key (was a copy of the _clear() text).
                if(log.isErrorEnabled()) log.error("failed removing " + key + ", exception=" + cannot_remove_ex);
            }
            catch(Throwable t) {
                if(log.isErrorEnabled()) log.error("failed removing " + key + ", exception=" + t);
            }
        }

        for(int i=0; i < notifs.size(); i++)
            ((Notification)notifs.elementAt(i)).entryRemoved(key);

        return retval;
    }

    /*----------------------------------------------------------*/

    /*-------------------- State Exchange ----------------------*/

    /** Intentionally empty: incoming messages are ignored by this method. */
    public void receive(Message msg) { }

    /**
     * Copies the current key/value pairs into a fresh {@code Hashtable} and serializes
     * that copy with {@code Util.objectToByteBuffer()}.
     *
     * @return the serialized copy, or null if serialization threw (the error is logged)
     */
    public byte[] getState() {
        Object key, val;
        Hashtable copy=new Hashtable();

        for(Enumeration e=keys(); e.hasMoreElements();) {
            key=e.nextElement();
            val=get(key);
            copy.put(key, val);
        }
        try {
            return Util.objectToByteBuffer(copy);
        }
        catch(Throwable ex) {
            if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex);
            return null;
        }
    }

    /**
     * Deserializes {@code new_state} into a {@code Hashtable}, applies it with
     * {@link #_putAll(Map)} and then sets {@code state_promise} to {@code Boolean.TRUE}.
     * Returns early, without touching {@code state_promise}, when the deserialized
     * table is null or deserialization throws (the error is logged).
     *
     * @param new_state the serialized state
     */
    public void setState(byte[] new_state) {
        Hashtable new_copy;

        try {
            new_copy=(Hashtable)Util.objectFromByteBuffer(new_state);
            if(new_copy == null)
                return;
        }
        catch(Throwable ex) {
            if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex);
            return;
        }
        _putAll(new_copy);
        state_promise.setResult(Boolean.TRUE);
    }

    /*------------------- Membership Changes ----------------------*/

    /**
     * Handles a new view: notifies listeners of joined/left members, replaces the
     * contents of {@code members} with the new membership, and recomputes
     * {@code send_message}. A null member list leaves {@code members} untouched.
     *
     * @param new_view the view whose members become the current membership
     */
    public void viewAccepted(View new_view) {
        Vector new_mbrs=new_view.getMembers();

        if(new_mbrs != null) {
            // Must run before 'members' is overwritten: it diffs new against old.
            sendViewChangeNotifications(new_mbrs, members); // notifies observers (joined, left)
            members.removeAllElements();
            for(int i=0; i < new_mbrs.size(); i++)
                members.addElement(new_mbrs.elementAt(i));
        }
        //if size is bigger than one, there are more peers in the group
        //otherwise there is only one server.
        send_message=members.size() > 1;
    }

    /** Called when a member is suspected; intentionally does nothing. */
    public void suspect(Address suspected_mbr) {
    }

    /** Block sending and receiving of messages until ViewAccepted is called; intentionally does nothing. */
    public void block() {}

    /**
     * Computes which members joined (in {@code new_mbrs} but not in {@code old_mbrs}) and
     * which left (in {@code old_mbrs} but not in {@code new_mbrs}) and passes both lists to
     * {@code viewChange()} of every registered notification listener.
     * Returns without notifying when there are no listeners or when either list is
     * null or empty.
     *
     * @param new_mbrs the new membership
     * @param old_mbrs the previous membership
     */
    void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) {
        Vector joined, left;
        Object mbr;
        Notification n;

        if(notifs.size() == 0 || old_mbrs == null || new_mbrs == null ||
                old_mbrs.size() == 0 || new_mbrs.size() == 0)
            return;

        // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
        joined=new Vector();
        for(int i=0; i < new_mbrs.size(); i++) {
            mbr=new_mbrs.elementAt(i);
            if(!old_mbrs.contains(mbr))
                joined.addElement(mbr);
        }

        // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
        left=new Vector();
        for(int i=0; i < old_mbrs.size(); i++) {
            mbr=old_mbrs.elementAt(i);
            if(!new_mbrs.contains(mbr)) {
                left.addElement(mbr);
            }
        }

        for(int i=0; i < notifs.size(); i++) {
            n=(Notification)notifs.elementAt(i);
            n.viewChange(joined, left);
        }
    }

    /**
     * Initializes each of the parameter-type arrays ({@code put_signature},
     * {@code putAll_signature}, {@code clear_signature}, {@code remove_signature})
     * that is still null. These arrays are passed to {@code disp.callRemoteMethods()}
     * alongside the callback names.
     */
    final void initSignatures() {
        try {
            if(put_signature == null) {
                put_signature=new Class[] {Object.class,Object.class};
            }

            if(putAll_signature == null) {
                putAll_signature=new Class[] {Map.class};
            }

            if(clear_signature == null)
                clear_signature=new Class[0];

            if(remove_signature == null) {
                remove_signature=new Class[] {Object.class};
            }
        }
        catch(Throwable ex) {
            if(log.isErrorEnabled()) log.error("exception=" + ex);
        }
    }

    /**
     * Small demo: creates a channel and a hashtable on it, connects, starts the
     * hashtable and performs a few put/remove/putAll operations, printing the results.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            // The setup here is kind of weird:
            // 1. Create a channel
            // 2. Create a DistributedHashtable (on the channel)
            // 3. Connect the channel (so the HT gets a VIEW_CHANGE)
            // 4. Start the HT
            //
            // A simpler setup is
            // DistributedHashtable ht = new DistributedHashtable("demo", null,
            //         "file://c:/JGroups-2.0/conf/state_transfer.xml", 5000);

            JChannel c = new JChannel("file:/c:/JGroups-2.0/conf/state_transfer.xml");
            DistributedHashtable ht = new DistributedHashtable(c, false, 5000);
            c.connect("demo");
            ht.start(5000);

            ht.put("name", "Michelle Ban");
            // remove() returns the previously mapped value, not the key.
            Object old_value = ht.remove("name");
            System.out.println("old value was " + old_value);
            ht.put("newkey", "newvalue");

            Map m = new HashMap();
            m.put("k1", "v1");
            m.put("k2", "v2");

            ht.putAll(m);

            System.out.println("hashmap is " + ht);
        }
        catch (Throwable t) {
            t.printStackTrace();
        }
    }

}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?