⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jms.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
// $Id: JMS.java,v 1.14 2006/06/23 09:01:24 belaban Exp $ package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Message;import org.jgroups.View;import org.jgroups.stack.Protocol;import org.jgroups.util.Util;import javax.naming.Context;import javax.naming.InitialContext;import java.io.*;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;/** * Implementation of the transport protocol using the Java Message Service (JMS). * This implementation depends on the JMS server that will distribute messages * published to the specific topic to all topic subscribers. * <p> * Protocol parameters are: * <ul> * <li><code>topicName</code> - (required), full JNDI name of the topic to be  * used for message publishing; *  * <li><code>cf</code> - (optional), full JNDI name of the topic connection  * factory that will create topic connection, default value is  * <code>"ConnectionFactory"</code>; *  * <li><code>jndiCtx</code> - (optional), value of the  * <code>javax.naming.Context.INITIAL_CONTEXT_FACTORY</code> property; you can * specify it as the JVM system property  * <code>-Djava.naming.factory.initial=factory.class.Name</code>; *  * <li><code>providerURL</code> - (optional), value of the  * <code>javax.naming.Context.PROVIDER_URL</code> property; you can specify it  * as the JVM system property <code>-Djava.naming.provider.url=some_url</code> *  * <li><code>ttl</code> - (required), time to live in milliseconds. Default  * value is 0, that means that messages will never expire and will be  * accumulated by a JMS server. *  * </ul> *  * <p> * Note, when you are using the JMS protocol, try to avoid using protocols  * that open server socket connections, like FD_SOCK. I belive that FD is more * appropriate failure detector for JMS case. *  * @author Roman Rokytskyy (rrokytskyy@acm.org) */public class JMS extends Protocol implements javax.jms.MessageListener {    public static final        String DEFAULT_CONNECTION_FACTORY = "ConnectionFactory";    public static final        String INIT_CONNECTION_FACTORY = "cf";    public static final        String INIT_TOPIC_NAME = "topicName";    public static final        String INIT_JNDI_CONTEXT = "jndiCtx";    public static final        String INIT_PROVIDER_URL = "providerURL";            public static final    	String TIME_TO_LIVE = "ttl";    public static final        String GROUP_NAME_PROPERTY = "jgroups_group_name";    public static final        String SRC_PROPERTY = "src";    public static final        String DEST_PROPERTY = "dest";    private final Vector members = new Vector();    private javax.jms.TopicConnectionFactory connectionFactory;    private javax.jms.Topic topic;    private javax.jms.TopicConnection connection;    private javax.jms.TopicSession session;    private javax.jms.TopicPublisher publisher;    private javax.jms.TopicSubscriber subscriber;    private String cfName;    private String topicName;    private String initCtxFactory;    private String providerUrl;    private long timeToLive;    private Context ctx;    private String group_addr;    private Address local_addr;    private Address mcast_addr;    private final ByteArrayOutputStream out_stream = new ByteArrayOutputStream(65535);        private static final java.util.Random RND = new java.util.Random();    /**     * Empty constructor.      */    public JMS() {    }    /**     * Get the name of the protocol.     *      * @return always returns the <code>"JMS"</code> string.     */    public String getName() {        return "JMS";    }    /**     * Get the string representation of the protocol.     *      * @return string representation of the protocol (not very useful though).     */    public String toString() {        return "Protocol JMS(local address: " + local_addr + ')';    }    /**     * Set protocol properties. Properties are:     * <ul>     * <li><code>topicName</code> - (required), full JNDI name of the topic to be      * used for message publishing;     *      * <li><code>cf</code> - (optional), full JNDI name of the topic connection      * factory that will create topic connection, default value is      * <code>"ConnectionFactory"</code>;     *      * <li><code>jndiCtx</code> - (optional), value of the      * <code>javax.naming.Context.INITIAL_CONTEXT_FACTORY</code> property; you can     * specify it as the JVM system property      * <code>-Djava.naming.factory.initial=factory.class.Name</code>;     *      * <li><code>providerURL</code> - (optional), value of the      * <code>javax.naming.Context.PROVIDER_URL</code> property; you can specify it      * as the JVM system property <code>-Djava.naming.provider.url=some_url</code>     * </ul>     *      */    public boolean setProperties(Properties props) {        super.setProperties(props);        cfName = props.getProperty(INIT_CONNECTION_FACTORY,                DEFAULT_CONNECTION_FACTORY);        props.remove(INIT_CONNECTION_FACTORY);        topicName = props.getProperty(INIT_TOPIC_NAME);        if (topicName == null)                throw new IllegalArgumentException(                "JMS topic has not been specified.");        props.remove(INIT_TOPIC_NAME);        initCtxFactory = props.getProperty(INIT_JNDI_CONTEXT);        props.remove(INIT_JNDI_CONTEXT);        providerUrl = props.getProperty(INIT_PROVIDER_URL);        props.remove(INIT_PROVIDER_URL);                String ttl = props.getProperty(TIME_TO_LIVE);                if (ttl == null) {            if(log.isErrorEnabled()) log.error("ttl property not found.");            return false;        }                props.remove(TIME_TO_LIVE);                // try to parse ttl property        try {                        timeToLive = Long.parseLong(ttl);                    } catch(NumberFormatException nfex) {            if(log.isErrorEnabled()) log.error("ttl property does not contain numeric value.");                        return false;        }        return props.size() == 0;    }    /**     * Implementation of the <code>javax.jms.MessageListener</code> interface.     * This method receives the JMS message, checks the destination group name.     * If the group name is the same as the group name of this channel, it      * checks the destination address. If destination address is either      * multicast or is the same as local address then message is unwrapped and     * passed up the protocol stack. Otherwise it is ignored.     *      * @param jmsMessage instance of <code>javax.jms.Message</code>.     */    public void onMessage(javax.jms.Message jmsMessage) {        try {            String groupName = jmsMessage.getStringProperty(GROUP_NAME_PROPERTY);            // there might be other messages in this topic            if (groupName == null)                return;                            if(log.isDebugEnabled()) log.debug("Got message for group [" +                groupName + ']' + ", my group is [" + group_addr + ']');            // not our message, ignore it            if (!group_addr.equals(groupName))                return;            JMSAddress src =                jmsMessage.getStringProperty(SRC_PROPERTY) != null ?                    new JMSAddress(jmsMessage.getStringProperty(SRC_PROPERTY)) :                    null;            JMSAddress dest =                jmsMessage.getStringProperty(DEST_PROPERTY) != null ?                    new JMSAddress(jmsMessage.getStringProperty(DEST_PROPERTY)) :                    null;            // if message is unicast message and I'm not the destination - ignore            if (src != null && dest != null &&                 !dest.equals(local_addr) && !dest.isMulticastAddress())                return;                        if (jmsMessage instanceof javax.jms.ObjectMessage) {                byte[] buf = (byte[])((javax.jms.ObjectMessage)jmsMessage).getObject();                ByteArrayInputStream inp_stream=new ByteArrayInputStream(buf);                ObjectInputStream inp=new ObjectInputStream(inp_stream);                                    Message msg=new Message();                msg.readExternal(inp);                                    Event evt=new Event(Event.MSG, msg);                 // +++ remove                    if(log.isDebugEnabled()) log.debug("Message is " + msg +                        ", headers are " + msg.getHeaders ());                /* Because Protocol.Up() is never called by this bottommost layer,                 * we call Up() directly in the observer. This allows e.g.                 * PerfObserver to get the time of reception of a message */                if(observer != null)                    observer.up(evt, up_queue.size());                passUp(evt);            }        } catch(javax.jms.JMSException ex) {            ex.printStackTrace();            if(log.isErrorEnabled()) log.error("JMSException : " + ex.toString());        } catch(IOException ioex) {            ioex.printStackTrace();            if(log.isErrorEnabled()) log.error("IOException : " + ioex.toString());        } catch(ClassNotFoundException cnfex) {                cnfex.printStackTrace();                if(log.isErrorEnabled()) log.error("ClassNotFoundException : " + cnfex.toString());        }    }    /**     * Handle down event, if it is not a Event.MSG type.     *      * @param evt event to handle.     */    protected void handleDownEvent(Event evt) {        switch(evt.getType()) {            // we do not need this at present time, maybe in the future            case Event.TMP_VIEW:            case Event.VIEW_CHANGE:                synchronized(members) {                        members.removeAllElements();                        Vector tmpvec=((View)evt.getArg()).getMembers();                        for(int i=0; i < tmpvec.size(); i++)                        members.addElement(tmpvec.elementAt(i));                }                break;            case Event.GET_LOCAL_ADDRESS:                // return local address -> Event(SET_LOCAL_ADDRESS, local)                passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));                break;            case Event.CONNECT:                group_addr=(String)evt.getArg();                passUp(new Event(Event.CONNECT_OK));                break;            case Event.DISCONNECT:                passUp(new Event(Event.DISCONNECT_OK));                break;        }    }    /**     * Called by the protocol above this. We check the event type, and if it is     * message, we publish it in the topic, otherwise we let the      * {@link #handleDownEvent(Event)} take care of it.     *      * @param evt event to process.     */    public void down(Event evt) {            if(log.isInfoEnabled()) log.info("event is " + evt + ", group_addr=" +                group_addr + ", time=" + System.currentTimeMillis() +                 ", hdrs are " + Util.printEvent(evt));        // handle all non-message events        if(evt.getType() != Event.MSG) {                 handleDownEvent(evt);                return;        }        // extract message        Message msg=(Message)evt.getArg();        // Because we don't call Protocol.passDown(), we notify the observer        // directly (e.g. PerfObserver).        // This way, we still have performance numbers for UDP        if(observer != null)                observer.passDown(evt);        // publish the message to the topic        sendMessage(msg);    }    /**     * Publish message in the JMS topic. We set the message source and      * destination addresses if they were <code>null</code>.     *      * @param msg message to publish.     */    protected void sendMessage(Message msg) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -