⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmsbroadcastinglistener.java

📁 一个不错的cache
💻 JAVA
字号:
/* * Copyright (c) 2002-2003 by OpenSymphony * All rights reserved. */package com.opensymphony.oscache.plugins.clustersupport;import com.opensymphony.oscache.base.Cache;import com.opensymphony.oscache.base.Config;import com.opensymphony.oscache.base.FinalizationException;import com.opensymphony.oscache.base.InitializationException;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import javax.jms.*;import javax.naming.InitialContext;/** * A JMS based clustering implementation. This implementation is independent of the * JMS provider and uses non-persistent messages on a publish subscribe protocol. * * @author <a href="mailto:motoras@linuxmail.org">Romulus Pasca</a> */public class JMSBroadcastingListener extends AbstractBroadcastingListener {    private final static Log log = LogFactory.getLog(JMSBroadcastingListener.class);    /**     *The JMS connection used     */    private Connection connection;    /**     * Th object used to publish new messages     */    private MessageProducer messagePublisher;    /**     * The current JMS session     */    private Session publisherSession;    /**     * The name of this cluster. Used to identify the sender of a message.     */    private String clusterNode;    /**     * <p>Called by the cache administrator class when a cache is instantiated.</p>     * <p>The JMS broadcasting implementation requires the following configuration     * properties to be specified in <code>oscache.properties</code>:     * <ul>     * <li><b>cache.cluster.jms.topic.factory</b> - The JMS connection factory to use</li>     * <li><b>cache.cluster.jms.topic.name</b> - The JMS topic name</li>     * <li><b>cache.cluster.jms.node.name</b> - The name of this node in the cluster. This     * should be unique for each node.</li>     * Please refer to the clustering documentation for further details on configuring     * the JMS clustered caching.</p>     *     * @param cache the cache instance that this listener is attached to.     *     * @throws com.opensymphony.oscache.base.InitializationException thrown when there was a     * problem initializing the listener. The cache administrator will log this error and     * disable the listener.     */    public void initialize(Cache cache, Config config) throws InitializationException {        super.initialize(cache, config);        // Get the name of this node        clusterNode = config.getProperty("cache.cluster.jms.node.name");        String topic = config.getProperty("cache.cluster.jms.topic.name");        String topicFactory = config.getProperty("cache.cluster.jms.topic.factory");        if (log.isInfoEnabled()) {            log.info("Starting JMS clustering (node name=" + clusterNode + ", topic=" + topic + ", topic factory=" + topicFactory + ")");        }        try {            // Make sure you have specified the necessary JNDI properties (usually in            // a jndi.properties resource file, or as system properties)            InitialContext jndi = new InitialContext();            // Look up a JMS connection factory            ConnectionFactory connectionFactory = (ConnectionFactory) jndi.lookup(topicFactory);            // Create a JMS connection            connection = connectionFactory.createConnection();            // Create session objects            publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            Session subSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            // Look up the JMS topic            Topic chatTopic = (Topic) jndi.lookup(topic);            // Create the publisher and subscriber            messagePublisher = publisherSession.createProducer(chatTopic);            MessageConsumer messageConsumer = subSession.createConsumer(chatTopic);            // Set the message listener            messageConsumer.setMessageListener(new MessageListener() {                    public void onMessage(Message message) {                        try {                            //check the message type                            ObjectMessage objectMessage = null;                            if (!(message instanceof ObjectMessage)) {                                log.error("Cannot handle message of type (class=" + message.getClass().getName() + "). Notification ignored.");                                return;                            }                            objectMessage = (ObjectMessage) message;                            //check the message content                            if (!(objectMessage.getObject() instanceof ClusterNotification)) {                                log.error("An unknown cluster notification message received (class=" + objectMessage.getObject().getClass().getName() + "). Notification ignored.");                                return;                            }                            if (log.isDebugEnabled()) {                                log.debug(objectMessage.getObject());                            }                            // This prevents the notification sent by this node from being handled by itself                            if (!objectMessage.getStringProperty("nodeName").equals(clusterNode)) {                                //now handle the message                                ClusterNotification notification = (ClusterNotification) objectMessage.getObject();                                handleClusterNotification(notification);                            }                        } catch (JMSException jmsEx) {                            log.error("Cannot handle cluster Notification", jmsEx);                        }                    }                });            // Start the JMS connection; allows messages to be delivered            connection.start();        } catch (Exception e) {            throw new InitializationException("Initialization of the JMSBroadcastingListener failed: " + e);        }    }    /**     * Called by the cache administrator class when a cache is destroyed.     *     * @throws com.opensymphony.oscache.base.FinalizationException thrown when there was a problem finalizing the     * listener. The cache administrator will catch and log this error.     */    public void finialize() throws FinalizationException {        try {            if (log.isInfoEnabled()) {                log.info("Shutting down JMS clustering...");            }            connection.close();            if (log.isInfoEnabled()) {                log.info("JMS clustering shutdown complete.");            }        } catch (JMSException e) {            log.warn("A problem was encountered when closing the JMS connection", e);        }    }    protected void sendNotification(ClusterNotification message) {        try {            ObjectMessage objectMessage = publisherSession.createObjectMessage();            objectMessage.setObject(message);            //sign the message, with the name of this node            objectMessage.setStringProperty("nodeName", clusterNode);            messagePublisher.send(objectMessage);        } catch (JMSException e) {            log.error("Cannot send notification " + message, e);        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -