📄 bridgeunifiedmodule.java
字号:
{ if (userName != null && password != null) cnx = cnxFact.createConnection(userName, password); else cnx = cnxFact.createConnection(); cnx.setExceptionListener(this); if (clientID != null) cnx.setClientID(clientID); producerSession = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = producerSession.createProducer(dest); consumerSession = cnx.createSession(true, 0); } /** * Sets the JMS consumer on the foreign destination. * * @exception JMSException If the JMS consumer could not be created. */ protected void setConsumer() throws JMSException { if (consumer != null) return; try { if (dest instanceof Queue) consumer = consumerSession.createConsumer(dest, selector); else consumer = consumerSession.createDurableSubscriber((Topic) dest, agentId.toString(), selector, false); } catch (JMSException exc) { throw exc; } catch (Exception exc) { throw new JMSException("JMS resources do not allow to create consumer: " + exc); } } /** * Unsets the JMS consumer. */ protected void unsetConsumer() { try { if (dest instanceof Topic) consumerSession.unsubscribe(agentId.toString()); consumer.close(); } catch (Exception exc) {} consumer = null; } /** * Acknowledges a message successfuly delivered to the foreign JMS server. */ protected void acknowledge(Message message) { Channel.sendTo(agentId, new BridgeAckNot(message.getIdentifier())); } /** * The <code>StartupDaemon</code> thread is responsible for retrieving * the needed JMS administered objects from the JNDI server. */ protected class StartupDaemon extends Daemon { /** Constructs a <code>StartupDaemon</code> thread. */ protected StartupDaemon() { super(agentId.toString() + ":StartupDaemon"); setDaemon(false); } /** The daemon's loop. */ public void run() { javax.naming.Context jndiCtx = null; try { canStop = true; // Administered objects still to be retrieved: getting them from // JNDI. if (cnxFact == null || dest == null) { if (jndiFactory == null || jndiUrl == null) jndiCtx = new javax.naming.InitialContext(); else { java.util.Hashtable env = new java.util.Hashtable(); env.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, jndiFactory); env.put(javax.naming.Context.PROVIDER_URL, jndiUrl); jndiCtx = new javax.naming.InitialContext(env); } cnxFact = (ConnectionFactory) jndiCtx.lookup(cnxFactName); dest = (Destination) jndiCtx.lookup(destName); } try { doConnect(); } catch (AbstractMethodError exc) { usable = false; notUsableMessage = "Retrieved administered objects types not " + "compatible with the 'unified' communication " + " mode: " + exc; } catch (ClassCastException exc) { usable = false; notUsableMessage = "Retrieved administered objects types not " + "compatible with the chosen communication mode: " + exc; } catch (JMSSecurityException exc) { usable = false; notUsableMessage = "Provided user identification does not allow " + "to connect to the foreign JMS server: " + exc; } catch (JMSException exc) { reconnectionDaemon.reconnect(); } catch (Throwable exc) { usable = false; notUsableMessage = "" + exc; } } catch (javax.naming.NameNotFoundException exc) { usable = false; if (cnxFact == null) notUsableMessage = "Could not retrieve ConnectionFactory [" + cnxFactName + "] from JNDI: " + exc; else if (dest == null) notUsableMessage = "Could not retrieve Destination [" + destName + "] from JNDI: " + exc; } catch (javax.naming.NamingException exc) { usable = false; notUsableMessage = "Could not access JNDI: " + exc; } catch (ClassCastException exc) { usable = false; notUsableMessage = "Error while retrieving administered objects " + "through JNDI possibly because of missing " + "foreign JMS client libraries in classpath: " + exc; } catch (Exception exc) { usable = false; notUsableMessage = "Error while retrieving administered objects " + "through JNDI: " + exc; } finally { // Closing the JNDI context. try { jndiCtx.close(); } catch (Exception exc) {} finish(); } } /** Shuts the daemon down. */ public void shutdown() {} /** Releases the daemon's resources. */ public void close() {} } /** * The <code>ReconnectionDaemon</code> thread is responsible for reconnecting * the bridge module with the foreign JMS server in case of disconnection. */ protected class ReconnectionDaemon extends Daemon { /** Number of reconnection trials of the first step. */ private int attempts1 = 30; /** Retry interval (in milliseconds) of the first step. */ private long interval1 = 1000L; /** Number of reconnection trials of the second step. */ private int attempts2 = 55; /** Retry interval (in milliseconds) of the second step. */ private long interval2 = 5000L; /** Retry interval (in milliseconds) of the third step. */ private long interval3 = 60000L; /** Constructs a <code>ReconnectionDaemon</code> thread. */ protected ReconnectionDaemon() { super(agentId.toString() + ":ReconnectionDaemon"); setDaemon(false); } /** Notifies the daemon to start reconnecting. */ protected void reconnect() { if (running) return; consumer = null; start(); } /** The daemon's loop. */ public void run() { int attempts = 0; long interval; Message msg; try { while (running) { canStop = true; attempts++; if (attempts <= 30) interval = interval1; else if (attempts <= 55) interval = interval2; else interval = interval3; try { Thread.sleep(interval); doConnect(); // Setting the listener, if any. if (listener) setMessageListener(); // Starting the consumer daemon: consumerDaemon.start(); // Sending the pending messages, if any: while (! qout.isEmpty()) send((Message) qout.remove(0)); } catch (Exception exc) { continue; } canStop = false; break; } } finally { finish(); } } /** Shuts the daemon down. */ public void shutdown() {} /** Releases the daemon's resources. */ public void close() {} } /** * The <code>ConsumerDaemon</code> thread allows to call * <code>MessageConsumer.receive()</code> for requesting a foreign JMS * message without blocking the JORAM server. */ protected class ConsumerDaemon extends Daemon { /** Counter of pending "receive" requests. */ private int requests = 0; /** Constructs a <code>ReceiverDaemon</code> thread. */ protected ConsumerDaemon() { super(agentId.toString() + ":ConsumerDaemon"); setDaemon(false); } /** Notifies the daemon of a new "receive" request. */ protected synchronized void receive() { requests++; if (running) return; start(); } /** The daemon's loop. */ public void run() { try { Message momMessage; BridgeDeliveryNot notif; setConsumer(); cnx.start(); while (requests > 0 && running) { canStop = true; // Expecting a message: try { momMessage = MessageConverterModule.convert(consumer.receive()); consumerSession.commit(); } // Conversion error: denying the message. catch (MessageFormatException messageExc) { consumerSession.rollback(); continue; } // Processing the delivery. canStop = false; notif = new BridgeDeliveryNot(momMessage); Channel.sendTo(agentId, notif); requests--; } } // Connection loss? catch (JMSException exc) {} finally { finish(); } } /** Shuts the daemon down. */ public void shutdown() {} /** Releases the daemon's resources. */ public void close() {} } /** Deserializes a <code>BridgeUnifiedModule</code> instance. */ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { in.defaultReadObject(); qout = new Vector(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -