📄 fd_pid.java
字号:
passDown(evt); break; } }


    /* ----------------------------------- Private Methods -------------------------------------- */

    /**
     * Fetches the PID cache from the coordinator and merges it into <code>pids</code>.
     * <p>
     * Makes at most <code>num_tries</code> attempts. In each attempt the coordinator is determined:
     * <ul>
     * <li>if the coordinator is this member (<code>local_addr</code>), the method returns immediately
     *     (we are the first member, so there is no cache to fetch);</li>
     * <li>otherwise a GET_PIDS header is sent to the coordinator and the result is awaited on
     *     <code>get_pids_promise</code> (timeout <code>get_pids_timeout</code>). A non-null result is
     *     merged into <code>pids</code> and the method returns;</li>
     * <li>if no coordinator is known yet, or the result was null, the method sleeps for
     *     <code>get_pids_retry_timeout</code> and tries again.</li>
     * </ul>
     * When all attempts are used up the method returns without having modified <code>pids</code>.
     */
    void getPidsFromCoordinator() {
        Address coord;
        int attempts=num_tries;
        Message msg;
        FdHeader hdr;
        Hashtable result;

        get_pids_promise.reset();
        while(attempts > 0) {
            if((coord=determineCoordinator()) != null) {
                if(coord.equals(local_addr)) { // we are the first member --> empty cache
                    if(log.isInfoEnabled()) log.info("first member; cache is empty");
                    return;
                }
                hdr=new FdHeader(FdHeader.GET_PIDS);
                hdr.mbr=local_addr;
                msg=new Message(coord, null, null);
                msg.putHeader(getName(), hdr);
                passDown(new Event(Event.MSG, msg));
                result=(Hashtable)get_pids_promise.getResult(get_pids_timeout);
                if(result != null) {
                    // merge the received entries into pids (pids is expected to be empty at this point)
                    pids.putAll(result);
                    if(log.isInfoEnabled()) log.info("got cache from " + coord + ": cache is " + pids);
                    return;
                }
                else {
                    if(log.isErrorEnabled()) log.error("received null cache; retrying");
                }
            }
            Util.sleep(get_pids_retry_timeout);
            --attempts;
        }
    }


    /**
     * Sends a SUSPECT header naming <code>suspected_mbr</code> in a message without destination
     * (i.e. to all members).
     *
     * @param suspected_mbr the member to be reported as suspected
     */
    void broadcastSuspectMessage(Address suspected_mbr) {
        Message suspect_msg;
        FdHeader hdr;

        if(log.isInfoEnabled()) log.info("suspecting " + suspected_mbr + " (own address is " + local_addr + ')');

        hdr=new FdHeader(FdHeader.SUSPECT);
        hdr.mbr=suspected_mbr;
        suspect_msg=new Message(); // mcast SUSPECT to all members
        suspect_msg.putHeader(getName(), hdr);
        passDown(new Event(Event.MSG, suspect_msg));
    }


    /**
     * Sends a WHO_HAS_PID header asking for the PID of <code>mbr</code> in a message without
     * destination (i.e. to all members).
     *
     * @param mbr the member whose PID is requested
     */
    void broadcastWhoHasPidMessage(Address mbr) {
        Message msg;
        FdHeader hdr;

        // only the log statement depends on the addresses being set; the message is sent regardless
        if(local_addr != null && mbr != null && log.isInfoEnabled()) {
            log.info("[" + local_addr + "]: who-has " + mbr);
        }

        msg=new Message(); // bcast msg
        hdr=new FdHeader(FdHeader.WHO_HAS_PID);
        hdr.mbr=mbr;
        msg.putHeader(getName(), hdr);
        passDown(new Event(Event.MSG, msg));
    }


    /**
     * Sends or broadcasts an I_HAVE_PID response. If 'dst' is null, the response will be broadcast,
     * otherwise it will be unicast back to the requester.
     *
     * @param dst destination of the response, or null to broadcast
     * @param mbr the member the PID belongs to
     * @param pid the PID of <code>mbr</code>
     */
    void sendIHavePidMessage(Address dst, Address mbr, int pid) {
        Message msg=new Message(dst, null, null);
        FdHeader hdr=new FdHeader(FdHeader.I_HAVE_PID);
        hdr.mbr=mbr;
        hdr.pid=pid;
        msg.putHeader(getName(), hdr);
        passDown(new Event(Event.MSG, msg));
    }


    /**
     * Returns the member that follows <code>local_addr</code> in <code>pingable_mbrs</code>, wrapping
     * around to the first element when <code>local_addr</code> is the last one. This method only
     * computes the value; it does not assign ping_dest or ping_pid itself.
     *
     * @return the next member in the list, or null if <code>pingable_mbrs</code> is null or has fewer
     *         than 2 elements, if <code>local_addr</code> is null, or if <code>local_addr</code> is
     *         not contained in the list
     */
    Address determinePingDest() {
        Address tmp;

        if(pingable_mbrs == null || pingable_mbrs.size() < 2 || local_addr == null)
            return null;

        for(int i=0; i < pingable_mbrs.size(); i++) {
            tmp=(Address)pingable_mbrs.elementAt(i);
            if(local_addr.equals(tmp)) {
                if(i + 1 >= pingable_mbrs.size())
                    return (Address)pingable_mbrs.elementAt(0); // we are last: wrap around
                else
                    return (Address)pingable_mbrs.elementAt(i + 1);
            }
        }
        return null;
    }


    /**
     * Returns the first element of <code>members</code>, or null if <code>members</code> is empty.
     */
    Address determineCoordinator() {
        return members.size() > 0 ? (Address)members.elementAt(0) : null;
    }


    /**
     * Checks whether 2 Addresses are on the same host, by comparing the textual host addresses of
     * their InetAddresses.
     *
     * @return true if both addresses are IpAddresses with non-null, equal IP addresses; false if
     *         either argument is null, is not an IpAddress (an error is logged in that case), or
     *         has no IP address
     */
    boolean sameHost(Address one, Address two) {
        InetAddress a, b;
        String host_a, host_b;

        if(one == null || two == null) return false;
        if(!(one instanceof IpAddress) || !(two instanceof IpAddress)) {
            if(log.isErrorEnabled()) log.error("addresses have to be of type IpAddress to be compared");
            return false;
        }

        a=((IpAddress)one).getIpAddress();
        b=((IpAddress)two).getIpAddress();
        if(a == null || b == null) return false;
        host_a=a.getHostAddress();
        host_b=b.getHostAddress();

        return host_a.equals(host_b);
    }


    /* ------------------------------- End of Private Methods ------------------------------------ */


    /**
     * Header attached to the messages exchanged by this protocol. Which fields are meaningful
     * depends on <code>type</code> (see the comments on the individual fields).
     */
    public static class FdHeader extends Header {
        static final int SUSPECT=10;
        static final int WHO_HAS_PID=11;
        static final int I_HAVE_PID=12;
        static final int GET_PIDS=13;     // sent by joining member to coordinator
        static final int GET_PIDS_RSP=14; // sent by coordinator to joining member in response to GET_PIDS

        int type=SUSPECT;
        Address mbr=null;    // set on SUSPECT (suspected mbr), WHO_HAS_PID (requested mbr), I_HAVE_PID
        int pid=0;           // set on I_HAVE_PID
        Hashtable pids=null; // set on GET_PIDS_RSP


        public FdHeader() {
        } // used for externalization

        FdHeader(int type) {
            this.type=type;
        }


        /**
         * Returns the type name followed by those of mbr, pid and pids that are set
         * (mbr/pids non-null, pid greater than 0).
         */
        public String toString() {
            StringBuffer sb=new StringBuffer();
            sb.append(type2String(type));
            // chained appends instead of building a temporary String for each part
            if(mbr != null)
                sb.append(", mbr=").append(mbr);
            if(pid > 0)
                sb.append(", pid=").append(pid);
            if(pids != null)
                sb.append(", pids=").append(pids);
            return sb.toString();
        }


        /**
         * Maps a header type constant to its name.
         *
         * @param type one of the type constants of this class
         * @return the constant's name, or "unknown type (N)" for any other value
         */
        public static String type2String(int type) {
            switch(type) {
                case SUSPECT:
                    return "SUSPECT";
                case WHO_HAS_PID:
                    return "WHO_HAS_PID";
                case I_HAVE_PID:
                    return "I_HAVE_PID";
                case GET_PIDS:
                    return "GET_PIDS";
                case GET_PIDS_RSP:
                    return "GET_PIDS_RSP";
                default:
                    return "unknown type (" + type + ')';
            }
        }

        /** Writes type, mbr, pid and pids, in that order. */
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(type);
            out.writeObject(mbr);
            out.writeInt(pid);
            out.writeObject(pids);
        }


        /** Reads the fields in the order in which writeExternal() wrote them. */
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            type=in.readInt();
            mbr=(Address)in.readObject();
            pid=in.readInt();
            pids=(Hashtable)in.readObject();
        }

    }


    /**
     * An instance of this class will be added to the TimeScheduler to be scheduled to be run every timeout
     * msecs. When there is no ping_dest (e.g. only 1 member in the group), this task will be cancelled in
     * TimeScheduler (and re-scheduled if ping_dest becomes available later).
     */
    private class Monitor implements TimeScheduler.Task {
        // NOTE(review): stop() and cancelled() may be invoked from different threads - confirm whether
        // this flag needs to be volatile.
        boolean started=true;

        void stop() {
            started=false;
        }


        /* -------------------------------------- TimeScheduler.Task Interface -------------------------------- */

        public boolean cancelled() {
            return !started;
        }


        public long nextInterval() {
            return timeout;
        }


        /**
         * Periodically probe for the destination process identified by ping_dest/ping_pid. Suspect the
         * ping_dest member if the /proc/&lt;ping_pid&gt; entry does not exist.
         */
        public void run() {
            if(ping_dest == null) {
                if(warn) log.warn("ping_dest is null, skipping ping");
                return;
            }

            if(log.isInfoEnabled()) log.info("ping_dest=" + ping_dest + ", ping_pid=" + ping_pid + ", cache=" + pids);

            // If the PID for ping_dest is not known, look it up in the cache; if it is not there either,
            // send a broadcast to solicit it and try again on the next run
            if(ping_pid <= 0) {
                // single lookup instead of containsKey() + get(): a missing (or concurrently removed)
                // entry shows up as null here rather than as a NullPointerException on unboxing
                Integer cached_pid=ping_dest != null? (Integer)pids.get(ping_dest) : null;
                if(cached_pid != null) {
                    ping_pid=cached_pid.intValue();
                    if(log.isInfoEnabled())
                        log.info("found PID for " + ping_dest + " in cache (pid=" + ping_pid + ')');
                }
                else {
                    if(log.isErrorEnabled())
                        log.error("PID for " + ping_dest + " not known" + ", cache is " + pids);
                    broadcastWhoHasPidMessage(ping_dest);
                    return;
                }
            }

            if(!Util.fileExists("/proc/" + ping_pid)) {
                if(log.isInfoEnabled()) log.info("process " + ping_pid + " does not exist");
                broadcastSuspectMessage(ping_dest);
                // move on to the next pingable member; stop this task if there is none left
                pingable_mbrs.removeElement(ping_dest);
                ping_dest=determinePingDest();
                if(ping_dest == null)
                    stop();
                ping_pid=0; // PID of the new ping_dest is resolved on the next run
            }
            else {
                if(log.isInfoEnabled()) log.info(ping_dest + " is alive");
            }
        }

        /* ---------------------------------- End of TimeScheduler.Task Interface ---------------------------- */

    }

}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -