📄 abstractreplicatedmap.java
字号:
}
public Member[] getMapMembersExcl(Member[] exclude) {
synchronized (mapMembers) {
HashMap list = (HashMap)mapMembers.clone();
for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
return getMapMembers(list);
}
}
/**
* Replicates any changes to the object since the last time
* The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>
* @param complete - if set to true, the object is replicated to its backup
* if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will
* be replicated
*/
public void replicate(Object key, boolean complete) {
if ( log.isTraceEnabled() )
log.trace("Replicate invoked on key:"+key);
MapEntry entry = (MapEntry)super.get(key);
if ( entry == null ) return;
if ( !entry.isSerializable() ) return;
if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
Object value = entry.getValue();
//check to see if we need to replicate this object isDirty()||complete
boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());
if (!repl) {
if ( log.isTraceEnabled() )
log.trace("Not replicating:"+key+", no change made");
return;
}
//check to see if the message is diffable
boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());
MapMessage msg = null;
if (diff) {
ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue();
try {
rentry.lock();
//construct a diff message
msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
true, (Serializable) entry.getKey(), null,
rentry.getDiff(),
entry.getBackupNodes());
} catch (IOException x) {
log.error("Unable to diff object. Will replicate the entire object instead.", x);
} finally {
rentry.unlock();
}
}
if (msg == null) {
//construct a complete
msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
false, (Serializable) entry.getKey(),
(Serializable) entry.getValue(),
null, entry.getBackupNodes());
}
try {
if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) {
channel.send(entry.getBackupNodes(), msg, channelSendOptions);
}
} catch (ChannelException x) {
log.error("Unable to replicate data.", x);
}
} //end if
}
/**
* This can be invoked by a periodic thread to replicate out any changes.
* For maps that don't store objects that implement ReplicatedMapEntry, this
* method should be used infrequently to avoid large amounts of data transfer
* @param complete boolean
*/
public void replicate(boolean complete) {
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry) i.next();
replicate(e.getKey(), complete);
} //while
}
public void transferState() {
try {
Member[] members = getMapMembers();
Member backup = members.length > 0 ? (Member) members[0] : null;
if (backup != null) {
MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false,
null, null, null, null);
Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
if (resp.length > 0) {
synchronized (stateMutex) {
msg = (MapMessage) resp[0].getMessage();
msg.deserialize(getExternalLoaders());
ArrayList list = (ArrayList) msg.getValue();
for (int i = 0; i < list.size(); i++) {
messageReceived( (Serializable) list.get(i), resp[0].getSource());
} //for
}
} else {
log.warn("Transfer state, 0 replies, probably a timeout.");
}
}
} catch (ChannelException x) {
log.error("Unable to transfer LazyReplicatedMap state.", x);
} catch (IOException x) {
log.error("Unable to transfer LazyReplicatedMap state.", x);
} catch (ClassNotFoundException x) {
log.error("Unable to transfer LazyReplicatedMap state.", x);
}
stateTransferred = true;
}
/**
* @todo implement state transfer
* @param msg Serializable
* @return Serializable - null if no reply should be sent
*/
public Serializable replyRequest(Serializable msg, final Member sender) {
if (! (msg instanceof MapMessage))return null;
MapMessage mapmsg = (MapMessage) msg;
//map init request
if (mapmsg.getMsgType() == mapmsg.MSG_INIT) {
mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
return mapmsg;
}
//map start request
if (mapmsg.getMsgType() == mapmsg.MSG_START) {
mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
mapMemberAdded(sender);
return mapmsg;
}
//backup request
if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if (entry == null || (!entry.isSerializable()) )return null;
mapmsg.setValue( (Serializable) entry.getValue());
return mapmsg;
}
//state transfer request
if (mapmsg.getMsgType() == mapmsg.MSG_STATE) {
synchronized (stateMutex) { //make sure we dont do two things at the same time
ArrayList list = new ArrayList();
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry) i.next();
MapEntry entry = (MapEntry) e.getValue();
if ( entry.isSerializable() ) {
MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY,
false, (Serializable) entry.getKey(), null, null, entry.getBackupNodes());
list.add(me);
}
}
mapmsg.setValue(list);
return mapmsg;
} //synchronized
}
return null;
}
/**
* If the reply has already been sent to the requesting thread,
* the rpc callback can handle any data that comes in after the fact.
* @param msg Serializable
* @param sender Member
*/
public void leftOver(Serializable msg, Member sender) {
//left over membership messages
if (! (msg instanceof MapMessage))return;
MapMessage mapmsg = (MapMessage) msg;
try {
mapmsg.deserialize(getExternalLoaders());
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
mapMemberAdded(mapmsg.getBackupNodes()[0]);
} else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
memberAlive(mapmsg.getBackupNodes()[0]);
}
} catch (IOException x ) {
log.error("Unable to deserialize MapMessage.",x);
} catch (ClassNotFoundException x ) {
log.error("Unable to deserialize MapMessage.",x);
}
}
public void messageReceived(Serializable msg, Member sender) {
if (! (msg instanceof MapMessage)) return;
MapMessage mapmsg = (MapMessage) msg;
if ( log.isTraceEnabled() ) {
log.trace("Map["+mapname+"] received message:"+mapmsg);
}
try {
mapmsg.deserialize(getExternalLoaders());
} catch (IOException x) {
log.error("Unable to deserialize MapMessage.", x);
return;
} catch (ClassNotFoundException x) {
log.error("Unable to deserialize MapMessage.", x);
return;
}
if ( log.isTraceEnabled() )
log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
mapMemberAdded(mapmsg.getBackupNodes()[0]);
}
if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
memberDisappeared(mapmsg.getBackupNodes()[0]);
}
if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if ( entry==null ) {
entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
entry.setBackup(false);
entry.setProxy(true);
entry.setBackupNodes(mapmsg.getBackupNodes());
super.put(entry.getKey(), entry);
} else {
entry.setProxy(true);
entry.setBackup(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
}
}
if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
super.remove(mapmsg.getKey());
}
if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) {
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if (entry == null) {
entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
entry.setBackup(true);
entry.setProxy(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
}
} else {
entry.setBackup(true);
entry.setProxy(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
if (entry.getValue() instanceof ReplicatedMapEntry) {
ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue();
if (mapmsg.isDiff()) {
try {
diff.lock();
diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
} catch (Exception x) {
log.error("Unable to apply diff to key:" + entry.getKey(), x);
} finally {
diff.unlock();
}
} else {
if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
} //end if
} else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
re.setOwner(getMapOwner());
entry.setValue(re);
} else {
if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
} //end if
} //end if
super.put(entry.getKey(), entry);
} //end if
}
public boolean accept(Serializable msg, Member sender) {
boolean result = false;
if (msg instanceof MapMessage) {
if ( log.isTraceEnabled() ) log.trace("Map["+mapname+"] accepting...."+msg);
result = Arrays.equals(mapContextName, ( (MapMessage) msg).getMapId());
if ( log.isTraceEnabled() ) log.trace("Msg["+mapname+"] accepted["+result+"]...."+msg);
}
return result;
}
public void mapMemberAdded(Member member) {
if ( member.equals(getChannel().getLocalMember(false)) ) return;
boolean memberAdded = false;
//select a backup node if we don't have one
synchronized (mapMembers) {
if (!mapMembers.containsKey(member) ) {
mapMembers.put(member, new Long(System.currentTimeMillis()));
memberAdded = true;
}
}
if ( memberAdded ) {
synchronized (stateMutex) {
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry) i.next();
MapEntry entry = (MapEntry) e.getValue();
if ( entry == null ) continue;
if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
try {
Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
entry.setBackupNodes(backup);
} catch (ChannelException x) {
log.error("Unable to select backup node.", x);
} //catch
} //end if
} //while
} //synchronized
}//end if
}
public boolean inSet(Member m, Member[] set) {
if ( set == null ) return false;
boolean result = false;
for (int i=0; i<set.length && (!result); i++ )
if ( m.equals(set[i]) ) result = true;
return result;
}
public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
ArrayList result = new ArrayList();
for (int i=0; i<set.length; i++ ) {
boolean include = true;
for (int j=0; j<mbrs.length; j++ )
if ( mbrs[j].equals(set[i]) ) include = false;
if ( include ) result.add(set[i]);
}
return (Member[])result.toArray(new Member[result.size()]);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -