⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 multiplexer.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                service_state_promise.setResult(info.state);
                break;
            case ServiceInfo.SERVICE_UP:
                handleServiceUp(info.service, info.host, true);
                break;
            case ServiceInfo.SERVICE_DOWN:
                handleServiceDown(info.service, info.host, true);
                break;
            case ServiceInfo.LIST_SERVICES_RSP:
                handleServicesRsp(sender, info.state);
                break;
            default:
                if(log.isErrorEnabled())
                    log.error("service request type " + info.type + " not known");
                break;
        }
    }

    private void handleServicesRsp(Address sender, byte[] state) throws Exception {
        Object[] keys=(Object[])Util.objectFromByteBuffer(state);
        Set s=new HashSet();
        for(int i=0; i < keys.length; i++)
            s.add(keys[i]);



        synchronized(service_responses) {
            Set tmp=(Set)service_responses.get(sender);
            if(tmp == null)
                tmp=new HashSet();
            tmp.addAll(s);

            service_responses.put(sender, tmp);
            if(log.isTraceEnabled())
                log.trace("received service response: " + sender + "(" + s.toString() + ")");
            service_responses.notifyAll();
        }
    }


    private void handleServiceDown(String service, Address host, boolean received) {
        List    hosts, hosts_copy;
        boolean removed=false;

        // discard if we sent this message
        if(received && host != null && local_addr != null && local_addr.equals(host)) {
            return;
        }

        synchronized(service_state) {
            hosts=(List)service_state.get(service);
            if(hosts == null)
                return;
            removed=hosts.remove(host);
            hosts_copy=new ArrayList(hosts); // make a copy so we don't modify hosts in generateServiceView()
        }

        if(removed) {
            View service_view=generateServiceView(hosts_copy);
            if(service_view != null) {
                MuxChannel ch=(MuxChannel)services.get(service);
                if(ch != null) {
                    Event view_evt=new Event(Event.VIEW_CHANGE, service_view);
                    ch.up(view_evt);
                }
                else {
                    if(log.isTraceEnabled())
                        log.trace("service " + service + " not found, cannot dispatch service view " + service_view);
                }
            }
        }

        Address local_address=getLocalAddress();
        if(local_address != null && host != null && host.equals(local_address))
            unregister(service);
    }


    private void handleServiceUp(String service, Address host, boolean received) {
        List    hosts, hosts_copy;
        boolean added=false;

        // discard if we sent this message
        if(received && host != null && local_addr != null && local_addr.equals(host)) {
            return;
        }

        synchronized(service_state) {
            hosts=(List)service_state.get(service);
            if(hosts == null) {
                hosts=new ArrayList();
                service_state.put(service,  hosts);
            }
            if(!hosts.contains(host)) {
                hosts.add(host);
                added=true;
            }
            hosts_copy=new ArrayList(hosts); // make a copy so we don't modify hosts in generateServiceView()
        }

        if(added) {
            View service_view=generateServiceView(hosts_copy);
            if(service_view != null) {
                MuxChannel ch=(MuxChannel)services.get(service);
                if(ch != null) {
                    Event view_evt=new Event(Event.VIEW_CHANGE, service_view);
                    ch.up(view_evt);
                }
                else {
                    if(log.isTraceEnabled())
                        log.trace("service " + service + " not found, cannot dispatch service view " + service_view);
                }
            }
        }
    }


    /**
     * Fetches the service states from everyone else in the cluster. Once all states have been received and inserted into
     * service_state, compute a service view (a copy of MergeView) for each service and pass it up
     * @param view
     */
    private void handleMergeView(MergeView view) throws Exception {
        long time_to_wait=SERVICES_RSP_TIMEOUT, start;
        int num_members=view.size(); // include myself
        Map copy=null;

        sendServiceState();

        synchronized(service_responses) {
            start=System.currentTimeMillis();
            try {
                while(time_to_wait > 0 && numResponses(service_responses) < num_members) {
                    // System.out.println("time_to_wait=" + time_to_wait + ", numResponses(service_responses)=" + numResponses(service_responses) +
                       //     ", num_members=" + num_members + ", service_state=" + service_state);
                    service_responses.wait(time_to_wait);
                    time_to_wait-=System.currentTimeMillis() - start;
                }

                // System.out.println("wait terminated: time_to_wait=" + time_to_wait + ", numResponses(service_responses)=" + numResponses(service_responses) +
                   //     ", num_members=" + num_members + ", service_state=" + service_state);
                copy=new HashMap(service_responses);
            }
            catch(Exception ex) {
                if(log.isErrorEnabled())
                    log.error("failed fetching a list of services from other members in the cluster, cannot handle merge view " + view, ex);
            }
        }

        if(log.isTraceEnabled())
            log.trace("merging service state, my service_state: " + service_state + ", received responses: " + copy);

        // merges service_responses with service_state and emits MergeViews for the services affected (MuxChannel)
        mergeServiceState(view, copy);
        service_responses.clear();
        temp_merge_view=null;
    }

    private int numResponses(Map m) {
        int num=0;
        Collection values=m.values();
        for(Iterator it=values.iterator(); it.hasNext();) {
            if(it.next() != null)
                num++;
        }

        return num;
    }


    private void mergeServiceState(MergeView view, Map copy) {
        Set modified_services=new HashSet();
        Map.Entry entry;
        Address host;     // address of the sender
        Set service_list; // Set<String> of services
        List my_services;
        String service;

        synchronized(service_state) {
            for(Iterator it=copy.entrySet().iterator(); it.hasNext();) {
                entry=(Map.Entry)it.next();
                host=(Address)entry.getKey();
                service_list=(Set)entry.getValue();
                if(service_list == null)
                    continue;

                for(Iterator it2=service_list.iterator(); it2.hasNext();) {
                    service=(String)it2.next();
                    my_services=(List)service_state.get(service);
                    if(my_services == null) {
                        my_services=new ArrayList();
                        service_state.put(service, my_services);
                    }

                    boolean was_modified=my_services.add(host);
                    if(was_modified) {
                        modified_services.add(service);
                    }
                }
            }
        }

        // now emit MergeViews for all services which were modified
        for(Iterator it=modified_services.iterator(); it.hasNext();) {
            service=(String)it.next();
            MuxChannel ch=(MuxChannel)services.get(service);
            my_services=(List)service_state.get(service);
            MergeView v=(MergeView)view.clone();
            v.getMembers().retainAll(my_services);
            Event evt=new Event(Event.VIEW_CHANGE, v);
            ch.up(evt);
        }
    }

    private void adjustServiceViews(Vector left_members) {
        if(left_members != null)
            for(int i=0; i < left_members.size(); i++) {
                try {
                    adjustServiceView((Address)left_members.elementAt(i));
                }
                catch(Throwable t) {
                    if(log.isErrorEnabled())
                        log.error("failed adjusting service views", t);
                }
            }
    }

    private void adjustServiceView(Address host) {
        Map.Entry entry;
        List hosts, hosts_copy;
        String service;
        boolean removed=false;

        synchronized(service_state) {
            for(Iterator it=service_state.entrySet().iterator(); it.hasNext();) {
                entry=(Map.Entry)it.next();
                service=(String)entry.getKey();
                hosts=(List)entry.getValue();
                if(hosts == null)
                    continue;

                removed=hosts.remove(host);
                hosts_copy=new ArrayList(hosts); // make a copy so we don't modify hosts in generateServiceView()

                if(removed) {
                    View service_view=generateServiceView(hosts_copy);
                    if(service_view != null) {
                        MuxChannel ch=(MuxChannel)services.get(service);
                        if(ch != null) {
                            Event view_evt=new Event(Event.VIEW_CHANGE, service_view);
                            ch.up(view_evt);
                        }
                        else {
                            if(log.isTraceEnabled())
                                log.trace("service " + service + " not found, cannot dispatch service view " + service_view);
                        }
                    }
                }
                Address local_address=getLocalAddress();
                if(local_address != null && host != null && host.equals(local_address))
                    unregister(service);
            }
        }
    }


    /**
     * Create a copy of view which contains only members which are present in hosts. Call viewAccepted() on the MuxChannel
     * which corresponds with service. If no members are removed or added from/to view, this is a no-op.
     * @param hosts List<Address>
     * @return the servicd view (a modified copy of the real view), or null if the view was not modified
     */
    private View generateServiceView(List hosts) {
        View copy=new View(view.getVid(), new Vector(view.getMembers()));
        Vector members=copy.getMembers();
        members.retainAll(hosts);
        return copy;
    }


    /** Tell the underlying channel to start the flush protocol, this will be handled by FLUSH */
    private void startFlush() {
        channel.down(new Event(Event.SUSPEND));
    }

    /** Tell the underlying channel to stop the flush, and resume message sending. This will be handled by FLUSH */
    private void stopFlush() {
        channel.down(new Event(Event.RESUME));
    }


    public void addServiceIfNotPresent(String id, MuxChannel ch) {
        MuxChannel tmp;
        synchronized(services) {
            tmp=(MuxChannel)services.get(id);
            if(tmp == null) {
                services.put(id, ch);
            }
        }
    }


    private static class BlockOkCollector {
        int num_block_oks=0;

        synchronized void reset() {
            num_block_oks=0;
        }

        synchronized void increment() {
            num_block_oks++;
        }

        synchronized void waitUntil(int num) {
            while(num_block_oks < num) {
                try {
                    this.wait();
                }
                catch(InterruptedException e) {
                }
            }
        }

        public String toString() {
            return String.valueOf(num_block_oks);
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -