⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gms.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    public static class Request {        static final int JOIN    = 1;        static final int LEAVE   = 2;        static final int SUSPECT = 3;        static final int MERGE   = 4;        static final int VIEW    = 5;        int     type=-1;        Address mbr;        boolean suspected;        Vector  coordinators;        View    view;        Digest  digest;        List    target_members;        Request(int type) {            this.type=type;        }        Request(int type, Address mbr, boolean suspected, Vector coordinators) {            this.type=type;            this.mbr=mbr;            this.suspected=suspected;            this.coordinators=coordinators;        }        public int getType() {            return type;        }        public String toString() {            switch(type) {                case JOIN:    return "JOIN(" + mbr + ")";                case LEAVE:   return "LEAVE(" + mbr + ", " + suspected + ")";                case SUSPECT: return "SUSPECT(" + mbr + ")";                case MERGE:   return "MERGE(" + coordinators + ")";                case VIEW:    return "VIEW (" + view.getVid() + ")";            }            return "<invalid (type=" + type + ")";        }        /**         * Specifies whether this request can be processed with other request simultaneously         */        public boolean canBeProcessedTogether(Request other) {            if(other == null)                return false;            int other_type=other.getType();            return (type == JOIN || type == LEAVE || type == SUSPECT) &&                    (other_type == JOIN || other_type == LEAVE || other_type == SUSPECT);        }    }    /**     * Class which processes JOIN, LEAVE and MERGE requests. Requests are queued and processed in FIFO order     * @author Bela Ban     * @version $Id: GMS.java,v 1.68 2006/10/30 11:19:20 belaban Exp $     */    class ViewHandler implements Runnable {        Thread                    thread;        Queue                     q=new Queue(); // Queue<Request>        boolean                   suspended=false;        final static long         INTERVAL=5000;        private static final long MAX_COMPLETION_TIME=10000;        /** Maintains a list of the last 20 requests */        private final BoundedList history=new BoundedList(20);        /** Map<Object,TimeScheduler.CancellableTask>. Keeps track of Resumer tasks which have not fired yet */        private final Map         resume_tasks=new HashMap();        private Object            merge_id=null;        void add(Request req) {            add(req, false, false);        }        synchronized void add(Request req, boolean at_head, boolean unsuspend) {            if(suspended && !unsuspend) {                log.warn("queue is suspended; request " + req + " is discarded");                return;            }            start(unsuspend);            try {                if(at_head)                    q.addAtHead(req);                else                    q.add(req);                history.add(new Date() + ": " + req.toString());            }            catch(QueueClosedException e) {                if(trace)                    log.trace("queue is closed; request " + req + " is discarded");            }        }        void waitUntilCompleted(long timeout) {            waitUntilCompleted(timeout, false);        }        synchronized void waitUntilCompleted(long timeout, boolean resume) {            if(thread != null) {                try {                    thread.join(timeout);                }                catch(InterruptedException e) {                }            }            if(resume)                resumeForce();        }        /**         * Waits until the current request has been processes, then clears the queue and discards new         * requests from now on         */        public synchronized void suspend(Object merge_id) {            if(suspended)                return;            suspended=true;            this.merge_id=merge_id;            q.clear();            waitUntilCompleted(MAX_COMPLETION_TIME);            q.close(true);            if(trace)                log.trace("suspended ViewHandler");            Resumer r=new Resumer(resume_task_timeout, merge_id, resume_tasks, this);            resume_tasks.put(merge_id, r);            timer.add(r);        }        public synchronized void resume(Object merge_id) {            if(!suspended)                return;            boolean same_merge_id=this.merge_id != null && merge_id != null && this.merge_id.equals(merge_id);            same_merge_id=same_merge_id || (this.merge_id == null && merge_id == null);            if(!same_merge_id) {                if(warn)                    log.warn("resume(" +merge_id+ ") does not match " + this.merge_id + ", ignoring resume()");                return;            }            synchronized(resume_tasks) {                TimeScheduler.CancellableTask task=(TimeScheduler.CancellableTask)resume_tasks.get(merge_id);                if(task != null) {                    task.cancel();                    resume_tasks.remove(merge_id);                }            }            resumeForce();        }        public synchronized void resumeForce() {            if(q.closed())                q.reset();            suspended=false;            if(trace)                log.trace("resumed ViewHandler");        }        public void run() {            long start, stop, wait_time;            List requests=new LinkedList();            while(!q.closed() && Thread.currentThread().equals(thread)) {                requests.clear();                try {                    boolean keepGoing=false;                    start=System.currentTimeMillis();                    do {                        Request firstRequest=(Request)q.remove(INTERVAL); // throws a TimeoutException if it runs into timeout                        requests.add(firstRequest);                        if(q.size() > 0) {                            Request nextReq=(Request)q.peek();                            keepGoing=view_bundling && firstRequest.canBeProcessedTogether(nextReq);                        }                        else {                            stop=System.currentTimeMillis();                            wait_time=max_bundling_time - (stop-start);                            if(wait_time > 0)                                Util.sleep(wait_time);                            keepGoing=q.size() > 0;                        }                    }                    while(keepGoing);                    process(requests);                }                catch(QueueClosedException e) {                    break;                }                catch(TimeoutException e) {                    break;                }                catch(Throwable catchall) {                    Util.sleep(50);                }            }        }        public int size() {return q.size();}        public boolean suspended() {return suspended;}        public String dumpQueue() {            StringBuffer sb=new StringBuffer();            List v=q.values();            for(Iterator it=v.iterator(); it.hasNext();) {                sb.append(it.next() + "\n");            }            return sb.toString();        }        public String dumpHistory() {            StringBuffer sb=new StringBuffer();            for(Enumeration en=history.elements(); en.hasMoreElements();) {                sb.append(en.nextElement() + "\n");            }            return sb.toString();        }        private void process(List requests) {            if(requests.isEmpty())                return;            if(trace)                log.trace("processing " + requests);            Request firstReq=(Request)requests.get(0);            switch(firstReq.type) {                case Request.JOIN:                case Request.LEAVE:                case Request.SUSPECT:                    Collection newMembers=new LinkedHashSet(requests.size());                    Collection suspectedMembers=new LinkedHashSet(requests.size());                    Collection oldMembers=new LinkedHashSet(requests.size());                    for(Iterator i=requests.iterator(); i.hasNext();) {                        Request req=(Request)i.next();                        switch(req.type) {                            case Request.JOIN:                                newMembers.add(req.mbr);                                break;                            case Request.LEAVE:                                if(req.suspected)                                    suspectedMembers.add(req.mbr);                                else                                    oldMembers.add(req.mbr);                                break;                            case Request.SUSPECT:                                suspectedMembers.add(req.mbr);                                break;                        }                    }                    impl.handleMembershipChange(newMembers, oldMembers, suspectedMembers);                    break;                case Request.MERGE:                    if(requests.size() > 1)                        log.error("more than one MERGE request to process, ignoring the others");                    impl.merge(firstReq.coordinators);                    break;                case Request.VIEW:                    if(requests.size() > 1)                        log.error("more than one VIEW request to process, ignoring the others");                    try {                        if(use_flush)                            startFlush(firstReq.view);                        castViewChangeWithDest(firstReq.view, firstReq.digest, firstReq.target_members);                    }                    finally {                        if(use_flush)                            stopFlush();                    }                    break;                default:                    log.error("request " + firstReq.type + " is unknown; discarded");            }        }        synchronized void start(boolean unsuspend) {            if(q.closed())                q.reset();            if(unsuspend) {                suspended=false;                synchronized(resume_tasks) {                    TimeScheduler.CancellableTask task=(TimeScheduler.CancellableTask)resume_tasks.get(merge_id);                    if(task != null) {                        task.cancel();                        resume_tasks.remove(merge_id);                    }                }            }            merge_id=null;            if(thread == null || !thread.isAlive()) {                thread=new Thread(Util.getGlobalThreadGroup(), this, "ViewHandler");                thread.setDaemon(false); // thread cannot terminate if we have tasks left, e.g. when we as coord leave                thread.start();                if(trace)                    log.trace("ViewHandler started");            }        }        synchronized void stop(boolean flush) {            q.close(flush);            TimeScheduler.CancellableTask task;            synchronized(resume_tasks) {                for(Iterator it=resume_tasks.values().iterator(); it.hasNext();) {                    task=(TimeScheduler.CancellableTask)it.next();                    task.cancel();                }                resume_tasks.clear();            }            merge_id=null;            // resumeForce();        }    }    /**     * Resumer is a second line of defense: when the ViewHandler is suspended, it will be resumed when the current     * merge is cancelled, or when the merge completes. However, in a case where this never happens (this     * shouldn't be the case !), the Resumer will nevertheless resume the ViewHandler.     * We chose this strategy because ViewHandler is critical: if it is suspended indefinitely, we would     * not be able to process new JOIN requests ! So, this is for peace of mind, although it most likely     * will never be used...     */    static class Resumer implements TimeScheduler.CancellableTask {        boolean           cancelled=false;        long              interval;        final Object      token;        final Map         tasks;        final ViewHandler handler;        public Resumer(long interval, final Object token, final Map t, final ViewHandler handler) {            this.interval=interval;            this.token=token;            this.tasks=t;            this.handler=handler;        }        public void cancel() {            cancelled=true;        }        public boolean cancelled() {            return cancelled;        }        public long nextInterval() {            return interval;        }        public void run() {            TimeScheduler.CancellableTask t;            boolean execute=true;            synchronized(tasks) {                t=(TimeScheduler.CancellableTask)tasks.get(token);                if(t != null) {                    t.cancel();                    execute=true;                }                else {                    execute=false;                }                tasks.remove(token);            }            if(execute) {                handler.resume(token);            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -