📄 abstractreplicatedmap.java
字号:
}
/**
 * Callback for an added member. This implementation deliberately
 * performs no work.
 *
 * @param member the member that was added; not used here
 */
public void memberAdded(Member member) {
    // intentionally empty: nothing is done when a member is added
}
/**
 * Handles a member that is no longer available: the member is removed from
 * <code>mapMembers</code>, and every primary entry that lists the member
 * among its backup nodes is re-published through
 * <code>publishEntryInfo</code>; the members returned by that call become
 * the entry's new backup nodes.
 *
 * @param member the member that disappeared
 */
public void memberDisappeared(Member member) {
    synchronized (mapMembers) {
        // the result of the removal was never consulted, so it is not kept
        mapMembers.remove(member);
    }
    for (Iterator it = super.entrySet().iterator(); it.hasNext(); ) {
        Map.Entry mapping = (Map.Entry) it.next();
        MapEntry entry = (MapEntry) mapping.getValue();
        // only primary entries that were backed up on the lost member need work
        if (!entry.isPrimary() || !inSet(member, entry.getBackupNodes())) {
            continue;
        }
        try {
            Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
            entry.setBackupNodes(backup);
        } catch (ChannelException x) {
            // best effort: log and carry on with the remaining entries
            log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
        }
    }
}
/**
 * Returns the index of the next backup node, cycling round-robin over the
 * current number of map members.
 *
 * @return an index in the range <code>0..size-1</code>, or <code>-1</code>
 *         when there are no map members
 */
public int getNextBackupIndex() {
    // read the size once so the empty check and the bounds check agree
    int size = mapMembers.size();
    if (size == 0) {
        return -1;
    }
    int node = currentNode;
    if (node >= size) {
        // ran past the end (or the membership shrank): wrap to the first member
        node = 0;
    }
    // Advance from the index actually handed out. Resetting the cursor to 0
    // on a wrap would hand out index 0 twice in a row after every cycle.
    currentNode = node + 1;
    return node;
}
/**
 * Picks the next backup member using the index supplied by
 * <code>getNextBackupIndex()</code>.
 *
 * @return the selected member, or null when there are no members or no
 *         index is available; an index beyond the member array falls back
 *         to the first member
 */
public Member getNextBackupNode() {
    Member[] candidates = getMapMembers();
    int index = getNextBackupIndex();
    if (index == -1 || candidates.length == 0) {
        return null;
    }
    // the index may be stale relative to the array just fetched
    return candidates[index < candidates.length ? index : 0];
}
/**
 * Hook implemented by subclasses. Within this class it is invoked with an
 * entry's key and value, and the returned members are stored as that
 * entry's backup nodes.
 *
 * @param key Object - the entry key
 * @param value Object - the entry value
 * @return Member[] - the members to record as the entry's backup nodes
 * @throws ChannelException declared by this contract; the callers in this
 *         class log the failure rather than propagate it
 */
protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
/**
 * Invokes <code>ping</code> with <code>accessTimeout</code>. Any exception
 * raised by the ping is logged and not propagated to the caller.
 */
public void heartbeat() {
    try {
        ping(accessTimeout);
    } catch (Exception pingFailure) {
        log.error("Unable to send AbstractReplicatedMap.ping message", pingFailure);
    }
}
//------------------------------------------------------------------------------
// METHODS TO OVERRIDE
//------------------------------------------------------------------------------
/**
 * Removes the entry for <code>key</code> from this map and, when other map
 * members exist, sends them a <code>MSG_REMOVE</code> message for the key.
 * A failure to send is logged; the local removal still stands.
 *
 * @param key Object - cast to <code>Serializable</code> when there are
 *        members to notify
 * @return Object - the value of the removed entry, or null if there was no
 *         entry for the key
 */
public Object remove(Object key) {
    MapEntry entry = (MapEntry)super.remove(key);
    try {
        // take a single membership snapshot so the emptiness check and the
        // send operate on the same set of members
        Member[] members = getMapMembers();
        if (members.length > 0) {
            MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
            getChannel().send(members, msg, getChannelSendOptions());
        }
    } catch ( ChannelException x ) {
        log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x);
    }
    return entry!=null?entry.getValue():null;
}
/**
 * Returns the value held for <code>key</code>, or null when this map has no
 * entry for it.
 * When the local entry is not primary, the entry is first brought up to date:
 * an entry that is not a backup has its value requested from its backup nodes
 * over the RPC channel; a backup entry is re-published through
 * <code>publishEntryInfo</code>; a proxy entry triggers a
 * <code>MSG_PROXY</code> message to the members returned by
 * <code>getMapMembersExcl</code>. Afterwards the entry's backup and proxy
 * flags are cleared.
 * Any exception raised during that work is logged and null is returned.
 *
 * @param key Object - cast to <code>Serializable</code> when a message is built
 * @return Object - the entry's value, or null if there is no entry, no
 *         remote response was received, or the update failed
 */
public Object get(Object key) {
MapEntry entry = (MapEntry)super.get(key);
if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry);
if ( entry == null ) return null;
if ( !entry.isPrimary() ) {
//if the entry is not primary, we need to retrieve the latest value
try {
Member[] backup = null;
MapMessage msg = null;
if ( !entry.isBackup() ) {
//make sure we don't retrieve from ourselves
//the value is requested from the entry's backup nodes; the send is made with FIRST_REPLY
msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
(Serializable) key, null, null, null);
Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
if (resp == null || resp.length == 0) {
//no responses
log.warn("Unable to retrieve remote object for key:" + key);
return null;
}
//only the first response is used
msg = (MapMessage) resp[0].getMessage();
msg.deserialize(getExternalLoaders());
backup = entry.getBackupNodes();
//NOTE(review): the owner is set on the value held BEFORE the retrieved
//value replaces it on the next statement - confirm this ordering is intended
if ( entry.getValue() instanceof ReplicatedMapEntry ) {
ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
val.setOwner(getMapOwner());
}
//a null value in the reply leaves the current value untouched
if ( msg.getValue()!=null ) entry.setValue(msg.getValue());
}
if (entry.isBackup()) {
//select a new backup node
backup = publishEntryInfo(key, entry.getValue());
} else if ( entry.isProxy() ) {
//invalidate the previous primary
//the message goes to the members returned by getMapMembersExcl(backup)
msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
Member[] dest = getMapMembersExcl(backup);
if ( dest!=null && dest.length >0) {
getChannel().send(dest, msg, getChannelSendOptions());
}
}
//record the backup nodes and clear the backup and proxy flags
entry.setBackupNodes(backup);
entry.setBackup(false);
entry.setProxy(false);
} catch (Exception x) {
//every failure above, including runtime exceptions, is logged and reported as null
log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x);
return null;
}
}
if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue());
if ( entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry ) {
ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
//hack, somehow this is not being set above
//NOTE(review): possibly because the earlier setOwner call runs before
//entry.setValue(msg.getValue()) swaps in the retrieved value - confirm
val.setOwner(getMapOwner());
}
return entry.getValue();
}
/**
 * Debug helper that writes the map to <code>System.out</code>: the given
 * header, the map context name and full size, the name of each map member,
 * and the value of every stored entry. Any exception raised while printing
 * has its stack trace printed and is otherwise suppressed.
 *
 * @param header text printed after the "DEBUG MAP:" prefix
 */
protected void printMap(String header) {
    try {
        System.out.println("\nDEBUG MAP:"+header);
        System.out.println("Map["+ new String(mapContextName, chset) + ", Map Size:" + super.size());
        Member[] members = getMapMembers();
        // members are numbered from 1 in the output
        for (int position = 1; position <= members.length; position++) {
            System.out.println("Mbr["+position+"="+members[position - 1].getName());
        }
        int cnt = 0;
        for (Iterator it = super.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry mapping = (Map.Entry) it.next();
            cnt++;
            System.out.println( cnt + ". " + mapping.getValue());
        }
        System.out.println("EndMap]\n\n");
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
/**
 * Reports whether the underlying map holds an entry for the key. No check
 * is made on whether that entry is primary, so proxy and backup entries
 * count as well; invoking <code>get(key)</code> clears the entry's proxy
 * and backup flags.
 *
 * @param key Object
 * @return boolean - true if an entry exists for the key
 */
public boolean containsKey(Object key) {
    final boolean present = super.containsKey(key);
    return present;
}
/**
 * Stores <code>value</code> under <code>key</code> as a fresh entry with
 * its backup and proxy flags cleared. An existing entry for the key is
 * removed first through <code>remove(key)</code>. The entry is then
 * published through <code>publishEntryInfo</code> to obtain its backup
 * nodes; if publishing fails the error is logged and the entry is still
 * stored locally.
 *
 * @param key Object
 * @param value Object
 * @return Object - the value returned by <code>remove(key)</code> when an
 *         entry already existed, otherwise null
 */
public Object put(Object key, Object value) {
    MapEntry fresh = new MapEntry(key, value);
    fresh.setBackup(false);
    fresh.setProxy(false);
    // get rid of any previous mapping before the new one is published
    Object previous = containsKey(key) ? remove(key) : null;
    try {
        fresh.setBackupNodes(publishEntryInfo(key, value));
    } catch (ChannelException x) {
        log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
    }
    super.put(key, fresh);
    return previous;
}
/**
 * Adds every mapping of the given map to this instance by calling
 * <code>put</code> once per mapping.
 *
 * @param m Map - the source of the mappings
 */
public void putAll(Map m) {
    for (Iterator it = m.entrySet().iterator(); it.hasNext(); ) {
        Map.Entry mapping = (Map.Entry) it.next();
        put(mapping.getKey(), mapping.getValue());
    }
}
/**
 * Removes, one by one through <code>remove</code>, every key returned by
 * <code>keySet()</code>.
 */
public void clear() {
    // only the keys reported by keySet() are deleted
    for (Iterator keys = keySet().iterator(); keys.hasNext(); ) {
        remove(keys.next());
    }
}
/**
 * Checks whether a primary entry holds a value equal to the argument.
 * A null argument is delegated unchanged to the superclass implementation.
 *
 * @param value Object - the value to look for
 * @return boolean - true if a matching primary entry exists
 */
public boolean containsValue(Object value) {
    if (value == null) {
        return super.containsValue(value);
    }
    for (Iterator it = super.entrySet().iterator(); it.hasNext(); ) {
        Map.Entry mapping = (Map.Entry) it.next();
        MapEntry candidate = (MapEntry) mapping.getValue();
        if (candidate.isPrimary() && value.equals(candidate.getValue())) {
            return true;
        }
    }
    return false;
}
/**
 * Cloning is not supported.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
public Object clone() {
    final String reason = "This operation is not valid on a replicated map";
    throw new UnsupportedOperationException(reason);
}
/**
 * Gives access to the complete entry set of the underlying map, without
 * the primary-only filtering applied by <code>entrySet()</code>.
 * The value of each returned Map.Entry is the MapEntry object stored for
 * that key.
 *
 * @return Set - the superclass entry set
 */
public Set entrySetFull() {
    final Set everything = super.entrySet();
    return everything;
}
/**
 * Gives access to the complete key set of the underlying map, without the
 * primary-only filtering applied by <code>keySet()</code>.
 *
 * @return Set - the superclass key set
 */
public Set keySetFull() {
    final Set everyKey = super.keySet();
    return everyKey;
}
/**
 * Reports the number of entries in the underlying map, without the
 * filtering applied by <code>size()</code>.
 *
 * @return int - the superclass size
 */
public int sizeFull() {
    final int total = super.size();
    return total;
}
/**
 * Builds a read-only set holding the MapEntry object of every entry that
 * is primary. The set is a fresh copy, in the iteration order of the
 * underlying map.
 *
 * @return Set - unmodifiable set of primary MapEntry objects
 */
public Set entrySet() {
    LinkedHashSet primaries = new LinkedHashSet(super.size());
    for (Iterator it = super.entrySet().iterator(); it.hasNext(); ) {
        MapEntry candidate = (MapEntry) ((Map.Entry) it.next()).getValue();
        if (candidate.isPrimary()) {
            primaries.add(candidate);
        }
    }
    return Collections.unmodifiableSet(primaries);
}
/**
 * Builds a read-only set holding the key of every entry that is primary.
 * The set is a fresh copy, in the iteration order of the underlying map.
 *
 * @return Set - unmodifiable set of the keys of primary entries
 */
public Set keySet() {
    LinkedHashSet primaryKeys = new LinkedHashSet(super.size());
    for (Iterator it = super.entrySet().iterator(); it.hasNext(); ) {
        MapEntry candidate = (MapEntry) ((Map.Entry) it.next()).getValue();
        // keys of non-primary entries are left out
        if (candidate.isPrimary()) {
            primaryKeys.add(candidate.getKey());
        }
    }
    return Collections.unmodifiableSet(primaryKeys);
}
/**
 * Counts the entries that are primary and carry a non-null value. The
 * count is computed by a full pass over the underlying map on every call.
 *
 * @return int - the number of primary entries with a non-null value
 */
public int size() {
    //todo, implement a counter variable instead
    int primaryCount = 0;
    for (Iterator it = super.entrySet().iterator(); it.hasNext(); ) {
        Map.Entry mapping = (Map.Entry) it.next();
        if (mapping == null) {
            continue;
        }
        MapEntry candidate = (MapEntry) mapping.getValue();
        if (candidate.isPrimary() && candidate.getValue() != null) {
            primaryCount++;
        }
    }
    return primaryCount;
}
/**
 * Always answers false.
 *
 * @param eldest the candidate entry; not inspected
 * @return boolean - always false
 */
protected boolean removeEldestEntry(Map.Entry eldest) {
    final boolean evict = false;
    return evict;
}
/**
 * Reports whether <code>size()</code> is zero.
 *
 * @return boolean - true when <code>size()</code> returns 0
 */
public boolean isEmpty() {
    final int primaryCount = size();
    return primaryCount == 0;
}
/**
 * Collects the non-null values of all primary entries into a read-only
 * collection. The collection is a fresh copy, in the iteration order of
 * the underlying map.
 *
 * @return Collection - unmodifiable collection of the values
 */
public Collection values() {
    ArrayList collected = new ArrayList();
    for (Iterator it = super.entrySet().iterator(); it.hasNext(); ) {
        MapEntry candidate = (MapEntry) ((Map.Entry) it.next()).getValue();
        if (candidate.isPrimary() && candidate.getValue() != null) {
            collected.add(candidate.getValue());
        }
    }
    return Collections.unmodifiableCollection(collected);
}
//------------------------------------------------------------------------------
// Map Entry class
//------------------------------------------------------------------------------
public static class MapEntry implements Map.Entry {
private boolean backup;
private boolean proxy;
private Member[] backupNodes;
private Object key;
private Object value;
public MapEntry(Object key, Object value) {
setKey(key);
setValue(value);
}
public boolean isKeySerializable() {
return (key == null) || (key instanceof Serializable);
}
public boolean isValueSerializable() {
return (value==null) || (value instanceof Serializable);
}
public boolean isSerializable() {
return isKeySerializable() && isValueSerializable();
}
public boolean isBackup() {
return backup;
}
public void setBackup(boolean backup) {
this.backup = backup;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -