fd.java
来自「JGRoups源码」· Java 代码 · 共 686 行 · 第 1/2 页
JAVA
686 行
if(invalid_pingers.containsKey(hb_sender)) { num_pings=((Integer)invalid_pingers.get(hb_sender)).intValue(); if(num_pings >= max_tries) { if(log.isDebugEnabled()) log.debug(hb_sender + " is not in " + members + " ! Shunning it"); shun_msg=new Message(hb_sender, null, null); shun_msg.putHeader(name, new FdHeader(FdHeader.NOT_MEMBER)); passDown(new Event(Event.MSG, shun_msg)); invalid_pingers.remove(hb_sender); } else { num_pings++; invalid_pingers.put(hb_sender, new Integer(num_pings)); } } else { num_pings++; invalid_pingers.put(hb_sender, new Integer(num_pings)); } } } public static class FdHeader extends Header implements Streamable { public static final byte HEARTBEAT=0; public static final byte HEARTBEAT_ACK=1; public static final byte SUSPECT=2; public static final byte NOT_MEMBER=3; // received as response by pinged mbr when we are not a member byte type=HEARTBEAT; Vector mbrs=null; Address from=null; // member who detected that suspected_mbr has failed public FdHeader() { } // used for externalization public FdHeader(byte type) { this.type=type; } public FdHeader(byte type, Vector mbrs, Address from) { this(type); this.mbrs=mbrs; this.from=from; } public String toString() { switch(type) { case HEARTBEAT: return "[FD: heartbeat]"; case HEARTBEAT_ACK: return "[FD: heartbeat ack]"; case SUSPECT: return "[FD: SUSPECT (suspected_mbrs=" + mbrs + ", from=" + from + ")]"; case NOT_MEMBER: return "[FD: NOT_MEMBER]"; default: return "[FD: unknown type (" + type + ")]"; } } public void writeExternal(ObjectOutput out) throws IOException { out.writeByte(type); if(mbrs == null) out.writeBoolean(false); else { out.writeBoolean(true); out.writeInt(mbrs.size()); for(Iterator it=mbrs.iterator(); it.hasNext();) { Address addr=(Address)it.next(); Marshaller.write(addr, out); } } Marshaller.write(from, out); } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { type=in.readByte(); boolean mbrs_not_null=in.readBoolean(); if(mbrs_not_null) { int len=in.readInt(); mbrs=new Vector(11); for(int i=0; i < len; i++) { Address addr=(Address)Marshaller.read(in); mbrs.add(addr); } } from=(Address)Marshaller.read(in); } public long size() { int retval=Global.BYTE_SIZE; // type retval+=Util.size(mbrs); retval+=Util.size(from); return retval; } public void writeTo(DataOutputStream out) throws IOException { out.writeByte(type); Util.writeAddresses(mbrs, out); Util.writeAddress(from, out); } public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { type=in.readByte(); mbrs=(Vector)Util.readAddresses(in, Vector.class); from=Util.readAddress(in); } } protected class Monitor implements TimeScheduler.Task { boolean started=true; public void stop() { started=false; } public boolean cancelled() { return !started; } public long nextInterval() { return timeout; } public void run() { Message hb_req; long not_heard_from; // time in msecs we haven't heard from ping_dest if(ping_dest == null) { if(warn) log.warn("ping_dest is null: members=" + members + ", pingable_mbrs=" + pingable_mbrs + ", local_addr=" + local_addr); return; } // 1. send heartbeat request hb_req=new Message(ping_dest, null, null); hb_req.putHeader(name, new FdHeader(FdHeader.HEARTBEAT)); // send heartbeat request if(log.isDebugEnabled()) log.debug("sending are-you-alive msg to " + ping_dest + " (own address=" + local_addr + ')'); passDown(new Event(Event.MSG, hb_req)); num_heartbeats++; // 2. If the time of the last heartbeat is > timeout and max_tries heartbeat messages have not been // received, then broadcast a SUSPECT message. Will be handled by coordinator, which may install // a new view not_heard_from=System.currentTimeMillis() - last_ack; // quick & dirty fix: increase timeout by 500msecs to allow for latency (bela June 27 2003) if(not_heard_from > timeout + 500) { // no heartbeat ack for more than timeout msecs if(num_tries >= max_tries) { if(log.isDebugEnabled()) log.debug("[" + local_addr + "]: received no heartbeat ack from " + ping_dest + " for " + (num_tries +1) + " times (" + ((num_tries+1) * timeout) + " milliseconds), suspecting it"); // broadcast a SUSPECT message to all members - loop until // unsuspect or view change is received bcast_task.addSuspectedMember(ping_dest); num_tries=0; if(stats) { num_suspect_events++; suspect_history.add(ping_dest); } } else { if(log.isDebugEnabled()) log.debug("heartbeat missing from " + ping_dest + " (number=" + num_tries + ')'); num_tries++; } } } public String toString() { return Boolean.toString(started); } } /** * Task that periodically broadcasts a list of suspected members to the group. Goal is not to lose * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes * sure they are retransmitted until a view has been received which doesn't contain the suspected members * any longer. Then the task terminates. */ protected final class Broadcaster { final Vector suspected_mbrs=new Vector(7); BroadcastTask task=null; private final Object bcast_mutex=new Object(); Vector getSuspectedMembers() { return suspected_mbrs; } /** * Starts a new task, or - if already running - adds the argument to the running task. * @param suspect */ private void startBroadcastTask(Address suspect) { synchronized(bcast_mutex) { if(task == null || task.cancelled()) { task=new BroadcastTask((Vector)suspected_mbrs.clone()); task.addSuspectedMember(suspect); task.run(); // run immediately the first time timer.add(task); // then every timeout milliseconds, until cancelled if(trace) log.trace("BroadcastTask started"); } else { task.addSuspectedMember(suspect); } } } private void stopBroadcastTask() { synchronized(bcast_mutex) { if(task != null) { task.stop(); task=null; } } } /** Adds a suspected member. Starts the task if not yet running */ protected void addSuspectedMember(Address mbr) { if(mbr == null) return; if(!members.contains(mbr)) return; boolean added=false; synchronized(suspected_mbrs) { if(!suspected_mbrs.contains(mbr)) { suspected_mbrs.addElement(mbr); added=true; } } if(added) startBroadcastTask(mbr); } void removeSuspectedMember(Address suspected_mbr) { if(suspected_mbr == null) return; if(log.isDebugEnabled()) log.debug("member is " + suspected_mbr); synchronized(suspected_mbrs) { suspected_mbrs.removeElement(suspected_mbr); if(suspected_mbrs.size() == 0) stopBroadcastTask(); } } void removeAll() { synchronized(suspected_mbrs) { suspected_mbrs.removeAllElements(); stopBroadcastTask(); } } /** Removes all elements from suspected_mbrs that are <em>not</em> in the new membership */ void adjustSuspectedMembers(List new_mbrship) { if(new_mbrship == null || new_mbrship.size() == 0) return; StringBuffer sb=new StringBuffer(); synchronized(suspected_mbrs) { sb.append("suspected_mbrs: ").append(suspected_mbrs); suspected_mbrs.retainAll(new_mbrship); if(suspected_mbrs.size() == 0) stopBroadcastTask(); sb.append(", after adjustment: ").append(suspected_mbrs); log.debug(sb.toString()); } } } protected final class BroadcastTask implements TimeScheduler.Task { boolean cancelled=false; private final Vector suspected_members=new Vector(); BroadcastTask(Vector suspected_members) { this.suspected_members.addAll(suspected_members); } public void stop() { cancelled=true; suspected_members.clear(); if(trace) log.trace("BroadcastTask stopped"); } public boolean cancelled() { return cancelled; } public long nextInterval() { return FD.this.timeout; } public void run() { Message suspect_msg; FD.FdHeader hdr; synchronized(suspected_members) { if(suspected_members.size() == 0) { stop(); if(log.isDebugEnabled()) log.debug("task done (no suspected members)"); return; } hdr=new FdHeader(FdHeader.SUSPECT); hdr.mbrs=(Vector)suspected_members.clone(); hdr.from=local_addr; } suspect_msg=new Message(); // mcast SUSPECT to all members suspect_msg.putHeader(name, hdr); if(log.isDebugEnabled()) log.debug("broadcasting SUSPECT message [suspected_mbrs=" + suspected_members + "] to group"); passDown(new Event(Event.MSG, suspect_msg)); if(log.isDebugEnabled()) log.debug("task done"); } public void addSuspectedMember(Address suspect) { if(suspect != null && !suspected_members.contains(suspect)) { suspected_members.add(suspect); } } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?