⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 multiplexer.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:

            case Event.GET_APPLSTATE:
            case Event.STATE_TRANSFER_OUTPUTSTREAM:
                handleStateRequest(evt);
                break;

            case Event.GET_STATE_OK:
            case Event.STATE_TRANSFER_INPUTSTREAM:
                handleStateResponse(evt);
                break;

            case Event.SET_LOCAL_ADDRESS:
                local_addr=(Address)evt.getArg();
                passToAllMuxChannels(evt);
                break;

            case Event.BLOCK:
                blocked=true;
                int num_services=services.size();
                if(num_services == 0) {
                    channel.blockOk();
                    return;
                }
                block_ok_collector.reset();
                passToAllMuxChannels(evt);
                block_ok_collector.waitUntil(num_services);
                channel.blockOk();
                return;

            case Event.UNBLOCK: // process queued-up MergeViews
                if(!blocked) {
                    passToAllMuxChannels(evt);
                    return;
                }
                else
                    blocked=false;
                if(temp_merge_view != null) {
                    if(log.isTraceEnabled())
                        log.trace("calling handleMergeView() from UNBLOCK (flush_present=" + flush_present + ")");
                    try {
                        Thread merge_handler=new Thread() {
                            public void run() {
                                try {
                                    handleMergeView(temp_merge_view);
                                }
                                catch(Exception e) {
                                    if(log.isErrorEnabled())
                                        log.error("problems handling merge view", e);
                                }
                            }
                        };
                        merge_handler.setName("merge handler (unblock)");
                        merge_handler.setDaemon(false);
                        merge_handler.start();
                    }
                    catch(Exception e) {
                        if(log.isErrorEnabled())
                            log.error("failed handling merge view", e);
                    }
                }
                passToAllMuxChannels(evt);
                break;

            default:
                passToAllMuxChannels(evt);
                break;
        }
    }




    /**
     * Creates a new MuxChannel for the given service ID and registers it in <code>services</code>.
     * The lookup, construction and registration all happen while holding the <code>services</code> monitor.
     *
     * @param f          the factory handed to the MuxChannel constructor
     * @param id         the service ID under which the new channel is registered
     * @param stack_name the stack name handed to the MuxChannel constructor
     * @return the newly created and registered MuxChannel
     * @throws Exception if a service with the same ID is already registered
     */
    public Channel createMuxChannel(JChannelFactory f, String id, String stack_name) throws Exception {
        synchronized(services) {
            if(!services.containsKey(id)) {
                MuxChannel created=new MuxChannel(f, channel, id, stack_name, this);
                services.put(id, created);
                return created;
            }
            throw new Exception("service ID \"" + id + "\" is already registered, cannot register duplicate ID");
        }
    }




    /**
     * Delivers the event to every registered MuxChannel by calling <code>up(evt)</code> on each
     * value of <code>services</code>, in the map's iteration order.
     * <p>
     * NOTE(review): the iteration does not hold the <code>services</code> monitor, unlike the
     * methods that add/remove services - confirm the map implementation tolerates this.
     *
     * @param evt the event to pass to each MuxChannel
     */
    private void passToAllMuxChannels(Event evt) {
        Iterator channels=services.values().iterator();
        while(channels.hasNext()) {
            MuxChannel target=(MuxChannel)channels.next();
            target.up(evt);
        }
    }

    /**
     * Removes the service registered under the given ID while holding the <code>services</code> monitor.
     *
     * @param id the service ID to remove
     * @return the value that was mapped to <code>id</code>, as returned by <code>services.remove(id)</code>
     */
    public MuxChannel remove(String id) {
        MuxChannel removed;
        synchronized(services) {
            removed=(MuxChannel)services.remove(id);
        }
        return removed;
    }



    /**
     * Disconnects the underlying JChannel, but only if none of the registered MuxChannels reports
     * itself as connected. The check and the disconnect both run while holding the
     * <code>services</code> monitor.
     */
    public void disconnect() {
        synchronized(services) {
            Iterator it=services.values().iterator();
            while(it.hasNext()) {
                MuxChannel candidate=(MuxChannel)it.next();
                if(candidate.isConnected())
                    return; // at least one service is still connected: leave the JChannel alone
            }
            if(log.isTraceEnabled())
                log.trace("disconnecting underlying JChannel as all MuxChannels are disconnected");
            channel.disconnect();
        }
    }


    /**
     * Removes the entry registered under <code>appl_id</code> from <code>services</code> while
     * holding the <code>services</code> monitor. Unlike {@link #remove(String)}, the removed
     * value is not returned to the caller.
     *
     * @param appl_id the service ID to remove
     */
    public void unregister(String appl_id) {
        synchronized(services) {
            services.remove(appl_id);
        }
    }

    /**
     * Closes the underlying JChannel and clears <code>services</code>, provided that no registered
     * MuxChannel reports itself as open. Runs entirely under the <code>services</code> monitor.
     *
     * @return <code>true</code> if the underlying channel was closed, <code>false</code> if at least
     *         one MuxChannel is still open (in which case nothing is changed)
     */
    public boolean close() {
        synchronized(services) {
            Iterator it=services.values().iterator();
            while(it.hasNext()) {
                MuxChannel candidate=(MuxChannel)it.next();
                if(candidate.isOpen())
                    return false; // a service is still open
            }
            if(log.isTraceEnabled())
                log.trace("closing underlying JChannel as all MuxChannels are closed");
            channel.close();
            services.clear();
            return true;
        }
    }

    /**
     * Walks over all registered MuxChannels (under the <code>services</code> monitor) and, for each
     * one, sets connected to <code>false</code>, sets closed to <code>true</code> and calls
     * <code>closeMessageQueue(true)</code>. The entries themselves stay in <code>services</code>.
     */
    public void closeAll() {
        synchronized(services) {
            Iterator it=services.values().iterator();
            while(it.hasNext()) {
                MuxChannel current=(MuxChannel)it.next();
                current.setConnected(false);
                current.setClosed(true);
                current.closeMessageQueue(true);
            }
        }
    }

    /**
     * Shuts down the underlying JChannel and clears <code>services</code>, provided that no
     * registered MuxChannel reports itself as open. Runs entirely under the <code>services</code> monitor.
     *
     * @return <code>true</code> if the underlying channel was shut down, <code>false</code> if at
     *         least one MuxChannel is still open (in which case nothing is changed)
     */
    public boolean shutdown() {
        synchronized(services) {
            Iterator it=services.values().iterator();
            while(it.hasNext()) {
                MuxChannel candidate=(MuxChannel)it.next();
                if(candidate.isOpen())
                    return false; // a service is still open
            }
            if(log.isTraceEnabled())
                log.trace("shutting down underlying JChannel as all MuxChannels are closed");
            channel.shutdown();
            services.clear();
            return true;
        }
    }


    /**
     * Tells whether looking up the protocol named "FLUSH" in the underlying channel's protocol
     * stack yields a non-null result.
     *
     * @return <code>true</code> if <code>findProtocol("FLUSH")</code> returned something, <code>false</code> otherwise
     */
    private boolean isFlushPresent() {
        Object flush_prot=channel.getProtocolStack().findProtocol("FLUSH");
        return flush_prot != null;
    }


    /**
     * Sends a <code>ServiceInfo.LIST_SERVICES_RSP</code> message over the underlying channel. The
     * payload is the serialized array of the keys currently in <code>services</code>, and the
     * ServiceInfo carries the channel's local address.
     *
     * @throws Exception if serializing the service IDs or sending the message fails
     */
    private void sendServiceState() throws Exception {
        byte[] serialized_ids=Util.objectToByteBuffer(services.keySet().toArray());
        Address self=channel.getLocalAddress();
        ServiceInfo list_rsp=new ServiceInfo(ServiceInfo.LIST_SERVICES_RSP, null, self, serialized_ids);
        Message msg=new Message(); // no destination set: send to everyone
        msg.putHeader(NAME, new MuxHeader(list_rsp));
        channel.send(msg);
    }


    /**
     * Returns the cached <code>local_addr</code>. If nothing is cached yet and a channel is
     * available, the address is first fetched from the channel and stored in <code>local_addr</code>.
     *
     * @return the local address, or <code>null</code> if it is not cached and could not be obtained
     */
    private Address getLocalAddress() {
        if(local_addr == null && channel != null)
            local_addr=channel.getLocalAddress();
        return local_addr;
    }

    /**
     * Returns the first member of the underlying channel's current view.
     *
     * @return the first element of the view's member list, or <code>null</code> if there is no
     *         channel, no view, or the member list is null or empty
     */
    private Address getCoordinator() {
        if(channel == null)
            return null;
        View current_view=channel.getView();
        if(current_view == null)
            return null;
        Vector mbrs=current_view.getMembers();
        if(mbrs == null || mbrs.isEmpty())
            return null;
        return (Address)mbrs.firstElement();
    }

    /**
     * Returns the first host recorded in <code>service_state</code> for the given service.
     *
     * @param service_id the service ID to look up
     * @return the first entry of the host list stored under <code>service_id</code>, or
     *         <code>null</code> if there is no list or the list is empty
     */
    public Address getServiceCoordinator(String service_id) {
        List hosts=(List)service_state.get(service_id);
        boolean has_hosts=hosts != null && !hosts.isEmpty();
        return has_hosts? (Address)hosts.get(0) : null;
    }

    /**
     * Sends a ServiceInfo message of the given type for the given service over the underlying channel.
     * When <code>host</code> is null, the local address (see <code>getLocalAddress()</code>) is used
     * instead; if that is null too, a warning is logged and nothing is sent.
     *
     * @param type    the ServiceInfo type
     * @param service the service ID placed into the ServiceInfo
     * @param host    the host placed into the ServiceInfo, or <code>null</code> to use the local address
     * @throws Exception if sending the message fails
     */
    private void sendServiceMessage(byte type, String service, Address host) throws Exception {
        Address effective_host=host != null? host : getLocalAddress();
        if(effective_host != null) {
            MuxHeader hdr=new MuxHeader(new ServiceInfo(type, service, effective_host, null));
            Message msg=new Message();
            msg.putHeader(NAME, hdr);
            channel.send(msg);
            return;
        }

        // neither an explicit host nor a local address: nothing we can put into the ServiceInfo
        if(log.isWarnEnabled())
            log.warn("local_addr is null, cannot send ServiceInfo." + ServiceInfo.typeToString(type) + " message");
    }


    /**
     * Routes a state request to the MuxChannel it is addressed to.
     * <p>
     * The incoming <code>state_id</code> is split at the first <code>SEPARATOR</code>: the part before
     * it is the service ID used to look up the MuxChannel, the part after it becomes the new
     * <code>info.state_id</code> (or <code>null</code> if there is no separator). The event, whose
     * argument is the modified <code>info</code>, is then passed up to that MuxChannel.
     * <p>
     * Any Throwable raised on the way (including a missing service) is logged and answered with
     * <code>channel.returnState(null, &lt;original state_id&gt;)</code>.
     *
     * @param evt the state request event; its argument is cast to StateTransferInfo
     */
    private void handleStateRequest(Event evt) {
        final StateTransferInfo info=(StateTransferInfo)evt.getArg();
        final String requested_id=info.state_id; // kept untouched for the failure path below

        try {
            String service_id=requested_id;
            String substate_id=null;
            int sep_pos=requested_id.indexOf(SEPARATOR);
            if(sep_pos >= 0) {
                substate_id=requested_id.substring(sep_pos + SEPARATOR_LEN);
                service_id=requested_id.substring(0, sep_pos);
            }
            info.state_id=substate_id;

            MuxChannel target=(MuxChannel)services.get(service_id);
            if(target == null)
                throw new IllegalArgumentException("didn't find service with ID=" + service_id + " to fetch state from");

            // info.state_id now holds only the substate ID (null when the request had no separator)
            target.up(evt);
        }
        catch(Throwable ex) {
            if(log.isErrorEnabled())
                log.error("failed returning the application state, will return null", ex);
            // answer through the underlying channel: the MuxChannel lookup itself may have failed
            channel.returnState(null, requested_id);
        }
    }




    /**
     * Routes a state response to the MuxChannel it is addressed to.
     * <p>
     * If <code>info.state_id</code> is null the event is dropped (with a trace message). Otherwise
     * the ID is split at the first <code>SEPARATOR</code>: the part before it is the service ID used
     * to look up the MuxChannel, the part after it is the substate ID (<code>null</code> if there is
     * no separator). The event's argument is replaced by a copy of <code>info</code> whose
     * <code>state_id</code> is the substate ID, and the event is passed up to the MuxChannel.
     * If no MuxChannel is registered under the service ID, an error is logged and the event is dropped.
     *
     * @param evt the state response event; its argument is cast to StateTransferInfo
     */
    private void handleStateResponse(Event evt) {
        StateTransferInfo info=(StateTransferInfo)evt.getArg();
        String full_id=info.state_id;

        if(full_id == null) {
            if(log.isTraceEnabled())
                log.trace("state is null, not passing up: " + info);
            return;
        }

        String appl_id=full_id;
        String substate_id=null;
        int index=full_id.indexOf(SEPARATOR);
        if(index > -1) {
            appl_id=full_id.substring(0, index);
            substate_id=full_id.substring(index + SEPARATOR_LEN);
        }

        MuxChannel mux_ch=(MuxChannel)services.get(appl_id);
        if(mux_ch == null) {
            // guarded like every other logging call in this class
            if(log.isErrorEnabled())
                log.error("didn't find service with ID=" + appl_id + " to fetch state from");
            return;
        }

        // hand the service a copy carrying only the substate ID; the original info is left unmodified
        StateTransferInfo tmp_info=info.copy();
        tmp_info.state_id=substate_id;
        evt.setArg(tmp_info);
        mux_ch.up(evt);
    }

    private void handleServiceStateRequest(ServiceInfo info, Address sender) throws Exception {
        switch(info.type) {
            case ServiceInfo.STATE_REQ:
                byte[] state;
                synchronized(service_state) {
                    state=Util.objectToByteBuffer(service_state);
                }
                ServiceInfo si=new ServiceInfo(ServiceInfo.STATE_RSP, null, null, state);
                MuxHeader hdr=new MuxHeader(si);
                Message state_rsp=new Message(sender);
                state_rsp.putHeader(NAME, hdr);
                channel.send(state_rsp);
                break;
            case ServiceInfo.STATE_RSP:

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -