📄 nonblockingcoordinator.java
字号:
data.setAddress(local);
data.setMessage(msg.getBuffer());
data.setOptions(Channel.SEND_OPTIONS_USE_ACK);
data.setTimestamp(System.currentTimeMillis());
return data;
}
/**
 * Hook invoked by {@code handleViewConf} after a new view and view id have
 * been stored. This implementation is intentionally empty.
 *
 * @param viewId id of the view that was just accepted
 * @param view   members of the view that was just accepted
 */
protected void viewChange(UniqueId viewId, Member[] view) {
// Placeholder: this is where listeners would be notified; nothing is invoked here.
}
/**
 * Asks {@code TcpFailureDetector.memberAlive} whether the given member is alive.
 * The call passes {@code COORD_ALIVE} as its second argument, {@code false}
 * for both boolean arguments, {@code waitForCoordMsgTimeout} for both of the
 * following arguments, and the interceptor's current option flag last.
 * NOTE(review): the two booleans presumably disable extra send/read checks and
 * the two timeouts presumably cover read and connect - confirm against
 * TcpFailureDetector.
 *
 * @param mbr member to probe
 * @return the result reported by the failure detector
 */
protected boolean alive(Member mbr) {
    final boolean reachable = TcpFailureDetector.memberAlive(
            mbr, COORD_ALIVE, false, false,
            waitForCoordMsgTimeout, waitForCoordMsgTimeout, getOptionFlag());
    return reachable;
}
/**
 * Builds a membership that combines the members carried by an incoming
 * coordination message with the members currently returned by
 * {@code getMembers()}.
 * Every entry returned by {@code Arrays.diff(merged, membership, local)} is
 * probed with {@link #alive(Member)}: entries that respond are reported through
 * {@code memberAdded(member, false)} (no election is triggered), the others are
 * removed from the merged membership again.
 * A pre-merge and a post-merge interceptor event are fired around the work.
 *
 * @param msg    coordination message that just arrived
 * @param sender member the message was received from (not used here)
 * @return the merged membership
 */
protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) {
    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE, this, "Pre merge"));

    MemberImpl self = (MemberImpl) getLocalMember(false);
    Membership combined = new Membership(self, AbsoluteOrder.comp, true);
    Arrays.fill(combined, msg.getMembers());
    Arrays.fill(combined, getMembers());

    // presumably the members present in the merged set but not yet in our own membership - verify Arrays.diff
    Member[] candidates = Arrays.diff(combined, membership, self);
    for (Member candidate : candidates) {
        if (alive(candidate)) {
            memberAdded(candidate, false);
        } else {
            combined.removeMember((MemberImpl) candidate);
        }
    }

    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE, this, "Post merge"));
    return combined;
}
/**
 * Entry point for every decoded coordination message.
 * <ol>
 *   <li>On the first message seen, sets {@code coordMsgReceived} and wakes up
 *       all threads waiting on {@code electionMutex}.</li>
 *   <li>Stamps the message with the current time.</li>
 *   <li>Merges the message's members with the local ones via
 *       {@link #mergeOnArrive}.</li>
 *   <li>Dispatches to {@link #handleViewConf} for view confirmations and to
 *       {@link #handleToken} for everything else.</li>
 * </ol>
 *
 * @param msg    the coordination message
 * @param sender member the message was received from
 * @throws ChannelException propagated from the handlers
 */
protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException {
    if (!coordMsgReceived.get()) {
        coordMsgReceived.set(true);
        // Wake up anyone blocked on the election mutex waiting for coordination traffic.
        synchronized (electionMutex) {
            electionMutex.notifyAll();
        }
    }
    msg.timestamp = System.currentTimeMillis();
    Membership merged = mergeOnArrive(msg, sender);
    if (isViewConf(msg)) {
        handleViewConf(msg, sender, merged);
    } else {
        handleToken(msg, sender, merged);
    }
}
/**
 * Routes an election token depending on who created it: tokens whose source
 * equals the local member go to {@link #handleMyToken}, all others go to
 * {@link #handleOtherToken}.
 *
 * @param msg    the election token
 * @param sender member the token was received from
 * @param merged membership produced by {@link #mergeOnArrive} for this message
 * @throws ChannelException propagated from the selected handler
 */
protected void handleToken(CoordinationMessage msg, Member sender, Membership merged) throws ChannelException {
    MemberImpl self = (MemberImpl) getLocalMember(false);
    boolean createdByMe = self.equals(msg.getSource());
    if (!createdByMe) {
        handleOtherToken(self, msg, sender, merged);
        return;
    }
    // msg.src == local: our own token has come back to us
    handleMyToken(self, msg, sender, merged);
}
/**
 * Handles a token whose source is the local member.
 * <ul>
 *   <li>Local member is still the leader and the message's members match the
 *       merged membership: the message is turned into a {@code COORD_CONF},
 *       sent to every member of the message except the local one, and applied
 *       locally through {@link #handleViewConf}.</li>
 *   <li>Local member is still the leader but the membership differs: the merged
 *       membership is remembered as the suggested view (with the message id as
 *       suggested view id) and the token is forwarded with the merged view.</li>
 *   <li>Leader differs from the local member: any suggested view is discarded
 *       and the token is forwarded with the merged view.</li>
 * </ul>
 *
 * @param local  the local member
 * @param msg    the token
 * @param sender member the token was received from (not used here)
 * @param merged membership produced by the merge step
 * @throws ChannelException if sending fails or the view handler throws
 */
protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender, Membership merged) throws ChannelException {
    final boolean leaderUnchanged = local.equals(msg.getLeader());

    if (leaderUnchanged && Arrays.sameMembers(msg.getMembers(), merged.getMembers())) {
        // Neither leadership nor membership changed: confirm the view.
        msg.type = COORD_CONF;
        super.sendMessage(Arrays.remove(msg.getMembers(), local), createData(msg, local), null);
        handleViewConf(msg, local, merged);
        return;
    }

    if (leaderUnchanged) {
        // membership change
        suggestedView = new Membership(local, AbsoluteOrder.comp, true);
        suggestedviewId = msg.getId();
        Arrays.fill(suggestedView, merged.getMembers());
    } else {
        // leadership change
        suggestedView = null;
        suggestedviewId = null;
    }

    // Both remaining cases pass the token on with the merged view.
    msg.view = (MemberImpl[]) merged.getMembers();
    sendElectionMsgToNextInline(local, msg);
}
/**
 * Handles a token that was created by another member.
 * If the token names the local member as leader nothing is done (the election
 * start that used to live here is disabled); otherwise the token's view is
 * replaced with the merged membership and the token is forwarded.
 *
 * @param local  the local member
 * @param msg    the token
 * @param sender member the token was received from (not used here)
 * @param merged membership produced by the merge step
 * @throws ChannelException if forwarding the token fails
 */
protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender, Membership merged) throws ChannelException {
    if (local.equals(msg.getLeader())) {
        // I am the new leader
        // startElection(false);
        return;
    }
    msg.view = (MemberImpl[]) merged.getMembers();
    sendElectionMsgToNextInline(local, msg);
}
/**
 * Applies a view confirmation.
 * A confirmation whose id equals the current {@code viewId} is ignored.
 * Otherwise the view is rebuilt from the message's members, the view id is
 * updated, and a pending suggested view is dropped when either its id matches
 * the newly accepted id or its first member orders before the first member of
 * the merged membership. Afterwards {@link #viewChange} is called, an
 * {@code EVT_CONF_RX} event is fired, and - if no suggestion is pending and
 * {@link #hasHigherPriority} reports so for the merged vs. local membership -
 * a new election is started.
 *
 * @param msg    the confirmation message
 * @param sender member the confirmation came from (not used here)
 * @param merged membership produced by the merge step
 * @throws ChannelException if starting the follow-up election fails
 */
protected void handleViewConf(CoordinationMessage msg, Member sender, Membership merged) throws ChannelException {
    if (viewId != null && msg.getId().equals(viewId)) {
        return; // we already have this view
    }

    view = new Membership((MemberImpl) getLocalMember(false), AbsoluteOrder.comp, true);
    Arrays.fill(view, msg.getMembers());
    viewId = msg.getId();

    // Short-circuit keeps the original evaluation order: the ordering check is
    // only reached when the ids differ and a suggestion actually exists.
    boolean discardSuggestion = viewId.equals(suggestedviewId)
            || (suggestedView != null
                && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0], merged.getMembers()[0]) < 0);
    if (discardSuggestion) {
        suggestedView = null;
        suggestedviewId = null;
    }

    viewChange(viewId, view.getMembers());
    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX, this, "Accepted View"));

    if (suggestedviewId == null && hasHigherPriority(merged.getMembers(), membership.getMembers())) {
        startElection(false);
    }
}
/**
 * Tells whether a coordination message is a view confirmation, i.e. whether
 * {@code Arrays.contains} matches {@code COORD_CONF} against the message type
 * starting at offset 0.
 *
 * @param msg message to inspect
 * @return {@code true} for a view confirmation
 */
protected boolean isViewConf(CoordinationMessage msg) {
    final boolean confirmation =
            Arrays.contains(msg.getType(), 0, COORD_CONF, 0, COORD_CONF.length);
    return confirmation;
}
/**
 * Compares the first members of two member arrays after ordering both with
 * {@code AbsoluteOrder.absoluteOrder}. Note that both arrays are passed to the
 * ordering routine directly, so the caller's arrays are the ones being ordered.
 *
 * @param complete the complete (merged) member list
 * @param local    the locally known member list
 * @return {@code false} if {@code local} is null or empty; otherwise
 *         {@code true} if {@code complete} is null or empty; otherwise whether
 *         the first element of {@code complete} compares greater than the
 *         first element of {@code local}
 */
protected boolean hasHigherPriority(Member[] complete, Member[] local) {
    boolean nothingLocal = (local == null) || (local.length == 0);
    if (nothingLocal) {
        return false;
    }
    boolean nothingComplete = (complete == null) || (complete.length == 0);
    if (nothingComplete) {
        return true;
    }
    AbsoluteOrder.absoluteOrder(complete);
    AbsoluteOrder.absoluteOrder(local);
    return 0 < AbsoluteOrder.comp.compare(complete[0], local[0]);
}
/**
 * Returns the coordinator, which is the first member of the current view.
 *
 * @return the first member of the view, or {@code null} when there is no view
 *         or the view has no members
 */
public Member getCoordinator() {
    if (view == null || !view.hasMembers()) {
        return null;
    }
    return view.getMembers()[0];
}
/**
 * Returns the members of the current view.
 *
 * @return the view's members, or an empty array when there is no view or the
 *         view has no members (never {@code null} from this method's own paths)
 */
public Member[] getView() {
    if (view != null && view.hasMembers()) {
        return view.getMembers();
    }
    return new Member[0];
}
/**
 * Returns the id of the currently accepted view.
 *
 * @return the stored view id; {@code null} after {@code stop} has cleared it
 */
public UniqueId getViewId() {
    return this.viewId;
}
/**
 * Hook meant to block incoming/outgoing messages while an election is going on.
 * This implementation is empty. It is called at the beginning of {@code stop}.
 */
protected void halt() {
}
/**
 * Hook meant to release the lock on incoming/outgoing messages once an
 * election has completed.
 * This implementation is empty. It is called from the {@code finally} block of
 * {@code stop}.
 */
protected void release() {
}
/**
 * Hook meant to wait for a running election to end.
 * This implementation is empty and therefore returns immediately. It is called
 * by {@code sendMessage} before the message is handed to the superclass.
 */
protected void waitForRelease() {
}
//============================================================================================================
// OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE
//============================================================================================================
/**
 * Starts the coordinator.
 * Sets up the membership if it does not exist yet, and does nothing further
 * when already started. Otherwise it fires a "Before start" event, starts the
 * superclass, marks the coordinator as started, creates an initial view around
 * the local member when no view exists, fires an "After start" event and kicks
 * off an election.
 *
 * @param svc requested service flags. NOTE(review): this value is not used;
 *            the {@code startsvc} field is passed to the superclass instead -
 *            confirm this is intentional.
 * @throws ChannelException if the superclass fails to start or the election
 *                          cannot be started
 */
public void start(int svc) throws ChannelException {
    if (membership == null) {
        setupMembership();
    }
    if (started) {
        return;
    }

    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start"));
    super.start(startsvc);
    started = true;

    if (view == null) {
        MemberImpl self = (MemberImpl) super.getLocalMember(true);
        view = new Membership(self, AbsoluteOrder.comp, true);
    }

    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start"));
    startElection(false);
}
/**
 * Stops the coordinator.
 * Calls {@link #halt()}, then - while holding {@code electionMutex} and only if
 * currently started - clears the started flag, fires a "Before stop" event,
 * stops the superclass, discards the view, view id and any suggested view,
 * resets the membership and fires an "After stop" event.
 * {@link #release()} is always called on the way out.
 *
 * @param svc requested service flags. NOTE(review): this value is not used;
 *            the {@code startsvc} field is passed to the superclass instead -
 *            confirm this is intentional.
 * @throws ChannelException if the superclass fails to stop
 */
public void stop(int svc) throws ChannelException {
    try {
        halt();
        synchronized (electionMutex) {
            if (started) {
                started = false;
                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop"));
                super.stop(startsvc);
                // Forget everything learned about the group.
                this.view = null;
                this.viewId = null;
                this.suggestedView = null;
                this.suggestedviewId = null;
                this.membership.reset();
                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop"));
            }
        }
    } finally {
        release();
    }
}
/**
 * Sends a message down the interceptor chain after first calling
 * {@link #waitForRelease()}.
 *
 * @param destination members to send to
 * @param msg         message to send
 * @param payload     interceptor payload handed through unchanged
 * @throws ChannelException propagated from the superclass
 */
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
    // Gate outgoing traffic on the election hook before delegating.
    this.waitForRelease();
    super.sendMessage(destination, msg, payload);
}
/**
 * Intercepts incoming messages.
 * <ul>
 *   <li>Messages matching {@code COORD_ALIVE} at offset 0 only produce an
 *       {@code EVT_MSG_ARRIVE} event and are otherwise dropped.</li>
 *   <li>Messages matching {@code COORD_HEADER} at offset 0 are decoded into a
 *       {@code CoordinationMessage} and handed to
 *       {@link #processCoordMessage}; a {@code ChannelException} from that
 *       processing is logged, not rethrown.</li>
 *   <li>Everything else is passed to the superclass.</li>
 * </ul>
 *
 * @param msg the received message
 */
public void messageReceived(ChannelMessage msg) {
    if (Arrays.contains(msg.getMessage().getBytesDirect(), 0, COORD_ALIVE, 0, COORD_ALIVE.length)) {
        // ignore message, it's an alive message
        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE, this, "Alive Message"));
        return;
    }

    if (!Arrays.contains(msg.getMessage().getBytesDirect(), 0, COORD_HEADER, 0, COORD_HEADER.length)) {
        // Not coordination traffic: let the rest of the chain see it.
        super.messageReceived(msg);
        return;
    }

    try {
        CoordinationMessage coordMsg = new CoordinationMessage(msg.getMessage());
        Member[] carried = coordMsg.getMembers();
        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE, this,
                "Coord Msg Arrived(" + Arrays.toNameString(carried) + ")"));
        processCoordMessage(coordMsg, msg.getAddress());
    } catch (ChannelException x) {
        log.error("Error processing coordination message. Could be fatal.", x);
    }
}
/**
 * Defers the accept decision entirely to the superclass.
 *
 * @param msg message to test
 * @return whatever the superclass returns for {@code msg}
 */
public boolean accept(ChannelMessage msg) {
    final boolean accepted = super.accept(msg);
    return accepted;
}
/**
 * Membership callback for a newly added member; equivalent to
 * {@code memberAdded(member, true)}, i.e. an election may be started.
 *
 * @param member the member that was added
 */
public void memberAdded(Member member) {
    this.memberAdded(member, true);
}
/**
 * Registers an added member.
 * Sets up the membership if needed, forwards the notification to the
 * superclass only when {@code membership.memberAlive} returns {@code true} for
 * the member, fires an {@code EVT_MBR_ADD} event and, when the coordinator is
 * started and {@code elect} is set, starts an election. A failure to start the
 * election is logged rather than propagated.
 *
 * @param member the member that was added
 * @param elect  whether an election may be started as a result
 */
public void memberAdded(Member member, boolean elect) {
    if (membership == null) {
        setupMembership();
    }
    if (membership.memberAlive((MemberImpl) member)) {
        super.memberAdded(member);
    }
    try {
        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD, this, "Member add(" + member.getName() + ")"));
        if (started && elect) {
            startElection(false);
        }
    } catch (ChannelException x) {
        log.error("Unable to start election when member was added.", x);
    }
}
/**
 * Handles the disappearance of a member.
 * Removes the member from the membership, forwards the notification to the
 * superclass, fires an {@code EVT_MBR_DEL} event and, when the coordinator is
 * started and the local member is either the coordinator or the highest
 * member, starts an election. A failure to start the election is logged rather
 * than propagated.
 *
 * @param member the member that disappeared
 */
public void memberDisappeared(Member member) {
    membership.removeMember((MemberImpl) member);
    super.memberDisappeared(member);
    try {
        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL, this, "Member remove(" + member.getName() + ")"));
        boolean mayElect = started && (isCoordinator() || isHighest());
        if (mayElect) {
            // TODO: if a member disappears, only the coordinator can start
            startElection(true);
        }
    } catch (ChannelException x) {
        log.error("Unable to start election when member was removed.", x);
    }
}
/**
 * Tells whether the local member ranks first.
 *
 * @return {@code true} when the membership holds no members, or when the local
 *         member compares less than or equal to the membership's first member
 *         under {@code AbsoluteOrder.comp}
 */
public boolean isHighest() {
    Member self = getLocalMember(false);
    return membership.getMembers().length == 0
            || AbsoluteOrder.comp.compare(self, membership.getMembers()[0]) <= 0;
}
/**
 * Tells whether the local member is the current coordinator.
 *
 * @return {@code true} when {@link #getCoordinator()} returns a member that
 *         the local member equals; {@code false} when there is no coordinator
 */
public boolean isCoordinator() {
    Member current = getCoordinator();
    if (current == null) {
        return false;
    }
    return getLocalMember(false).equals(current);
}
public void heartbeat() {
try {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -