jchannelfactory.java

来自「JGRoups源码」· Java 代码 · 共 640 行 · 第 1/2 页

JAVA
640
字号
                if(entry.multiplexer != null)                    entry.multiplexer.addServiceIfNotPresent(ch.getId(), ch);                                if(!entry.channel.isConnected()) {                    entry.channel.connect(ch.getStackName());                    if(entry.multiplexer != null) {                        try {                            entry.multiplexer.fetchServiceInformation();                        }                        catch(Exception e) {                            if(log.isErrorEnabled())                                log.error("failed fetching service state", e);                        }                    }                }                if(entry.multiplexer != null) {                    try {                        Address addr=entry.channel.getLocalAddress();                        entry.multiplexer.sendServiceUpMessage(ch.getId(), addr);                    }                    catch(Exception e) {                        if(log.isErrorEnabled())                            log.error("failed sending SERVICE_UP message", e);                    }                }            }        }        ch.setClosed(false);        ch.setConnected(true);    }    public void disconnect(MuxChannel ch) {        Entry entry;        synchronized(channels) {            entry=(Entry)channels.get(ch.getStackName());        }        if(entry != null) {            synchronized(entry) {                Multiplexer mux=entry.multiplexer;                if(mux != null) {                    Address addr=entry.channel.getLocalAddress();                    try {                        mux.sendServiceDownMessage(ch.getId(), addr);                    }                    catch(Exception e) {                        if(log.isErrorEnabled())                            log.error("failed sending SERVICE_DOWN message", e);                    }                    mux.disconnect(); // disconnects JChannel if all MuxChannels are in disconnected state                }            }        }    }    public void close(MuxChannel ch) {        Entry entry;        String stack_name=ch.getStackName();        boolean all_closed=false;        synchronized(channels) {            entry=(Entry)channels.get(stack_name);        }        if(entry != null) {            synchronized(entry) {                Multiplexer mux=entry.multiplexer;                if(mux != null) {                    Address addr=entry.channel.getLocalAddress();                    if(addr != null) {                        try {                            mux.sendServiceDownMessage(ch.getId(), addr);                        }                        catch(Exception e) {                            if(log.isErrorEnabled())                                log.error("failed sending SERVICE_DOWN message", e);                        }                    }                    all_closed=mux.close(); // closes JChannel if all MuxChannels are in closed state                }            }            if(all_closed) {                channels.remove(stack_name);            }            if(expose_channels && server != null) {                try {                    unregister(domain + ":*,cluster=" + stack_name);                }                catch(Exception e) {                    log.error("failed unregistering channel " + stack_name, e);                }            }        }    }    public void shutdown(MuxChannel ch) {        Entry entry;        String stack_name=ch.getStackName();        boolean all_closed=false;        synchronized(channels) {            entry=(Entry)channels.get(stack_name);            if(entry != null) {                synchronized(entry) {                    Multiplexer mux=entry.multiplexer;                    if(mux != null) {                        Address addr=entry.channel.getLocalAddress();                        try {                            mux.sendServiceDownMessage(ch.getId(), addr);                        }                        catch(Exception e) {                            if(log.isErrorEnabled())                                log.error("failed sending SERVICE_DOWN message", e);                        }                        all_closed=mux.shutdown(); // closes JChannel if all MuxChannels are in closed state                        //mux.unregister(ch.getId());                    }                }                if(all_closed) {                    channels.remove(stack_name);                }                if(expose_channels && server != null) {                    try {                        unregister(domain + ":*,cluster=" + stack_name);                    }                    catch(Exception e) {                        log.error("failed unregistering channel " + stack_name, e);                    }                }            }        }    }    public void open(MuxChannel ch) throws ChannelException {        Entry entry;        synchronized(channels) {            entry=(Entry)channels.get(ch.getStackName());        }        if(entry != null) {            synchronized(entry) {                if(entry.channel == null)                    throw new ChannelException("channel has to be created before it can be opened");                if(!entry.channel.isOpen())                    entry.channel.open();            }        }        ch.setClosed(false);        ch.setConnected(false); //  needs to be connected next    }    public void create() throws Exception{        if(expose_channels) {            server=Util.getMBeanServer();            if(server == null)                throw new Exception("No MBeanServer found; JChannelFactory needs to be run with an MBeanServer present, " +                        "e.g. inside JBoss or JDK 5, or with ExposeChannel set to false");            if(domain == null)                domain="jgroups:name=Multiplexer";        }    }    public void start() throws Exception {    }    public void stop() {    }    public void destroy() {        synchronized(channels) {            Entry entry;            Map.Entry tmp;            for(Iterator it=channels.entrySet().iterator(); it.hasNext();) {                tmp=(Map.Entry)it.next();                entry=(Entry)tmp.getValue();                if(entry.multiplexer != null)                    entry.multiplexer.closeAll();                if(entry.channel != null)                    entry.channel.close();            }            if(expose_channels && server != null) {                try {                    unregister(domain + ":*");                }                catch(Throwable e) {                    log.error("failed unregistering domain " + domain, e);                }            }            channels.clear();        }    }    public String dumpConfiguration() {        if(stacks != null) {            return stacks.keySet().toString();        }        else            return null;    }    public String dumpChannels() {        if(channels == null)            return null;        StringBuffer sb=new StringBuffer();        for(Iterator it=channels.entrySet().iterator(); it.hasNext();) {            Map.Entry entry=(Map.Entry)it.next();            sb.append(entry.getKey()).append(": ").append(((Entry)entry.getValue()).multiplexer.getServiceIds()).append("\n");        }        return sb.toString();    }    private void parse(InputStream input) throws Exception {        /**         * CAUTION: crappy code ahead ! I (bela) am not an XML expert, so the code below is pretty amateurish...         * But it seems to work, and it is executed only on startup, so no perf loss on the critical path.         * If somebody wants to improve this, please be my guest.         */        DocumentBuilderFactory factory=DocumentBuilderFactory.newInstance();        factory.setValidating(false); //for now        DocumentBuilder builder=factory.newDocumentBuilder();        Document document=builder.parse(input);        // The root element of the document should be the "config" element,        // but the parser(Element) method checks this so a check is not        // needed here.        Element configElement = document.getDocumentElement();        parse(configElement);    }    private void parse(Element root) throws Exception {        /**         * CAUTION: crappy code ahead ! I (bela) am not an XML expert, so the code below is pretty amateurish...         * But it seems to work, and it is executed only on startup, so no perf loss on the critical path.         * If somebody wants to improve this, please be my guest.         */        String root_name=root.getNodeName();        if(!PROTOCOL_STACKS.equals(root_name.trim().toLowerCase())) {            String error="XML protocol stack configuration does not start with a '<config>' element; " +                    "maybe the XML configuration needs to be converted to the new format ?\n" +                    "use 'java org.jgroups.conf.XmlConfigurator <old XML file> -new_format' to do so";            throw new IOException("invalid XML configuration: " + error);        }        NodeList tmp_stacks=root.getChildNodes();        for(int i=0; i < tmp_stacks.getLength(); i++) {            Node node = tmp_stacks.item(i);            if(node.getNodeType() != Node.ELEMENT_NODE )                continue;            Element stack=(Element) node;            String tmp=stack.getNodeName();            if(!STACK.equals(tmp.trim().toLowerCase())) {                throw new IOException("invalid configuration: didn't find a \"" + STACK + "\" element under \"" + PROTOCOL_STACKS + "\"");            }            NamedNodeMap attrs = stack.getAttributes();            Node name=attrs.getNamedItem(NAME);            // Node descr=attrs.getNamedItem(DESCR);            String st_name=name.getNodeValue();            // String stack_descr=descr.getNodeValue();            // System.out.print("Parsing \"" + st_name + "\" (" + stack_descr + ")");            NodeList configs=stack.getChildNodes();            for(int j=0; j < configs.getLength(); j++) {                Node tmp_config=configs.item(j);                if(tmp_config.getNodeType() != Node.ELEMENT_NODE )                    continue;                Element cfg = (Element) tmp_config;                tmp=cfg.getNodeName();                if(!CONFIG.equals(tmp))                    throw new IOException("invalid configuration: didn't find a \"" + CONFIG + "\" element under \"" + STACK + "\"");                XmlConfigurator conf=XmlConfigurator.getInstance(cfg);                // fixes http://jira.jboss.com/jira/browse/JGRP-290                ConfiguratorFactory.substituteVariables(conf); // replace vars with system props                String val=conf.getProtocolStackString();                this.stacks.put(st_name, val);            }            //System.out.println(" - OK");        }//        System.out.println("stacks: ");//        for(Iterator it=stacks.entrySet().iterator(); it.hasNext();) {//            Map.Entry entry=(Map.Entry)it.next();//            System.out.println("key: " + entry.getKey());//            System.out.println("val: " + entry.getValue() + "\n");//        }    }    /**     * Returns the stack configuration as a string (to be fed into new JChannel()). Throws an exception     * if the stack_name is not found. One of the setMultiplexerConfig() methods had to be called beforehand     * @return The protocol stack config as a plain string     */    private String getConfig(String stack_name) throws Exception {        String cfg=(String)stacks.get(stack_name);        if(cfg == null)            throw new Exception("stack \"" + stack_name + "\" not found in " + stacks.keySet());        return cfg;    }    private static class Entry {        JChannel channel;        Multiplexer multiplexer;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?