📄 joramadapter.java
字号:
} // Administering as specified in the properties file. try { File file = null; try { if (platformConfigDir == null) { java.net.URL url = ClassLoader.getSystemResource(adminFile); file = new File(url.getFile()); } else file = new File(platformConfigDir, adminFile); } catch (NullPointerException e) { throw new java.io.FileNotFoundException(); } FileReader fileReader = new FileReader(file); BufferedReader reader = new BufferedReader(fileReader); if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, " - Reading the provided admin file: " + file); boolean end = false; String line; StringTokenizer tokenizer; String firstToken; String name = null; while (! end) { try { line = reader.readLine(); if (line == null) end = true; else { tokenizer = new StringTokenizer(line); if (tokenizer.hasMoreTokens()) { firstToken = tokenizer.nextToken(); if (firstToken.equalsIgnoreCase("Host")) { if (tokenizer.hasMoreTokens()) hostName = tokenizer.nextToken(); } else if (firstToken.equalsIgnoreCase("Port")) { if (tokenizer.hasMoreTokens()) serverPort = Integer.parseInt(tokenizer.nextToken()); } else if (firstToken.equalsIgnoreCase("Queue")) { if (tokenizer.hasMoreTokens()) { name = tokenizer.nextToken(); createQueue(name); } } else if (firstToken.equalsIgnoreCase("Topic")) { if (tokenizer.hasMoreTokens()) { name = tokenizer.nextToken(); createTopic(name); } } else if (firstToken.equalsIgnoreCase("User")) { if (tokenizer.hasMoreTokens()) name = tokenizer.nextToken(); if (tokenizer.hasMoreTokens()) { String password = tokenizer.nextToken(); createUser(name, password); } else if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, " - Missing password for user [" + name + "]"); } else if (firstToken.equalsIgnoreCase("CF")) { if (tokenizer.hasMoreTokens()) { name = tokenizer.nextToken(); createCF(name); } } else if (firstToken.equalsIgnoreCase("QCF")) { if (tokenizer.hasMoreTokens()) { name = tokenizer.nextToken(); createQCF(name); } } else if (firstToken.equalsIgnoreCase("TCF")) { if (tokenizer.hasMoreTokens()) { name = tokenizer.nextToken(); createTCF(name); } } } } } // Error while reading one line. catch (IOException exc) { // Error while creating the destination. } catch (AdminException exc) { AdapterTracing.dbgAdapter.log(BasicLevel.ERROR, "Creation failed",exc); } } } // No destination to deploy. catch (java.io.FileNotFoundException fnfe) { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, " - No administration task requested."); } if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, "Server port is " + serverPort); started = true; // Registering MBeans... try { jmxServer.registerMBean(this, "joramClient", "type=JoramAdapter,version=" + ConnectionMetaData.providerVersion); } catch (Exception e) { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN)) AdapterTracing.dbgAdapter.log(BasicLevel.WARN, " - Could not register JoramAdapterMBean", e); } if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, "JORAM adapter " + ConnectionMetaData.providerVersion + " successfully deployed."); } /** * Notifies the adapter to terminate the connections it manages, and if * needed, to shut down the collocated JORAM server. */ public synchronized void stop() { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, "JORAM adapter stopping..."); if (! started || stopped) return; // Unbinds the bound objects... while (! boundNames.isEmpty()) unbind((String) boundNames.remove(0)); // Finishing the admin session. joramAdmin.getPlatformAdmin().disconnect(); // Closing the outbound connections, if any. while (! producers.isEmpty()) { try { ((ManagedConnectionImpl) producers.remove(0)).destroy(); } catch (Exception exc) {} } // Closing the inbound connections, if any. for (Enumeration keys = consumers.keys(); keys.hasMoreElements();) ((InboundConsumer) consumers.get(keys.nextElement())).close(); // Browsing the recovery connections, if any. if (connections != null) { for (Enumeration keys = connections.keys(); keys.hasMoreElements();) { try { ((XAConnection) connections.get(keys.nextElement())).close(); } catch (Exception exc) {} } } // If JORAM server is collocated, stopping it. if (collocated) { try { AgentServer.stop(); } catch (Exception exc) {} } stopped = true; try { jmxServer.unregisterMBean("joramClient", "type=JoramAdapter,version=" + ConnectionMetaData.providerVersion); } catch (Exception e) { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN)) AdapterTracing.dbgAdapter.log(BasicLevel.WARN, "unregisterMBean", e); } if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, "JORAM adapter successfully stopped."); } /** * Notifies the adapter to setup asynchronous message delivery for an * application server endoint. * * @exception IllegalStateException If the adapter is either not started, * or stopped. * @exception NotSupportedException If the provided activation parameters * are invalid. * @exception CommException If the JORAM server is not reachable. * @exception SecurityException If connecting is not allowed. * @exception ResourceException Generic exception. */ public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec spec) throws ResourceException { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " endpointActivation(" + endpointFactory + ", " + spec + ")"); if (! started) throw new IllegalStateException("Non started resource adapter."); if (stopped) throw new IllegalStateException("Stopped resource adapter."); if (! (spec instanceof ActivationSpecImpl)) throw new ResourceException("Provided ActivationSpec instance is not " + "a JORAM activation spec."); ActivationSpecImpl specImpl = (ActivationSpecImpl) spec; if (! specImpl.getResourceAdapter().equals(this)) throw new ResourceException("Supplied ActivationSpec instance " + "associated to an other ResourceAdapter."); if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, "Activating Endpoint on JORAM adapter."); boolean durable = specImpl.getSubscriptionDurability() != null && specImpl.getSubscriptionDurability().equalsIgnoreCase("Durable"); boolean transacted = false; try { Class listenerClass = Class.forName("javax.jms.MessageListener"); Class[] parameters = { Class.forName("javax.jms.Message") }; Method meth = listenerClass.getMethod("onMessage", parameters); transacted = endpointFactory.isDeliveryTransacted(meth); } catch (Exception exc) { throw new ResourceException("Could not determine transactional " + "context: " + exc); } int maxWorks = 10; try { maxWorks = Integer.parseInt(specImpl.getMaxNumberOfWorks()); } catch (Exception exc) { throw new ResourceException("Invalid max number of works instances " + "number: " + exc); } int maxMessages = 10; try { maxMessages = Integer.parseInt(specImpl.getMaxMessages()); } catch (Exception exc) { throw new ResourceException("Invalid max messages " + "number: " + exc); } int ackMode; try { if (ActivationSpecImpl.AUTO_ACKNOWLEDGE.equals(specImpl .getAcknowledgeMode())) { ackMode = Session.AUTO_ACKNOWLEDGE; } else if (ActivationSpecImpl.AUTO_ACKNOWLEDGE.equals(specImpl .getAcknowledgeMode())) { ackMode = Session.DUPS_OK_ACKNOWLEDGE; } else { ackMode = Session.AUTO_ACKNOWLEDGE; } } catch (Exception exc) { throw new ResourceException("Invalid acknowledge mode: " + exc); } String destType = specImpl.getDestinationType(); String destName = specImpl.getDestination(); try { Destination dest; if (destType.equals("javax.jms.Queue")) dest = createQueue(destName); else if (destType.equals("javax.jms.Topic")) dest = createTopic(destName); else throw new NotSupportedException("Invalid destination type provided " + "as activation parameter: " + destType); String userName = specImpl.getUserName(); String password = specImpl.getPassword(); createUser(userName, password); XAConnectionFactory connectionFactory = null; if (isHa) { if (collocated) connectionFactory = XAHALocalConnectionFactory.create(); else { String urlHa = "hajoram://" + hostName + ":" + serverPort; connectionFactory = XAHATcpConnectionFactory.create(urlHa); } } else { if (collocated) connectionFactory = XALocalConnectionFactory.create(); else connectionFactory = XATcpConnectionFactory.create(hostName, serverPort); } ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().connectingTimer = connectingTimer; ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().cnxPendingTimer = cnxPendingTimer; ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().txPendingTimer = txPendingTimer; if (queueMessageReadMax > 0) { ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory) .getParameters().queueMessageReadMax = queueMessageReadMax; } if (topicAckBufferMax > 0) { ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory) .getParameters().topicAckBufferMax = topicAckBufferMax; } if (topicPassivationThreshold > 0) { ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory) .getParameters().topicPassivationThreshold = topicPassivationThreshold; } if (topicActivationThreshold > 0) { ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory) .getParameters().topicActivationThreshold = topicActivationThreshold; } XAConnection cnx = connectionFactory.createXAConnection(userName, password); if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " endpointActivation cnx = " + cnx); // Creating and registering a consumer instance for this endpoint. InboundConsumer consumer = new InboundConsumer(workManager, endpointFactory, cnx, dest, specImpl.getMessageSelector(), durable, specImpl.getSubscriptionName(), transacted, maxWorks, maxMessages, ackMode); consumers.put(specImpl, consumer); } catch (javax.jms.JMSSecurityException exc) { throw new SecurityException("Invalid user identification: " + exc); } catch (javax.jms.JMSException exc) { throw new CommException("Could not connect to the JORAM server: "
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -