replicatedhashtable.java
来自「JGRoups源码」· Java 代码 · 共 511 行 · 第 1/2 页
JAVA
511 行
return retval; } /*------------------------ Callbacks -----------------------*/ Object _put(Object key, Object value) { Object retval=super.put(key, value); for(int i=0; i < notifs.size(); i++) ((Notification)notifs.elementAt(i)).entrySet(key, value); return retval; } void _clear() { super.clear(); } Object _remove(Object key) { Object retval=super.remove(key); for(int i=0; i < notifs.size(); i++) ((Notification)notifs.elementAt(i)).entryRemoved(key); return retval; } /** * @see java.util.Map#putAll(java.util.Map) */ public void _putAll(Map m) { if(m == null) return; //######## The same way as in the DistributedHashtable // Calling the method below seems okay, but would result in ... deadlock ! // The reason is that Map.putAll() calls put(), which we override, which results in // lock contention for the map. // ---> super.putAll(m); <--- CULPRIT !!!@#$%$ // That said let's do it the stupid way: //######## The same way as in the DistributedHashtable Map.Entry entry; for(Iterator it=m.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); super.put(entry.getKey(), entry.getValue()); } for(int i=0; i < notifs.size(); i++) ((Notification)notifs.elementAt(i)).contentsSet(m); } /*----------------------------------------------------------*/ /*-------------------- MessageListener ----------------------*/ public void receive(Message msg) { Request req=null; if(msg == null) return; req=(Request)msg.getObject(); if(req == null) return; switch(req.req_type) { case Request.PUT: if(req.key != null && req.val != null) _put(req.key, req.val); break; case Request.REMOVE: if(req.key != null) _remove(req.key); break; case Request.CLEAR: _clear(); break; case Request.PUT_ALL: if(req.val != null) _putAll((Map)req.val); break; default : // error } } public byte[] getState() { Object key, val; Hashtable copy=new Hashtable(); for(Enumeration e=keys(); e.hasMoreElements();) { key=e.nextElement(); val=get(key); copy.put(key, val); } try { return Util.objectToByteBuffer(copy); } catch(Exception ex) { if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex); return null; } } public void setState(byte[] new_state) { Hashtable new_copy; Object key; try { new_copy=(Hashtable)Util.objectFromByteBuffer(new_state); if(new_copy == null) { notifyStateTransferCompleted(true); return; } } catch(Throwable ex) { if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex); notifyStateTransferCompleted(false); return; } _clear(); // remove all elements for(Enumeration e=new_copy.keys(); e.hasMoreElements();) { key=e.nextElement(); _put(key, new_copy.get(key)); } notifyStateTransferCompleted(true); } /*-------------------- End of MessageListener ----------------------*/ /*----------------------- MembershipListener ------------------------*/ public void viewAccepted(View new_view) { Vector new_mbrs=new_view.getMembers(); if(new_mbrs != null) { sendViewChangeNotifications(new_mbrs, members); // notifies observers (joined, left) members.removeAllElements(); for(int i=0; i < new_mbrs.size(); i++) members.addElement(new_mbrs.elementAt(i)); } //if size is bigger than one, there are more peers in the group //otherwise there is only one server. if(members.size() > 1) { send_message=true; } else { send_message=false; } } /** Called when a member is suspected */ public void suspect(Address suspected_mbr) { ; } /** Block sending and receiving of messages until ViewAccepted is called */ public void block() { } /*------------------- End of MembershipListener ----------------------*/ void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) { Vector joined, left; Object mbr; Notification n; if(notifs.size() == 0 || old_mbrs == null || new_mbrs == null || old_mbrs.size() == 0 || new_mbrs.size() == 0) return; // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs joined=new Vector(); for(int i=0; i < new_mbrs.size(); i++) { mbr=new_mbrs.elementAt(i); if(!old_mbrs.contains(mbr)) joined.addElement(mbr); } // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs left=new Vector(); for(int i=0; i < old_mbrs.size(); i++) { mbr=old_mbrs.elementAt(i); if(!new_mbrs.contains(mbr)) { left.addElement(mbr); } } for(int i=0; i < notifs.size(); i++) { n=(Notification)notifs.elementAt(i); n.viewChange(joined, left); } } void notifyStateTransferStarted() { state_transfer_running=true; for(Iterator it=state_transfer_listeners.iterator(); it.hasNext();) { StateTransferListener listener=(StateTransferListener)it.next(); try { listener.stateTransferStarted(); } catch(Throwable t) { } } } void notifyStateTransferCompleted(boolean success) { state_transfer_running=false; for(Iterator it=state_transfer_listeners.iterator(); it.hasNext();) { StateTransferListener listener=(StateTransferListener)it.next(); try { listener.stateTransferCompleted(success); } catch(Throwable t) { } } } private static class Request implements Serializable { static final int PUT=1; static final int REMOVE=2; static final int CLEAR=3; static final int PUT_ALL=4; int req_type=0; Object key=null; Object val=null; Request(int req_type, Object key, Object val) { this.req_type=req_type; this.key=key; this.val=val; } public String toString() { StringBuffer sb=new StringBuffer(); sb.append(type2String(req_type)); if(key != null) sb.append("\nkey=" + key); if(val != null) sb.append("\nval=" + val); return sb.toString(); } String type2String(int t) { switch(t) { case PUT: return "PUT"; case REMOVE: return "REMOVE"; case CLEAR: return "CLEAR"; case PUT_ALL: return "PUT_ALL"; default : return "<unknown>"; } } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?