📄 fd_prob.java
字号:
void updateCounters(FdHeader hdr) { Address key; FdEntry entry; if(hdr == null || hdr.members == null || hdr.counters == null) { if(warn) log.warn("hdr is null or contains no counters"); return; } for(int i=0; i < hdr.members.length; i++) { key=hdr.members[i]; if(key == null) continue; entry=(FdEntry) counters.get(key); if(entry == null) { entry=new FdEntry(hdr.counters[i]); counters.put(key, entry); continue; } if(entry.excluded()) continue; // only update counter (and adjust timestamp) if new counter is greater then old one entry.setCounter(Math.max(entry.getCounter(), hdr.counters[i])); } } /** Resets the counter for mbr */ void updateCounter(Address mbr) { FdEntry entry; if(mbr == null) return; entry=(FdEntry) counters.get(mbr); if(entry != null) entry.setTimestamp(); } String printCounters() { StringBuffer sb=new StringBuffer(); Address mbr; FdEntry entry; for(Enumeration e=counters.keys(); e.hasMoreElements();) { mbr=(Address) e.nextElement(); entry=(FdEntry) counters.get(mbr); sb.append("\n" + mbr + ": " + entry._toString()); } return sb.toString(); } static Vector computeExcludedMembers(Vector old_mbrship, Vector new_mbrship) { Vector ret=new Vector(); if(old_mbrship == null || new_mbrship == null) return ret; for(int i=0; i < old_mbrship.size(); i++) if(!new_mbrship.contains(old_mbrship.elementAt(i))) ret.addElement(old_mbrship.elementAt(i)); return ret; } /** If hb_sender is not a member, send a SUSPECT to sender (after n pings received) */ boolean checkPingerValidity(Object hb_sender) { int num_pings=0; Message shun_msg; Header hdr; if(hb_sender != null && members != null && !members.contains(hb_sender)) { if(invalid_pingers.containsKey(hb_sender)) { num_pings=((Integer) invalid_pingers.get(hb_sender)).intValue(); if(num_pings >= max_tries) { if(log.isErrorEnabled()) log.error("sender " + hb_sender + " is not member in " + members + " ! Telling it to leave group"); shun_msg=new Message((Address) hb_sender, null, null); hdr=new FdHeader(FdHeader.NOT_MEMBER); shun_msg.putHeader(getName(), hdr); passDown(new Event(Event.MSG, shun_msg)); invalid_pingers.remove(hb_sender); } else { num_pings++; invalid_pingers.put(hb_sender, new Integer(num_pings)); } } else { num_pings++; invalid_pingers.put(hb_sender, new Integer(num_pings)); } return false; } else return true; } /* ----------------------------- End of Private Methods --------------------------- */ public static class FdHeader extends Header implements Streamable { static final byte HEARTBEAT=1; // sent periodically to a random member static final byte NOT_MEMBER=2; // sent to the sender, when it is not a member anymore (shunned) byte type=HEARTBEAT; Address[] members=null; long[] counters=null; // correlates with 'members' (same indexes) public FdHeader() { } // used for externalization FdHeader(byte type) { this.type=type; } FdHeader(byte type, int num_elements) { this(type); members=new Address[num_elements]; counters=new long[num_elements]; } public String toString() { switch(type) { case HEARTBEAT: return "[FD_PROB: HEARTBEAT]"; case NOT_MEMBER: return "[FD_PROB: NOT_MEMBER]"; default: return "[FD_PROB: unknown type (" + type + ")]"; } } public String printDetails() { StringBuffer sb=new StringBuffer(); Address mbr; if(members != null && counters != null) for(int i=0; i < members.length; i++) { mbr=members[i]; if(mbr == null) sb.append("\n<null>"); else sb.append("\n" + mbr); sb.append(": " + counters[i]); } return sb.toString(); } public void writeExternal(ObjectOutput out) throws IOException { out.writeByte(type); if(members != null) { out.writeInt(members.length); out.writeObject(members); } else out.writeInt(0); if(counters != null) { out.writeInt(counters.length); for(int i=0; i < counters.length; i++) out.writeLong(counters[i]); } else out.writeInt(0); } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { int num; type=in.readByte(); num=in.readInt(); if(num == 0) members=null; else { members=(Address[]) in.readObject(); } num=in.readInt(); if(num == 0) counters=null; else { counters=new long[num]; for(int i=0; i < counters.length; i++) counters[i]=in.readLong(); } } public long size() { long retval=Global.BYTE_SIZE; retval+=Global.SHORT_SIZE; // number of members if(members != null && members.length > 0) { for(int i=0; i < members.length; i++) { Address member=members[i]; retval+=Util.size(member); } } retval+=Global.SHORT_SIZE; // counters if(counters != null && counters.length > 0) { retval+=counters.length * Global.LONG_SIZE; } return retval; } public void writeTo(DataOutputStream out) throws IOException { out.writeByte(type); if(members == null || members.length == 0) out.writeShort(0); else { out.writeShort(members.length); for(int i=0; i < members.length; i++) { Address member=members[i]; Util.writeAddress(member, out); } } if(counters == null || counters.length == 0) { out.writeShort(0); } else { out.writeShort(counters.length); for(int i=0; i < counters.length; i++) { long counter=counters[i]; out.writeLong(counter); } } } public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { type=in.readByte(); short len=in.readShort(); if(len > 0) { members=new Address[len]; for(int i=0; i < len; i++) { members[i]=Util.readAddress(in); } } len=in.readShort(); if(len > 0) { counters=new long[len]; for(int i=0; i < counters.length; i++) { counters[i]=in.readLong(); } } } } private static class FdEntry { private long counter=0; // heartbeat counter private long timestamp=0; // last time the counter was incremented private boolean excluded=false; // set to true if member was excluded from group FdEntry() { } FdEntry(long counter) { this.counter=counter; timestamp=System.currentTimeMillis(); } long getCounter() { return counter; } long getTimestamp() { return timestamp; } boolean excluded() { return excluded; } synchronized void setCounter(long new_counter) { if(new_counter > counter) { // only set time if counter was incremented timestamp=System.currentTimeMillis(); counter=new_counter; } } synchronized void incrementCounter() { counter++; timestamp=System.currentTimeMillis(); } synchronized void setTimestamp() { timestamp=System.currentTimeMillis(); } synchronized void setExcluded(boolean flag) { excluded=flag; } public String toString() { return "counter=" + counter + ", timestamp=" + timestamp + ", excluded=" + excluded; } public String _toString() { return "counter=" + counter + ", age=" + (System.currentTimeMillis() - timestamp) + ", excluded=" + excluded; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -