📄 jms.java
字号:
// $Id: JMS.java,v 1.14 2006/06/23 09:01:24 belaban Exp $ package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Message;import org.jgroups.View;import org.jgroups.stack.Protocol;import org.jgroups.util.Util;import javax.naming.Context;import javax.naming.InitialContext;import java.io.*;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;/** * Implementation of the transport protocol using the Java Message Service (JMS). * This implementation depends on the JMS server that will distribute messages * published to the specific topic to all topic subscribers. * <p> * Protocol parameters are: * <ul> * <li><code>topicName</code> - (required), full JNDI name of the topic to be * used for message publishing; * * <li><code>cf</code> - (optional), full JNDI name of the topic connection * factory that will create topic connection, default value is * <code>"ConnectionFactory"</code>; * * <li><code>jndiCtx</code> - (optional), value of the * <code>javax.naming.Context.INITIAL_CONTEXT_FACTORY</code> property; you can * specify it as the JVM system property * <code>-Djava.naming.factory.initial=factory.class.Name</code>; * * <li><code>providerURL</code> - (optional), value of the * <code>javax.naming.Context.PROVIDER_URL</code> property; you can specify it * as the JVM system property <code>-Djava.naming.provider.url=some_url</code> * * <li><code>ttl</code> - (required), time to live in milliseconds. Default * value is 0, that means that messages will never expire and will be * accumulated by a JMS server. * * </ul> * * <p> * Note, when you are using the JMS protocol, try to avoid using protocols * that open server socket connections, like FD_SOCK. I belive that FD is more * appropriate failure detector for JMS case. * * @author Roman Rokytskyy (rrokytskyy@acm.org) */public class JMS extends Protocol implements javax.jms.MessageListener { public static final String DEFAULT_CONNECTION_FACTORY = "ConnectionFactory"; public static final String INIT_CONNECTION_FACTORY = "cf"; public static final String INIT_TOPIC_NAME = "topicName"; public static final String INIT_JNDI_CONTEXT = "jndiCtx"; public static final String INIT_PROVIDER_URL = "providerURL"; public static final String TIME_TO_LIVE = "ttl"; public static final String GROUP_NAME_PROPERTY = "jgroups_group_name"; public static final String SRC_PROPERTY = "src"; public static final String DEST_PROPERTY = "dest"; private final Vector members = new Vector(); private javax.jms.TopicConnectionFactory connectionFactory; private javax.jms.Topic topic; private javax.jms.TopicConnection connection; private javax.jms.TopicSession session; private javax.jms.TopicPublisher publisher; private javax.jms.TopicSubscriber subscriber; private String cfName; private String topicName; private String initCtxFactory; private String providerUrl; private long timeToLive; private Context ctx; private String group_addr; private Address local_addr; private Address mcast_addr; private final ByteArrayOutputStream out_stream = new ByteArrayOutputStream(65535); private static final java.util.Random RND = new java.util.Random(); /** * Empty constructor. */ public JMS() { } /** * Get the name of the protocol. * * @return always returns the <code>"JMS"</code> string. */ public String getName() { return "JMS"; } /** * Get the string representation of the protocol. * * @return string representation of the protocol (not very useful though). */ public String toString() { return "Protocol JMS(local address: " + local_addr + ')'; } /** * Set protocol properties. Properties are: * <ul> * <li><code>topicName</code> - (required), full JNDI name of the topic to be * used for message publishing; * * <li><code>cf</code> - (optional), full JNDI name of the topic connection * factory that will create topic connection, default value is * <code>"ConnectionFactory"</code>; * * <li><code>jndiCtx</code> - (optional), value of the * <code>javax.naming.Context.INITIAL_CONTEXT_FACTORY</code> property; you can * specify it as the JVM system property * <code>-Djava.naming.factory.initial=factory.class.Name</code>; * * <li><code>providerURL</code> - (optional), value of the * <code>javax.naming.Context.PROVIDER_URL</code> property; you can specify it * as the JVM system property <code>-Djava.naming.provider.url=some_url</code> * </ul> * */ public boolean setProperties(Properties props) { super.setProperties(props); cfName = props.getProperty(INIT_CONNECTION_FACTORY, DEFAULT_CONNECTION_FACTORY); props.remove(INIT_CONNECTION_FACTORY); topicName = props.getProperty(INIT_TOPIC_NAME); if (topicName == null) throw new IllegalArgumentException( "JMS topic has not been specified."); props.remove(INIT_TOPIC_NAME); initCtxFactory = props.getProperty(INIT_JNDI_CONTEXT); props.remove(INIT_JNDI_CONTEXT); providerUrl = props.getProperty(INIT_PROVIDER_URL); props.remove(INIT_PROVIDER_URL); String ttl = props.getProperty(TIME_TO_LIVE); if (ttl == null) { if(log.isErrorEnabled()) log.error("ttl property not found."); return false; } props.remove(TIME_TO_LIVE); // try to parse ttl property try { timeToLive = Long.parseLong(ttl); } catch(NumberFormatException nfex) { if(log.isErrorEnabled()) log.error("ttl property does not contain numeric value."); return false; } return props.size() == 0; } /** * Implementation of the <code>javax.jms.MessageListener</code> interface. * This method receives the JMS message, checks the destination group name. * If the group name is the same as the group name of this channel, it * checks the destination address. If destination address is either * multicast or is the same as local address then message is unwrapped and * passed up the protocol stack. Otherwise it is ignored. * * @param jmsMessage instance of <code>javax.jms.Message</code>. */ public void onMessage(javax.jms.Message jmsMessage) { try { String groupName = jmsMessage.getStringProperty(GROUP_NAME_PROPERTY); // there might be other messages in this topic if (groupName == null) return; if(log.isDebugEnabled()) log.debug("Got message for group [" + groupName + ']' + ", my group is [" + group_addr + ']'); // not our message, ignore it if (!group_addr.equals(groupName)) return; JMSAddress src = jmsMessage.getStringProperty(SRC_PROPERTY) != null ? new JMSAddress(jmsMessage.getStringProperty(SRC_PROPERTY)) : null; JMSAddress dest = jmsMessage.getStringProperty(DEST_PROPERTY) != null ? new JMSAddress(jmsMessage.getStringProperty(DEST_PROPERTY)) : null; // if message is unicast message and I'm not the destination - ignore if (src != null && dest != null && !dest.equals(local_addr) && !dest.isMulticastAddress()) return; if (jmsMessage instanceof javax.jms.ObjectMessage) { byte[] buf = (byte[])((javax.jms.ObjectMessage)jmsMessage).getObject(); ByteArrayInputStream inp_stream=new ByteArrayInputStream(buf); ObjectInputStream inp=new ObjectInputStream(inp_stream); Message msg=new Message(); msg.readExternal(inp); Event evt=new Event(Event.MSG, msg); // +++ remove if(log.isDebugEnabled()) log.debug("Message is " + msg + ", headers are " + msg.getHeaders ()); /* Because Protocol.Up() is never called by this bottommost layer, * we call Up() directly in the observer. This allows e.g. * PerfObserver to get the time of reception of a message */ if(observer != null) observer.up(evt, up_queue.size()); passUp(evt); } } catch(javax.jms.JMSException ex) { ex.printStackTrace(); if(log.isErrorEnabled()) log.error("JMSException : " + ex.toString()); } catch(IOException ioex) { ioex.printStackTrace(); if(log.isErrorEnabled()) log.error("IOException : " + ioex.toString()); } catch(ClassNotFoundException cnfex) { cnfex.printStackTrace(); if(log.isErrorEnabled()) log.error("ClassNotFoundException : " + cnfex.toString()); } } /** * Handle down event, if it is not a Event.MSG type. * * @param evt event to handle. */ protected void handleDownEvent(Event evt) { switch(evt.getType()) { // we do not need this at present time, maybe in the future case Event.TMP_VIEW: case Event.VIEW_CHANGE: synchronized(members) { members.removeAllElements(); Vector tmpvec=((View)evt.getArg()).getMembers(); for(int i=0; i < tmpvec.size(); i++) members.addElement(tmpvec.elementAt(i)); } break; case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local) passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); break; case Event.CONNECT: group_addr=(String)evt.getArg(); passUp(new Event(Event.CONNECT_OK)); break; case Event.DISCONNECT: passUp(new Event(Event.DISCONNECT_OK)); break; } } /** * Called by the protocol above this. We check the event type, and if it is * message, we publish it in the topic, otherwise we let the * {@link #handleDownEvent(Event)} take care of it. * * @param evt event to process. */ public void down(Event evt) { if(log.isInfoEnabled()) log.info("event is " + evt + ", group_addr=" + group_addr + ", time=" + System.currentTimeMillis() + ", hdrs are " + Util.printEvent(evt)); // handle all non-message events if(evt.getType() != Event.MSG) { handleDownEvent(evt); return; } // extract message Message msg=(Message)evt.getArg(); // Because we don't call Protocol.passDown(), we notify the observer // directly (e.g. PerfObserver). // This way, we still have performance numbers for UDP if(observer != null) observer.passDown(evt); // publish the message to the topic sendMessage(msg); } /** * Publish message in the JMS topic. We set the message source and * destination addresses if they were <code>null</code>. * * @param msg message to publish. */ protected void sendMessage(Message msg) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -