⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmsconnector.java

📁 Java 中有关 XML 编程需要用到的 Axis 源代码。把其中 bin 目录下的包导入相应的 Java 工程后即可使用。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                int priority = extractPriority(properties);
                long timeToLive = extractTimeToLive(properties);
                if(properties != null && !properties.isEmpty())
                    setProperties(properties, jmsMessage);
                send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
                     priority, timeToLive);
            }

            /**
             * Closes the producer and then the session. Any failure from
             * either close is deliberately ignored so that the second close
             * is always attempted.
             */
            void cleanup()
            {
                try{m_producer.close();}catch(Throwable t){}
                try{m_session.close();}catch(Throwable t){}
            }

            /**
             * Sends a request as a bytes message and waits for the reply on a
             * freshly created temporary destination.
             *
             * @param endpoint   endpoint whose destination receives the request
             * @param message    request payload written into the bytes message
             * @param timeout    value handed to the consumer's receive call
             * @param properties optional message properties; may be null or
             *                   empty. Delivery mode, priority and time-to-live
             *                   are extracted from it before the rest is
             *                   applied to the message.
             * @return the reply payload, or null when receive returned no message
             * @throws InvokeException if the reply is not a BytesMessage
             * @throws Exception on any other failure while sending or receiving
             */
            byte[] call(JMSEndpoint endpoint, byte[] message, long timeout,
                        HashMap properties)
                throws Exception
            {
                Destination reply = createTemporaryDestination();
                MessageConsumer subscriber = null;
                boolean succeeded = false;
                try
                {
                    subscriber = createConsumer(reply);

                    BytesMessage jmsMessage = m_session.createBytesMessage();
                    jmsMessage.writeBytes(message);
                    jmsMessage.setJMSReplyTo(reply);
                    int deliveryMode = extractDeliveryMode(properties);
                    int priority = extractPriority(properties);
                    long timeToLive = extractTimeToLive(properties);
                    if(properties != null && !properties.isEmpty())
                        setProperties(properties, jmsMessage);
                    send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
                         priority, timeToLive);

                    byte[] respBytes = null;
                    Message received = subscriber.receive(timeout);
                    if(received != null)
                    {
                        // Check the type explicitly instead of relying on a
                        // ClassCastException from the cast.
                        if(!(received instanceof BytesMessage))
                        {
                            throw new InvokeException
                                    ("Error: unexpected message type received - expected BytesMessage");
                        }
                        BytesMessage response = (BytesMessage)received;
                        byte[] buffer = new byte[8 * 1024];
                        ByteArrayOutputStream out = new ByteArrayOutputStream();
                        for(int bytesRead = response.readBytes(buffer);
                            bytesRead != -1; bytesRead = response.readBytes(buffer))
                        {
                            out.write(buffer, 0, bytesRead);
                        }
                        respBytes = out.toByteArray();
                    }
                    succeeded = true;
                    return respBytes;
                }
                finally
                {
                    if(succeeded)
                    {
                        // Normal path: cleanup failures are reported to the caller.
                        subscriber.close();
                        deleteTemporaryDestination(reply);
                    }
                    else
                    {
                        // Failure path: release the consumer and the temporary
                        // destination anyway, but never let a cleanup failure
                        // replace the exception that is already propagating.
                        if(subscriber != null)
                        {
                            try{subscriber.close();}catch(Throwable ignore){}
                        }
                        try{deleteTemporaryDestination(reply);}catch(Throwable ignore){}
                    }
                }
            }

            /**
             * Obtains the message priority from the properties via
             * MapUtils.removeIntProperty, falling back to
             * JMSConstants.DEFAULT_PRIORITY.
             * NOTE(review): the helper name suggests the key is also removed
             * from the map - confirm against MapUtils.
             */
            private int extractPriority(HashMap properties)
            {
                return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY,
                                         JMSConstants.DEFAULT_PRIORITY);
            }

            /**
             * Obtains the delivery mode from the properties via
             * MapUtils.removeIntProperty, falling back to
             * JMSConstants.DEFAULT_DELIVERY_MODE.
             */
            private int extractDeliveryMode(HashMap properties)
            {
                return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE,
                                         JMSConstants.DEFAULT_DELIVERY_MODE);
            }

            /**
             * Obtains the time-to-live from the properties via
             * MapUtils.removeLongProperty, falling back to
             * JMSConstants.DEFAULT_TIME_TO_LIVE.
             */
            private long extractTimeToLive(HashMap properties)
            {
                return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE,
                                          JMSConstants.DEFAULT_TIME_TO_LIVE);
            }

            /**
             * Applies every entry of the map to the message through
             * {@link #setProperty(String, Object, Message)}.
             *
             * @param properties non-null map; keys are cast to String
             * @param message    message that receives the properties
             * @throws JMSException if the message rejects a property
             */
            private void setProperties(HashMap properties, Message message)
                throws JMSException
            {
                Iterator propertyIter = properties.entrySet().iterator();
                while(propertyIter.hasNext())
                {
                    Map.Entry property = (Map.Entry)propertyIter.next();
                    setProperty((String)property.getKey(), property.getValue(),
                                message);
                }
            }

            /**
             * Sets a single property on the message. The correlation id (as
             * String or as bytes) and the JMS type are routed to their
             * dedicated setters; every other name is stored as an object
             * property. A null property name is ignored.
             *
             * @throws JMSException if the message rejects the value
             */
            private void setProperty(String property, Object value, Message message)
                throws JMSException
            {
                if(property == null)
                    return;
                if(property.equals(JMSConstants.JMS_CORRELATION_ID))
                    message.setJMSCorrelationID((String)value);
                else if(property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES))
                    message.setJMSCorrelationIDAsBytes((byte[])value);
                else if(property.equals(JMSConstants.JMS_TYPE))
                    message.setJMSType((String)value);
                else
                    message.setObjectProperty(property, value);
            }
        }
    }

    /**
     * @return the connection held in m_receiveConnection
     */
    AsyncConnection getReceiveConnection()
    {
        return m_receiveConnection;
    }

    /**
     * Factory hook implemented by concrete connectors to build their
     * {@link AsyncConnection}.
     *
     * @throws JMSException if the connection cannot be created
     */
    protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory,
                                                             javax.jms.Connection connection,
                                                             String threadName,
                                                             String clientID,
                                                             String username,
                                                             String password)
        throws JMSException;

    /**
     * Connection that keeps a map from each Subscription to the
     * ListenerSession serving it. A null value in the map marks a
     * subscription whose session has been dropped by {@link #onException()}
     * and is recreated by {@link #onConnect()}. All access to the map is
     * guarded by m_subscriptionLock.
     */
    protected abstract class AsyncConnection extends Connection
    {
        HashMap m_subscriptions;
        Object m_subscriptionLock;

        protected AsyncConnection(ConnectionFactory connectionFactory,
                                  javax.jms.Connection connection,
                                  String threadName,
                                  String clientID,
                                  String username,
                                  String password)
            throws JMSException
        {
            super(connectionFactory, connection, threadName,
                  clientID, username, password);
            m_subscriptions = new HashMap();
            m_subscriptionLock = new Object();
        }

        /**
         * Creates the listener session that serves the given subscription on
         * the given connection.
         */
        protected abstract ListenerSession createListenerSession(
                                                javax.jms.Connection connection,
                                                Subscription subscription)
            throws Exception;

        /**
         * Cleans up every live listener session and forgets all
         * subscriptions.
         */
        protected void onShutdown()
        {
            synchronized(m_subscriptionLock)
            {
                Iterator sessions = m_subscriptions.values().iterator();
                while(sessions.hasNext())
                {
                    // null entries are subscriptions without a live session
                    ListenerSession session = (ListenerSession)sessions.next();
                    if(session != null)
                    {
                        session.cleanup();
                    }
                }
                m_subscriptions.clear();
            }
        }

        /**
         * Registers a subscription and creates its listener session,
         * retrying until it succeeds or m_timeoutTime milliseconds have
         * elapsed. Does nothing if the subscription is already registered.
         *
         * @todo add in security exception propagation
         * @param subscription subscription to register
         * @throws InvokeTimeoutException if no session could be created in time
         * @throws Exception a JMSException the vendor adapter reports as not
         *         recoverable, or any other failure from createListenerSession
         */
        void subscribe(Subscription subscription)
            throws Exception
        {
            long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
            synchronized(m_subscriptionLock)
            {
                if(m_subscriptions.containsKey(subscription))
                    return;
                while(true)
                {
                    if(System.currentTimeMillis() > timeoutTime)
                    {
                        throw new InvokeTimeoutException("Cannot subscribe listener");
                    }

                    try
                    {
                        ListenerSession session = createListenerSession(m_connection,
                                                                        subscription);
                        m_subscriptions.put(subscription, session);
                        break;
                    }
                    catch(JMSException jmse)
                    {
                        if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION))
                        {
                            throw jmse;
                        }

                        // onConnect() calls notifyAll() on this lock, which
                        // ends the wait early.
                        try{m_subscriptionLock.wait(m_interactRetryInterval);}
                        catch(InterruptedException ignore){}
                        //give reconnect a chance
                        Thread.yield();
                        continue;
                    }
                    catch(NullPointerException npe)
                    {
                        //we ARE reconnecting
                        // NOTE(review): presumably raised because m_connection
                        // is null during a reconnect - confirm.
                        try{m_subscriptionLock.wait(m_interactRetryInterval);}
                        catch(InterruptedException ignore){}
                        //give reconnect a chance
                        Thread.yield();
                        continue;
                    }
                }
            }
        }

        /**
         * Cleans up the listener session of a subscription and removes the
         * subscription. While the subscription has no live session (null
         * entry) this waits for one to reappear, up to m_timeoutTime
         * milliseconds. Does nothing if the subscription is not registered.
         *
         * @param subscription subscription to remove
         * @throws InvokeTimeoutException if no live session appeared in time
         */
        void unsubscribe(Subscription subscription)
        {
            long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
            synchronized(m_subscriptionLock)
            {
                if(!m_subscriptions.containsKey(subscription))
                    return;
                while(true)
                {
                    if(System.currentTimeMillis() > timeoutTime)
                    {
                        throw new InvokeTimeoutException("Cannot unsubscribe listener");
                    }

                    //give reconnect a chance
                    Thread.yield();

                    ListenerSession session = (ListenerSession)
                                                m_subscriptions.get(subscription);
                    if(session == null)
                    {
                        //we are reconnecting
                        try{m_subscriptionLock.wait(m_interactRetryInterval);}
                        catch(InterruptedException ignore){}
                        continue;
                    }

                    session.cleanup();
                    m_subscriptions.remove(subscription);
                    break;
                }
            }
        }

        /**
         * Recreates a listener session for every subscription whose entry is
         * null, then wakes all threads waiting on the subscription lock.
         */
        protected void onConnect()
            throws Exception
        {
            synchronized(m_subscriptionLock)
            {
                Iterator subscriptions = m_subscriptions.keySet().iterator();
                while(subscriptions.hasNext())
                {
                    Subscription subscription = (Subscription)subscriptions.next();
                    if(m_subscriptions.get(subscription) == null)
                    {
                        // Replacing the value of an existing key is not a
                        // structural change, so the iterator stays valid.
                        m_subscriptions.put(subscription,
                            createListenerSession(m_connection, subscription));
                    }
                }
                m_subscriptionLock.notifyAll();
            }
        }

        /**
         * Drops every listener session by mapping each subscription to null;
         * the subscriptions themselves stay registered.
         */
        protected void onException()
        {
            synchronized(m_subscriptionLock)
            {
                Iterator subscriptions = m_subscriptions.keySet().iterator();
                while(subscriptions.hasNext())
                {
                    Subscription subscription = (Subscription)subscriptions.next();
                    m_subscriptions.put(subscription, null);
                }
            }
        }

        /**
         * Session that delivers messages from a consumer to the listener of
         * one subscription.
         */
        protected class ListenerSession extends ConnectorSession
        {
            protected MessageConsumer m_consumer;
            protected Subscription    m_subscription;

            ListenerSession(Session session,
                            MessageConsumer consumer,
                            Subscription subscription)
                throws Exception
            {
                super(session);
                m_subscription = subscription;
                m_consumer = consumer;
                // The result is not used; the lookup is kept because it may
                // throw for an endpoint that cannot be resolved.
                subscription.m_endpoint.getDestination(m_session);
                m_consumer.setMessageListener(subscription.m_listener);
            }

            /**
             * Closes the consumer and then the session, ignoring exceptions
             * from either close.
             */
            void cleanup()
            {
                try{m_consumer.close();}catch(Exception ignore){}
                try{m_session.close();}catch(Exception ignore){}
            }
        }
    }

    /**
     * Base class for the connector's session wrappers; it only stores the
     * JMS session.
     */
    private abstract class ConnectorSession
    {
        Session m_session;

        ConnectorSession(Session session)
            throws JMSException
        {
            m_session = session;
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -