jchannelfactory.java
来自「JGRoups源码」· Java 代码 · 共 640 行 · 第 1/2 页
JAVA
640 行
if(entry.multiplexer != null) entry.multiplexer.addServiceIfNotPresent(ch.getId(), ch); if(!entry.channel.isConnected()) { entry.channel.connect(ch.getStackName()); if(entry.multiplexer != null) { try { entry.multiplexer.fetchServiceInformation(); } catch(Exception e) { if(log.isErrorEnabled()) log.error("failed fetching service state", e); } } } if(entry.multiplexer != null) { try { Address addr=entry.channel.getLocalAddress(); entry.multiplexer.sendServiceUpMessage(ch.getId(), addr); } catch(Exception e) { if(log.isErrorEnabled()) log.error("failed sending SERVICE_UP message", e); } } } } ch.setClosed(false); ch.setConnected(true); } public void disconnect(MuxChannel ch) { Entry entry; synchronized(channels) { entry=(Entry)channels.get(ch.getStackName()); } if(entry != null) { synchronized(entry) { Multiplexer mux=entry.multiplexer; if(mux != null) { Address addr=entry.channel.getLocalAddress(); try { mux.sendServiceDownMessage(ch.getId(), addr); } catch(Exception e) { if(log.isErrorEnabled()) log.error("failed sending SERVICE_DOWN message", e); } mux.disconnect(); // disconnects JChannel if all MuxChannels are in disconnected state } } } } public void close(MuxChannel ch) { Entry entry; String stack_name=ch.getStackName(); boolean all_closed=false; synchronized(channels) { entry=(Entry)channels.get(stack_name); } if(entry != null) { synchronized(entry) { Multiplexer mux=entry.multiplexer; if(mux != null) { Address addr=entry.channel.getLocalAddress(); if(addr != null) { try { mux.sendServiceDownMessage(ch.getId(), addr); } catch(Exception e) { if(log.isErrorEnabled()) log.error("failed sending SERVICE_DOWN message", e); } } all_closed=mux.close(); // closes JChannel if all MuxChannels are in closed state } } if(all_closed) { channels.remove(stack_name); } if(expose_channels && server != null) { try { unregister(domain + ":*,cluster=" + stack_name); } catch(Exception e) { log.error("failed unregistering channel " + stack_name, e); } } } } public void shutdown(MuxChannel ch) { Entry entry; String stack_name=ch.getStackName(); boolean all_closed=false; synchronized(channels) { entry=(Entry)channels.get(stack_name); if(entry != null) { synchronized(entry) { Multiplexer mux=entry.multiplexer; if(mux != null) { Address addr=entry.channel.getLocalAddress(); try { mux.sendServiceDownMessage(ch.getId(), addr); } catch(Exception e) { if(log.isErrorEnabled()) log.error("failed sending SERVICE_DOWN message", e); } all_closed=mux.shutdown(); // closes JChannel if all MuxChannels are in closed state //mux.unregister(ch.getId()); } } if(all_closed) { channels.remove(stack_name); } if(expose_channels && server != null) { try { unregister(domain + ":*,cluster=" + stack_name); } catch(Exception e) { log.error("failed unregistering channel " + stack_name, e); } } } } } public void open(MuxChannel ch) throws ChannelException { Entry entry; synchronized(channels) { entry=(Entry)channels.get(ch.getStackName()); } if(entry != null) { synchronized(entry) { if(entry.channel == null) throw new ChannelException("channel has to be created before it can be opened"); if(!entry.channel.isOpen()) entry.channel.open(); } } ch.setClosed(false); ch.setConnected(false); // needs to be connected next } public void create() throws Exception{ if(expose_channels) { server=Util.getMBeanServer(); if(server == null) throw new Exception("No MBeanServer found; JChannelFactory needs to be run with an MBeanServer present, " + "e.g. inside JBoss or JDK 5, or with ExposeChannel set to false"); if(domain == null) domain="jgroups:name=Multiplexer"; } } public void start() throws Exception { } public void stop() { } public void destroy() { synchronized(channels) { Entry entry; Map.Entry tmp; for(Iterator it=channels.entrySet().iterator(); it.hasNext();) { tmp=(Map.Entry)it.next(); entry=(Entry)tmp.getValue(); if(entry.multiplexer != null) entry.multiplexer.closeAll(); if(entry.channel != null) entry.channel.close(); } if(expose_channels && server != null) { try { unregister(domain + ":*"); } catch(Throwable e) { log.error("failed unregistering domain " + domain, e); } } channels.clear(); } } public String dumpConfiguration() { if(stacks != null) { return stacks.keySet().toString(); } else return null; } public String dumpChannels() { if(channels == null) return null; StringBuffer sb=new StringBuffer(); for(Iterator it=channels.entrySet().iterator(); it.hasNext();) { Map.Entry entry=(Map.Entry)it.next(); sb.append(entry.getKey()).append(": ").append(((Entry)entry.getValue()).multiplexer.getServiceIds()).append("\n"); } return sb.toString(); } private void parse(InputStream input) throws Exception { /** * CAUTION: crappy code ahead ! I (bela) am not an XML expert, so the code below is pretty amateurish... * But it seems to work, and it is executed only on startup, so no perf loss on the critical path. * If somebody wants to improve this, please be my guest. */ DocumentBuilderFactory factory=DocumentBuilderFactory.newInstance(); factory.setValidating(false); //for now DocumentBuilder builder=factory.newDocumentBuilder(); Document document=builder.parse(input); // The root element of the document should be the "config" element, // but the parser(Element) method checks this so a check is not // needed here. Element configElement = document.getDocumentElement(); parse(configElement); } private void parse(Element root) throws Exception { /** * CAUTION: crappy code ahead ! I (bela) am not an XML expert, so the code below is pretty amateurish... * But it seems to work, and it is executed only on startup, so no perf loss on the critical path. * If somebody wants to improve this, please be my guest. */ String root_name=root.getNodeName(); if(!PROTOCOL_STACKS.equals(root_name.trim().toLowerCase())) { String error="XML protocol stack configuration does not start with a '<config>' element; " + "maybe the XML configuration needs to be converted to the new format ?\n" + "use 'java org.jgroups.conf.XmlConfigurator <old XML file> -new_format' to do so"; throw new IOException("invalid XML configuration: " + error); } NodeList tmp_stacks=root.getChildNodes(); for(int i=0; i < tmp_stacks.getLength(); i++) { Node node = tmp_stacks.item(i); if(node.getNodeType() != Node.ELEMENT_NODE ) continue; Element stack=(Element) node; String tmp=stack.getNodeName(); if(!STACK.equals(tmp.trim().toLowerCase())) { throw new IOException("invalid configuration: didn't find a \"" + STACK + "\" element under \"" + PROTOCOL_STACKS + "\""); } NamedNodeMap attrs = stack.getAttributes(); Node name=attrs.getNamedItem(NAME); // Node descr=attrs.getNamedItem(DESCR); String st_name=name.getNodeValue(); // String stack_descr=descr.getNodeValue(); // System.out.print("Parsing \"" + st_name + "\" (" + stack_descr + ")"); NodeList configs=stack.getChildNodes(); for(int j=0; j < configs.getLength(); j++) { Node tmp_config=configs.item(j); if(tmp_config.getNodeType() != Node.ELEMENT_NODE ) continue; Element cfg = (Element) tmp_config; tmp=cfg.getNodeName(); if(!CONFIG.equals(tmp)) throw new IOException("invalid configuration: didn't find a \"" + CONFIG + "\" element under \"" + STACK + "\""); XmlConfigurator conf=XmlConfigurator.getInstance(cfg); // fixes http://jira.jboss.com/jira/browse/JGRP-290 ConfiguratorFactory.substituteVariables(conf); // replace vars with system props String val=conf.getProtocolStackString(); this.stacks.put(st_name, val); } //System.out.println(" - OK"); }// System.out.println("stacks: ");// for(Iterator it=stacks.entrySet().iterator(); it.hasNext();) {// Map.Entry entry=(Map.Entry)it.next();// System.out.println("key: " + entry.getKey());// System.out.println("val: " + entry.getValue() + "\n");// } } /** * Returns the stack configuration as a string (to be fed into new JChannel()). Throws an exception * if the stack_name is not found. One of the setMultiplexerConfig() methods had to be called beforehand * @return The protocol stack config as a plain string */ private String getConfig(String stack_name) throws Exception { String cfg=(String)stacks.get(stack_name); if(cfg == null) throw new Exception("stack \"" + stack_name + "\" not found in " + stacks.keySet()); return cfg; } private static class Entry { JChannel channel; Multiplexer multiplexer; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?