⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmsconnector.java

📁 提供ESB 应用mule源代码 提供ESB 应用mule源代码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        if (connection != null)        {            connection.setExceptionListener(this);        }        if (clientId != null)        {            connection.setClientID(getClientId());        }        return connection;    }    public void onException(JMSException jmsException)    {        final JmsConnector jmsConnector = JmsConnector.this;        Map receivers = jmsConnector.getReceivers();        boolean isMultiConsumerReceiver = false;                if (!receivers.isEmpty())         {            Map.Entry entry = (Map.Entry) receivers.entrySet().iterator().next();            if (entry.getValue() instanceof MultiConsumerJmsMessageReceiver)            {                isMultiConsumerReceiver = true;            }        }                int expectedReceiverCount = isMultiConsumerReceiver ? 1 :             (jmsConnector.getReceivers().size() * jmsConnector.getNumberOfConcurrentTransactedReceivers());                if (logger.isDebugEnabled())        {            logger.debug("About to recycle myself due to remote JMS connection shutdown but need "                + "to wait for all active receivers to report connection loss. Receiver count: "                 + (receiverReportedExceptionCount.get() + 1) + '/' + expectedReceiverCount);        }                if (receiverReportedExceptionCount.incrementAndGet() >= expectedReceiverCount)        {            receiverReportedExceptionCount.set(0);                    handleException(new ConnectException(jmsException, this));        }    }    protected void doConnect() throws ConnectException    {        try        {            if (jmsSupport == null)            {                if (JmsConstants.JMS_SPECIFICATION_102B.equals(specification))                {                    jmsSupport = new Jms102bSupport(this);                }                else                {                    jmsSupport = new Jms11Support(this);                }            }        }        catch (Exception e)        {            throw new ConnectException(CoreMessages.failedToCreate("Jms Connector"), e, this);        }        try        {            connection = createConnection();            if (started.get())            {                connection.start();            }        }        catch (Exception e)        {            throw new ConnectException(e, this);        }    }    protected void doDisconnect() throws ConnectException    {        try        {            if (connection != null)            {                connection.close();            }        }        catch (Exception e)        {            throw new ConnectException(e, this);        }        finally        {            // connectionFactory = null;            connection = null;        }    }    public MessageAdapter getMessageAdapter(Object message) throws MessagingException    {        JmsMessageAdapter adapter = (JmsMessageAdapter) super.getMessageAdapter(message);        adapter.setSpecification(this.getSpecification());        return adapter;    }    protected Object getReceiverKey(Service service, InboundEndpoint endpoint)    {        return service.getName() + "~" + endpoint.getEndpointURI().getAddress();    }    public Session getSessionFromTransaction()    {        Transaction tx = TransactionCoordination.getInstance().getTransaction();        if (tx != null)        {            if (tx.hasResource(connection))            {                if (logger.isDebugEnabled())                {                    logger.debug("Retrieving jms session from current transaction " + tx);                }                Session session = (Session) tx.getResource(connection);                if (logger.isDebugEnabled())                {                    logger.debug("Using " + session + " bound to transaction " + tx);                }                return session;            }        }        return null;    }    public Session getSession(ImmutableEndpoint endpoint) throws JMSException    {        final boolean topic = getTopicResolver().isTopic(endpoint);        return getSession(endpoint.getTransactionConfig().isTransacted(), topic);    }    public Session getSession(boolean transacted, boolean topic) throws JMSException    {        if (!isConnected())        {            throw new JMSException("Not connected");        }        Session session = getSessionFromTransaction();        if (session != null)        {            return session;        }        Transaction tx = TransactionCoordination.getInstance().getTransaction();        if (logger.isDebugEnabled())        {            logger.debug(MessageFormat.format(                    "Retrieving new jms session from connection: " +                            "topic={0}, transacted={1}, ack mode={2}, nolocal={3}",                    new Object[]{Boolean.valueOf(topic),                                 Boolean.valueOf(transacted),                                 new Integer(acknowledgementMode),                                 Boolean.valueOf(noLocal)}));        }        session = jmsSupport.createSession(connection, topic, transacted, acknowledgementMode, noLocal);        if (tx != null)        {            logger.debug("Binding session " + session + " to current transaction " + tx);            try            {                tx.bindResource(connection, session);            }            catch (TransactionException e)            {                closeQuietly(session);                throw new RuntimeException("Could not bind session to current transaction", e);            }        }        return session;    }    protected void doStart() throws MuleException    {        if (connection != null)        {            try            {                connection.start();            }            catch (JMSException e)            {                throw new StartException(CoreMessages.failedToStart("Jms Connection"), e, this);            }        }    }    protected void doStop() throws MuleException    {        // template method    }    public ReplyToHandler getReplyToHandler()    {        return new JmsReplyToHandler(this, getDefaultResponseTransformers());    }    public void onNotification(ServerNotification notification)    {        if (notification.getAction() == ConnectionNotification.CONNECTION_DISCONNECTED                || notification.getAction() == ConnectionNotification.CONNECTION_FAILED)        {            // Remove all dispatchers as any cached session will be invalidated            disposeDispatchers();            // TODO should we dispose receivers here as well (in case they are            // transactional)            // gives a harmless NPE at            // AbstractConnector.connect(AbstractConnector.java:927)            // disposeReceivers();        }    }    /**     * This method may be overridden in case a certain JMS implementation does not     * support all the standard JMS properties.     */    public boolean supportsProperty(String property)    {        return true;    }    /**     * This method may be overridden in order to apply pre-processing to the message     * as soon as it arrives.     *     * @param message - the incoming message     * @param session - the JMS session     * @return the preprocessed message     */    public javax.jms.Message preProcessMessage(javax.jms.Message message, Session session) throws Exception    {        return message;    }    /**     * Closes the MessageProducer     *     * @param producer     * @throws JMSException     */    public void close(MessageProducer producer) throws JMSException    {        if (producer != null)        {            producer.close();        }    }    /**     * Closes the MessageProducer without throwing an exception (an error message is     * logged instead).     *     * @param producer     */    public void closeQuietly(MessageProducer producer)    {        try        {            close(producer);        }        catch (JMSException e)        {            logger.error("Failed to close jms message producer", e);        }    }    /**     * Closes the MessageConsumer     *     * @param consumer     * @throws JMSException     */    public void close(MessageConsumer consumer) throws JMSException    {        if (consumer != null)        {            consumer.close();        }    }    /**     * Closes the MessageConsumer without throwing an exception (an error message is     * logged instead).     *     * @param consumer     */    public void closeQuietly(MessageConsumer consumer)    {        try        {            close(consumer);        }        catch (JMSException e)        {            logger.error("Failed to close jms message consumer", e);        }    }    /**     * Closes the MuleSession     *     * @param session     * @throws JMSException     */    public void close(Session session) throws JMSException    {        if (session != null)        {            session.close();        }    }    /**     * Closes the MuleSession without throwing an exception (an error message is logged     * instead).     *     * @param session     */    public void closeQuietly(Session session)    {        try        {            close(session);        }        catch (JMSException e)        {            logger.error("Failed to close jms session consumer", e);        }    }    /**     * Closes the TemporaryQueue     *     * @param tempQueue     * @throws JMSException     */    public void close(TemporaryQueue tempQueue) throws JMSException    {        if (tempQueue != null)        {            tempQueue.delete();        }    }    /**     * Closes the TemporaryQueue without throwing an exception (an error message is     * logged instead).     *     * @param tempQueue     */    public void closeQuietly(TemporaryQueue tempQueue)    {        try        {            close(tempQueue);        }        catch (JMSException e)        {            if (logger.isErrorEnabled())            {                String queueName = "";                try                {                    queueName = tempQueue.getQueueName();                }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -