📄 nonblockingcoordinator.java
字号:
MemberImpl local = (MemberImpl)getLocalMember(false);
if ( view != null && (Arrays.diff(view,membership,local).length != 0 || Arrays.diff(membership,view,local).length != 0) ) {
if ( isHighest() ) {
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this,
"Heartbeat found inconsistency, restart election"));
startElection(true);
}
}
} catch ( Exception x ){
log.error("Unable to perform heartbeat.",x);
} finally {
super.heartbeat();
}
}
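/**
* Reports whether the membership tracked by this coordinator currently has any members.
* @return the result of <code>membership.hasMembers()</code>
*/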
public boolean hasMembers() {
    // Pure delegation: the answer comes straight from the membership object.
    final boolean anyMembers = membership.hasMembers();
    return anyMembers;
}
/**
* Get all current cluster members
* @return all members or empty array
*/
public Member[] getMembers() {
    // Pure delegation: hand back whatever the membership object reports.
    final Member[] current = membership.getMembers();
    return current;
}
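/**
* Looks up a member in the membership tracked by this coordinator.
* @param mbr Member the member to look up
* @return Member the result of <code>membership.getMember(mbr)</code>
*/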
public Member getMember(Member mbr) {
    // Pure delegation: the lookup is performed by the membership object.
    final Member found = membership.getMember(mbr);
    return found;
}
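/**
* Return the member that represents this node. As a side effect, the
* membership is set up when no view has been installed yet and a local
* member is available.
*
* @param incAlive boolean passed through unchanged to the parent implementation
* @return Member the local member as returned by the parent implementation
*/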
public Member getLocalMember(boolean incAlive) {
    final Member self = super.getLocalMember(incAlive);
    // While no view is installed, make sure the membership exists as soon
    // as the parent can supply a local member.
    if ( self != null && view == null ) {
        setupMembership();
    }
    return self;
}
protected synchronized void setupMembership() {
    // Create the membership only once; later calls are no-ops.
    if ( membership != null ) {
        return;
    }
    membership = new Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,false);
}
//============================================================================================================
// HELPER CLASSES FOR COORDINATION
//============================================================================================================
/**
 * A coordination message together with its byte-level encoding.
 * <p>
 * Layout produced by {@link #write()} and consumed by {@link #parse()}:
 * the coordination header, a length-prefixed leader, a length-prefixed
 * source, a member count followed by one length-prefixed entry per member,
 * a 16 byte id and a 16 byte type. Every length/count is written with
 * <code>buf.append(int)</code> and read back with <code>buf.toInt</code>,
 * advancing the read offset by 4.
 */
public static class CoordinationMessage {
    //X{A-ldr, A-src, mbrs-A,B,C,D}
    protected XByteBuffer buf;
    protected MemberImpl leader;
    protected MemberImpl source;
    protected MemberImpl[] view;
    protected UniqueId id;
    protected byte[] type;
    protected long timestamp = System.currentTimeMillis();

    /**
     * Wraps an already encoded message and decodes it immediately.
     * @param buf XByteBuffer holding the encoded message
     */
    public CoordinationMessage(XByteBuffer buf) {
        this.buf = buf;
        parse();
    }

    /**
     * Builds a message from its parts and encodes it into a fresh buffer.
     * @param leader MemberImpl written as the leader entry
     * @param source MemberImpl written as the source entry
     * @param view MemberImpl[] members written as the view
     * @param id UniqueId message id
     * @param type byte[] message type; {@link #parse()} reads 16 bytes back
     */
    public CoordinationMessage(MemberImpl leader,
                               MemberImpl source,
                               MemberImpl[] view,
                               UniqueId id,
                               byte[] type) {
        this.buf = new XByteBuffer(4096,false);
        this.leader = leader;
        this.source = source;
        this.view = view;
        this.id = id;
        this.type = type;
        this.write();
    }

    /**
     * @return byte[] the coordination header constant
     */
    public byte[] getHeader() {
        return NonBlockingCoordinator.COORD_HEADER;
    }

    /**
     * @return MemberImpl the leader, decoding the buffer first if it is not set yet
     */
    public MemberImpl getLeader() {
        if ( leader == null ) parse();
        return leader;
    }

    /**
     * @return MemberImpl the source, decoding the buffer first if it is not set yet
     */
    public MemberImpl getSource() {
        if ( source == null ) parse();
        return source;
    }

    /**
     * @return UniqueId the message id, decoding the buffer first if it is not set yet
     */
    public UniqueId getId() {
        if ( id == null ) parse();
        return id;
    }

    /**
     * @return MemberImpl[] the view, decoding the buffer first if it is not set yet
     */
    public MemberImpl[] getMembers() {
        if ( view == null ) parse();
        return view;
    }

    /**
     * @return byte[] the message type, decoding the buffer first if it is not set yet
     */
    public byte[] getType() {
        if (type == null ) parse();
        return type;
    }

    /**
     * @return XByteBuffer the buffer backing this message
     */
    public XByteBuffer getBuffer() {
        return this.buf;
    }

    /**
     * Decodes leader, source, view, id and type from the buffer.
     * Lengths and counts are taken from the buffer as-is; no range
     * validation is performed here.
     */
    public void parse() {
        byte[] data = buf.getBytesDirect();
        //header - skipped; same constant that write() emits first
        int offset = COORD_HEADER.length;
        //leader
        int ldrLen = buf.toInt(data,offset);
        offset += 4;
        leader = readMember(data,offset,ldrLen);
        offset += ldrLen;
        //source
        int srcLen = buf.toInt(data,offset);
        offset += 4;
        source = readMember(data,offset,srcLen);
        offset += srcLen;
        //view
        int mbrCount = buf.toInt(data,offset);
        offset += 4;
        view = new MemberImpl[mbrCount];
        for (int i=0; i<view.length; i++ ) {
            int mbrLen = buf.toInt(data,offset);
            offset += 4;
            view[i] = readMember(data,offset,mbrLen);
            offset += mbrLen;
        }
        //id
        this.id = new UniqueId(data,offset,16);
        offset += 16;
        //type - last field, so the offset is not advanced any further
        type = new byte[16];
        System.arraycopy(data, offset, type, 0, type.length);
    }

    /**
     * Encodes header, leader, source, view, id and type into the buffer,
     * after resetting it.
     */
    public void write() {
        buf.reset();
        //header
        buf.append(COORD_HEADER,0,COORD_HEADER.length);
        //leader
        appendMember(leader);
        //source
        appendMember(source);
        //view
        buf.append(view.length);
        for (int i=0; i<view.length; i++ ) {
            appendMember(view[i]);
        }
        //id
        byte[] idBytes = id.getBytes();
        buf.append(idBytes,0,idBytes.length);
        buf.append(type,0,type.length);
    }

    /** Copies <code>length</code> bytes starting at <code>offset</code> and turns them into a member. */
    private static MemberImpl readMember(byte[] data, int offset, int length) {
        byte[] raw = new byte[length];
        System.arraycopy(data,offset,raw,0,length);
        return MemberImpl.getMember(raw);
    }

    /** Appends one member as its data length followed by the data itself. */
    private void appendMember(MemberImpl mbr) {
        byte[] raw = mbr.getData(false,false);
        buf.append(raw.length);
        buf.append(raw,0,raw.length);
    }
}
public void fireInterceptorEvent(InterceptorEvent event) {
    // Only coordination events are of interest here.
    if ( !(event instanceof CoordinationEvent) ) {
        return;
    }
    final CoordinationEvent coordEvent = (CoordinationEvent)event;
    // Of those, only EVT_CONF_RX is logged.
    if ( coordEvent.type == CoordinationEvent.EVT_CONF_RX ) {
        log.info(event);
    }
}
/**
 * Event describing a step of the coordination protocol. The coordinator,
 * member list, view and suggested view are read from the interceptor when
 * the event is constructed.
 */
public static class CoordinationEvent implements InterceptorEvent {
    public static final int EVT_START = 1;
    public static final int EVT_MBR_ADD = 2;
    public static final int EVT_MBR_DEL = 3;
    public static final int EVT_START_ELECT = 4;
    public static final int EVT_PROCESS_ELECT = 5;
    public static final int EVT_MSG_ARRIVE = 6;
    public static final int EVT_PRE_MERGE = 7;
    public static final int EVT_POST_MERGE = 8;
    public static final int EVT_WAIT_FOR_MSG = 9;
    public static final int EVT_SEND_MSG = 10;
    public static final int EVT_STOP = 11;
    public static final int EVT_CONF_RX = 12;
    public static final int EVT_ELECT_ABANDONED = 13;

    int type;
    ChannelInterceptor interceptor;
    Member coord;
    Member[] mbrs;
    String info;
    Membership view;
    Membership suggestedView;

    /**
     * @param type int one of the EVT_* constants
     * @param interceptor ChannelInterceptor the originating interceptor; it is cast
     *        to NonBlockingCoordinator, so any other implementation fails with a
     *        ClassCastException
     * @param info String free text appended to the event description
     */
    public CoordinationEvent(int type,ChannelInterceptor interceptor, String info) {
        NonBlockingCoordinator coordinator = (NonBlockingCoordinator)interceptor;
        this.type = type;
        this.interceptor = interceptor;
        this.coord = coordinator.getCoordinator();
        // The coordinator creates its membership lazily, so it may still be
        // null when an event is built; report no members instead of failing.
        Membership mbrship = coordinator.membership;
        this.mbrs = (mbrship != null) ? mbrship.getMembers() : new Member[0];
        this.info = info;
        this.view = coordinator.view;
        this.suggestedView = coordinator.suggestedView;
    }

    /**
     * @return int the EVT_* value given at construction
     */
    public int getEventType() {
        return type;
    }

    /**
     * @return String the constant's name followed by ":" and the info text,
     *         or "Unknown" when the type matches none of the EVT_* constants
     */
    public String getEventTypeDesc() {
        switch (type) {
            case EVT_START: return "EVT_START:"+info;
            case EVT_MBR_ADD: return "EVT_MBR_ADD:"+info;
            case EVT_MBR_DEL: return "EVT_MBR_DEL:"+info;
            case EVT_START_ELECT: return "EVT_START_ELECT:"+info;
            case EVT_PROCESS_ELECT: return "EVT_PROCESS_ELECT:"+info;
            case EVT_MSG_ARRIVE: return "EVT_MSG_ARRIVE:"+info;
            case EVT_PRE_MERGE: return "EVT_PRE_MERGE:"+info;
            case EVT_POST_MERGE: return "EVT_POST_MERGE:"+info;
            case EVT_WAIT_FOR_MSG: return "EVT_WAIT_FOR_MSG:"+info;
            case EVT_SEND_MSG: return "EVT_SEND_MSG:"+info;
            case EVT_STOP: return "EVT_STOP:"+info;
            case EVT_CONF_RX: return "EVT_CONF_RX:"+info;
            case EVT_ELECT_ABANDONED: return "EVT_ELECT_ABANDONED:"+info;
            default: return "Unknown";
        }
    }

    /**
     * @return ChannelInterceptor the interceptor given at construction
     */
    public ChannelInterceptor getInterceptor() {
        return interceptor;
    }

    /**
     * Multi-line dump of the numeric type, local member, coordinator,
     * view, suggested view, members and info text. The local member is
     * looked up at call time; everything else was captured at construction.
     */
    public String toString() {
        StringBuffer buf = new StringBuffer("CoordinationEvent[type=");
        buf.append(type).append("\n\tLocal:");
        Member local = interceptor.getLocalMember(false);
        buf.append(local!=null?local.getName():"").append("\n\tCoord:");
        buf.append(coord!=null?coord.getName():"").append("\n\tView:");
        buf.append(Arrays.toNameString(view!=null?view.getMembers():null)).append("\n\tSuggested View:");
        buf.append(Arrays.toNameString(suggestedView!=null?suggestedView.getMembers():null)).append("\n\tMembers:");
        buf.append(Arrays.toNameString(mbrs)).append("\n\tInfo:");
        buf.append(info).append("]");
        return buf.toString();
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -