📄 abstractreplicatedmap.java
字号:
} catch (ClassNotFoundException x) { log.error("Unable to transfer LazyReplicatedMap state.", x); } stateTransferred = true; } /** * @todo implement state transfer * @param msg Serializable * @return Serializable - null if no reply should be sent */ public Serializable replyRequest(Serializable msg, final Member sender) { if (! (msg instanceof MapMessage))return null; MapMessage mapmsg = (MapMessage) msg; //map init request if (mapmsg.getMsgType() == mapmsg.MSG_INIT) { mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false))); return mapmsg; } //map start request if (mapmsg.getMsgType() == mapmsg.MSG_START) { mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false))); mapMemberAdded(sender); return mapmsg; } //backup request if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) { MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); if (entry == null)return null; mapmsg.setValue( (Serializable) entry.getValue()); return mapmsg; } //state transfer request if (mapmsg.getMsgType() == mapmsg.MSG_STATE) { synchronized (stateMutex) { //make sure we dont do two things at the same time ArrayList list = new ArrayList(); Iterator i = super.entrySet().iterator(); while (i.hasNext()) { Map.Entry e = (Map.Entry) i.next(); MapEntry entry = (MapEntry) e.getValue(); MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY, false, (Serializable) entry.getKey(), null,null, entry.getBackupNodes()); list.add(me); } mapmsg.setValue(list); return mapmsg; } //synchronized } return null; } /** * If the reply has already been sent to the requesting thread, * the rpc callback can handle any data that comes in after the fact. * @param msg Serializable * @param sender Member */ public void leftOver(Serializable msg, Member sender) { //left over membership messages if (! (msg instanceof MapMessage))return; MapMessage mapmsg = (MapMessage) msg; try { mapmsg.deserialize(getExternalLoaders()); if (mapmsg.getMsgType() == MapMessage.MSG_START) { mapMemberAdded(mapmsg.getBackupNodes()[0]); } } catch (IOException x ) { log.error("Unable to deserialize MapMessage.",x); } catch (ClassNotFoundException x ) { log.error("Unable to deserialize MapMessage.",x); } } public void messageReceived(Serializable msg, Member sender) { if (! (msg instanceof MapMessage)) return; MapMessage mapmsg = (MapMessage) msg; try { mapmsg.deserialize(getExternalLoaders()); } catch (IOException x) { log.error("Unable to deserialize MapMessage.", x); return; } catch (ClassNotFoundException x) { log.error("Unable to deserialize MapMessage.", x); return; } if (mapmsg.getMsgType() == MapMessage.MSG_START) { mapMemberAdded(mapmsg.getBackupNodes()[0]); } if (mapmsg.getMsgType() == MapMessage.MSG_STOP) { memberDisappeared(mapmsg.getBackupNodes()[0]); } if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) { MapEntry entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); entry.setBackup(false); entry.setProxy(true); entry.setBackupNodes(mapmsg.getBackupNodes()); super.put(entry.getKey(), entry); } if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) { super.remove(mapmsg.getKey()); } if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) { MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); if (entry == null) { entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); entry.setBackup(true); entry.setProxy(false); entry.setBackupNodes(mapmsg.getBackupNodes()); if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) { ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner()); } } else { entry.setBackup(true); entry.setProxy(false); entry.setBackupNodes(mapmsg.getBackupNodes()); if (entry.getValue() instanceof ReplicatedMapEntry) { ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue(); if (mapmsg.isDiff()) { try { diff.lock(); diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length); } catch (Exception x) { log.error("Unable to apply diff to key:" + entry.getKey(), x); } finally { diff.unlock(); } } else { if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue()); ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner()); } //end if } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) { ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue(); re.setOwner(getMapOwner()); entry.setValue(re); } else { if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue()); } //end if } //end if super.put(entry.getKey(), entry); } //end if } public boolean accept(Serializable msg, Member sender) { if (msg instanceof MapMessage) { return Arrays.equals(mapContextName, ( (MapMessage) msg).getMapId()); } return false; } public void mapMemberAdded(Member member) { if ( member.equals(getChannel().getLocalMember(false)) ) return; boolean memberAdded = false; //select a backup node if we don't have one synchronized (mapMembers) { if (!mapMembers.contains(member) ) { mapMembers.add(member); memberAdded = true; } } if ( memberAdded ) { synchronized (stateMutex) { Iterator i = super.entrySet().iterator(); while (i.hasNext()) { Map.Entry e = (Map.Entry) i.next(); MapEntry entry = (MapEntry) e.getValue(); if ( entry == null ) continue; if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { try { Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); } catch (ChannelException x) { log.error("Unable to select backup node.", x); } //catch } //end if } //while } //synchronized }//end if } public boolean inSet(Member m, Member[] set) { if ( set == null ) return false; boolean result = false; for (int i=0; i<set.length && (!result); i++ ) if ( m.equals(set[i]) ) result = true; return result; } public Member[] excludeFromSet(Member[] mbrs, Member[] set) { ArrayList result = new ArrayList(); for (int i=0; i<set.length; i++ ) { boolean include = true; for (int j=0; j<mbrs.length; j++ ) if ( mbrs[j].equals(set[i]) ) include = false; if ( include ) result.add(set[i]); } return (Member[])result.toArray(new Member[result.size()]); } public void memberAdded(Member member) { //do nothing } public void memberDisappeared(Member member) { boolean removed = false; synchronized (mapMembers) { removed = mapMembers.remove(member); } Iterator i = super.entrySet().iterator(); while (i.hasNext()) { Map.Entry e = (Map.Entry) i.next(); MapEntry entry = (MapEntry) e.getValue(); if (entry.isPrimary() && inSet(member,entry.getBackupNodes())) { try { Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); } catch (ChannelException x) { log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); } } //end if } //while } int currentNode = 0; public Member getNextBackupNode() { Member[] members = getMapMembers(); if (members.length == 0)return null; int node = currentNode++; if (node >= members.length) { node = 0; currentNode = 0; } return members[node]; } protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;//------------------------------------------------------------------------------ // METHODS TO OVERRIDE //------------------------------------------------------------------------------ protected void printMap(String header) { try { System.out.println("\nDEBUG MAP:"+header); System.out.println("Map["+ new String(mapContextName, chset) + ", Map Size:" + super.size()); Member[] mbrs = getMapMembers(); for ( int i=0; i<mbrs.length;i++ ) { System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName()); } Iterator i = super.entrySet().iterator(); int cnt = 0; while (i.hasNext()) { Map.Entry e = (Map.Entry) i.next(); System.out.println( (++cnt) + ". " + e.getValue()); } System.out.println("EndMap]\n\n"); }catch ( Exception ignore) { ignore.printStackTrace(); } }//------------------------------------------------------------------------------// Map Entry class//------------------------------------------------------------------------------ public static class MapEntry implements Map.Entry { private boolean backup; private boolean proxy; private Member[] backupNodes; private Serializable key; private Serializable value; public MapEntry(Serializable key, Serializable value) { setKey(key); setValue(value); } public boolean isBackup() { return backup; } public void setBackup(boolean backup) { this.backup = backup; } public boolean isProxy() { return proxy; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -