📄 gms.java
字号:
public static class Request { static final int JOIN = 1; static final int LEAVE = 2; static final int SUSPECT = 3; static final int MERGE = 4; static final int VIEW = 5; int type=-1; Address mbr; boolean suspected; Vector coordinators; View view; Digest digest; List target_members; Request(int type) { this.type=type; } Request(int type, Address mbr, boolean suspected, Vector coordinators) { this.type=type; this.mbr=mbr; this.suspected=suspected; this.coordinators=coordinators; } public int getType() { return type; } public String toString() { switch(type) { case JOIN: return "JOIN(" + mbr + ")"; case LEAVE: return "LEAVE(" + mbr + ", " + suspected + ")"; case SUSPECT: return "SUSPECT(" + mbr + ")"; case MERGE: return "MERGE(" + coordinators + ")"; case VIEW: return "VIEW (" + view.getVid() + ")"; } return "<invalid (type=" + type + ")"; } /** * Specifies whether this request can be processed with other request simultaneously */ public boolean canBeProcessedTogether(Request other) { if(other == null) return false; int other_type=other.getType(); return (type == JOIN || type == LEAVE || type == SUSPECT) && (other_type == JOIN || other_type == LEAVE || other_type == SUSPECT); } } /** * Class which processes JOIN, LEAVE and MERGE requests. Requests are queued and processed in FIFO order * @author Bela Ban * @version $Id: GMS.java,v 1.68 2006/10/30 11:19:20 belaban Exp $ */ class ViewHandler implements Runnable { Thread thread; Queue q=new Queue(); // Queue<Request> boolean suspended=false; final static long INTERVAL=5000; private static final long MAX_COMPLETION_TIME=10000; /** Maintains a list of the last 20 requests */ private final BoundedList history=new BoundedList(20); /** Map<Object,TimeScheduler.CancellableTask>. Keeps track of Resumer tasks which have not fired yet */ private final Map resume_tasks=new HashMap(); private Object merge_id=null; void add(Request req) { add(req, false, false); } synchronized void add(Request req, boolean at_head, boolean unsuspend) { if(suspended && !unsuspend) { log.warn("queue is suspended; request " + req + " is discarded"); return; } start(unsuspend); try { if(at_head) q.addAtHead(req); else q.add(req); history.add(new Date() + ": " + req.toString()); } catch(QueueClosedException e) { if(trace) log.trace("queue is closed; request " + req + " is discarded"); } } void waitUntilCompleted(long timeout) { waitUntilCompleted(timeout, false); } synchronized void waitUntilCompleted(long timeout, boolean resume) { if(thread != null) { try { thread.join(timeout); } catch(InterruptedException e) { } } if(resume) resumeForce(); } /** * Waits until the current request has been processes, then clears the queue and discards new * requests from now on */ public synchronized void suspend(Object merge_id) { if(suspended) return; suspended=true; this.merge_id=merge_id; q.clear(); waitUntilCompleted(MAX_COMPLETION_TIME); q.close(true); if(trace) log.trace("suspended ViewHandler"); Resumer r=new Resumer(resume_task_timeout, merge_id, resume_tasks, this); resume_tasks.put(merge_id, r); timer.add(r); } public synchronized void resume(Object merge_id) { if(!suspended) return; boolean same_merge_id=this.merge_id != null && merge_id != null && this.merge_id.equals(merge_id); same_merge_id=same_merge_id || (this.merge_id == null && merge_id == null); if(!same_merge_id) { if(warn) log.warn("resume(" +merge_id+ ") does not match " + this.merge_id + ", ignoring resume()"); return; } synchronized(resume_tasks) { TimeScheduler.CancellableTask task=(TimeScheduler.CancellableTask)resume_tasks.get(merge_id); if(task != null) { task.cancel(); resume_tasks.remove(merge_id); } } resumeForce(); } public synchronized void resumeForce() { if(q.closed()) q.reset(); suspended=false; if(trace) log.trace("resumed ViewHandler"); } public void run() { long start, stop, wait_time; List requests=new LinkedList(); while(!q.closed() && Thread.currentThread().equals(thread)) { requests.clear(); try { boolean keepGoing=false; start=System.currentTimeMillis(); do { Request firstRequest=(Request)q.remove(INTERVAL); // throws a TimeoutException if it runs into timeout requests.add(firstRequest); if(q.size() > 0) { Request nextReq=(Request)q.peek(); keepGoing=view_bundling && firstRequest.canBeProcessedTogether(nextReq); } else { stop=System.currentTimeMillis(); wait_time=max_bundling_time - (stop-start); if(wait_time > 0) Util.sleep(wait_time); keepGoing=q.size() > 0; } } while(keepGoing); process(requests); } catch(QueueClosedException e) { break; } catch(TimeoutException e) { break; } catch(Throwable catchall) { Util.sleep(50); } } } public int size() {return q.size();} public boolean suspended() {return suspended;} public String dumpQueue() { StringBuffer sb=new StringBuffer(); List v=q.values(); for(Iterator it=v.iterator(); it.hasNext();) { sb.append(it.next() + "\n"); } return sb.toString(); } public String dumpHistory() { StringBuffer sb=new StringBuffer(); for(Enumeration en=history.elements(); en.hasMoreElements();) { sb.append(en.nextElement() + "\n"); } return sb.toString(); } private void process(List requests) { if(requests.isEmpty()) return; if(trace) log.trace("processing " + requests); Request firstReq=(Request)requests.get(0); switch(firstReq.type) { case Request.JOIN: case Request.LEAVE: case Request.SUSPECT: Collection newMembers=new LinkedHashSet(requests.size()); Collection suspectedMembers=new LinkedHashSet(requests.size()); Collection oldMembers=new LinkedHashSet(requests.size()); for(Iterator i=requests.iterator(); i.hasNext();) { Request req=(Request)i.next(); switch(req.type) { case Request.JOIN: newMembers.add(req.mbr); break; case Request.LEAVE: if(req.suspected) suspectedMembers.add(req.mbr); else oldMembers.add(req.mbr); break; case Request.SUSPECT: suspectedMembers.add(req.mbr); break; } } impl.handleMembershipChange(newMembers, oldMembers, suspectedMembers); break; case Request.MERGE: if(requests.size() > 1) log.error("more than one MERGE request to process, ignoring the others"); impl.merge(firstReq.coordinators); break; case Request.VIEW: if(requests.size() > 1) log.error("more than one VIEW request to process, ignoring the others"); try { if(use_flush) startFlush(firstReq.view); castViewChangeWithDest(firstReq.view, firstReq.digest, firstReq.target_members); } finally { if(use_flush) stopFlush(); } break; default: log.error("request " + firstReq.type + " is unknown; discarded"); } } synchronized void start(boolean unsuspend) { if(q.closed()) q.reset(); if(unsuspend) { suspended=false; synchronized(resume_tasks) { TimeScheduler.CancellableTask task=(TimeScheduler.CancellableTask)resume_tasks.get(merge_id); if(task != null) { task.cancel(); resume_tasks.remove(merge_id); } } } merge_id=null; if(thread == null || !thread.isAlive()) { thread=new Thread(Util.getGlobalThreadGroup(), this, "ViewHandler"); thread.setDaemon(false); // thread cannot terminate if we have tasks left, e.g. when we as coord leave thread.start(); if(trace) log.trace("ViewHandler started"); } } synchronized void stop(boolean flush) { q.close(flush); TimeScheduler.CancellableTask task; synchronized(resume_tasks) { for(Iterator it=resume_tasks.values().iterator(); it.hasNext();) { task=(TimeScheduler.CancellableTask)it.next(); task.cancel(); } resume_tasks.clear(); } merge_id=null; // resumeForce(); } } /** * Resumer is a second line of defense: when the ViewHandler is suspended, it will be resumed when the current * merge is cancelled, or when the merge completes. However, in a case where this never happens (this * shouldn't be the case !), the Resumer will nevertheless resume the ViewHandler. * We chose this strategy because ViewHandler is critical: if it is suspended indefinitely, we would * not be able to process new JOIN requests ! So, this is for peace of mind, although it most likely * will never be used... */ static class Resumer implements TimeScheduler.CancellableTask { boolean cancelled=false; long interval; final Object token; final Map tasks; final ViewHandler handler; public Resumer(long interval, final Object token, final Map t, final ViewHandler handler) { this.interval=interval; this.token=token; this.tasks=t; this.handler=handler; } public void cancel() { cancelled=true; } public boolean cancelled() { return cancelled; } public long nextInterval() { return interval; } public void run() { TimeScheduler.CancellableTask t; boolean execute=true; synchronized(tasks) { t=(TimeScheduler.CancellableTask)tasks.get(token); if(t != null) { t.cancel(); execute=true; } else { execute=false; } tasks.remove(token); } if(execute) { handler.resume(token); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -