📄 jms.java
字号:
try { if (msg.getSrc() == null) msg.setSrc(local_addr); if (msg.getDest() == null) msg.setDest(mcast_addr); if(log.isInfoEnabled()) log.info("msg is " + msg); // convert the message into byte array. out_stream.reset(); ObjectOutputStream out=new ObjectOutputStream(out_stream); msg.writeExternal(out); out.flush(); byte[] buf = out_stream.toByteArray(); javax.jms.ObjectMessage jmsMessage = session.createObjectMessage(); // set the payload jmsMessage.setObject(buf); // set the group name jmsMessage.setStringProperty(GROUP_NAME_PROPERTY, group_addr); // if the source is JMSAddress, copy it to the header if (msg.getSrc() instanceof JMSAddress) jmsMessage.setStringProperty( SRC_PROPERTY, msg.getSrc().toString()); // if the destination is JMSAddress, copy it to the header if (msg.getDest() instanceof JMSAddress) jmsMessage.setStringProperty( DEST_PROPERTY, msg.getDest().toString()); // publish message publisher.publish(jmsMessage); } catch(javax.jms.JMSException ex) { if(log.isErrorEnabled()) log.error("JMSException : " + ex.toString()); } catch(IOException ioex) { if(log.isErrorEnabled()) log.error("IOException : " + ioex.toString()); } } /** * Start the JMS protocol. This method instantiates the JNDI initial context * and looks up the topic connection factory and topic itself. If this step * is successful, it creates a connection to JMS server, opens a session * and obtains publisher and subscriber instances. * * @throws javax.jms.JMSException if something goes wrong with JMS. * @throws javax.naming.NamingException if something goes wrong with JNDI. * @throws IllegalArgumentException if the connection factory or topic * cannot be found under specified names. */ public void start() throws Exception { if (initCtxFactory != null && providerUrl != null) { Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, initCtxFactory); env.put(Context.PROVIDER_URL, providerUrl); ctx = new InitialContext(env); } else ctx = new InitialContext(); connectionFactory = (javax.jms.TopicConnectionFactory)ctx.lookup(cfName); if (connectionFactory == null) throw new IllegalArgumentException( "Topic connection factory cannot be found in JNDI."); topic = (javax.jms.Topic)ctx.lookup(topicName); if (topic == null) throw new IllegalArgumentException("Topic cannot be found in JNDI."); connection = connectionFactory.createTopicConnection(); boolean addressAssigned = false; // check if JMS connection contains client ID, // if not, try to assign randomly generated one /*while(!addressAssigned) { if (connection.getClientID() != null) addressAssigned = true; else try { connection.setClientID(generateLocalAddress()); addressAssigned = true; } catch(javax.jms.InvalidClientIDException ex) { // duplicate... ok, let's try again } }*/ // Patch below submitted by Greg Woolsey // Check if JMS connection contains client ID, if not, try to assign randomly generated one // setClientID() must be the first method called on a new connection, per the JMS spec. // If the client ID is already set, this will throw IllegalStateException and keep the original value. while(!addressAssigned) { try { connection.setClientID(generateLocalAddress()); addressAssigned = true; } catch (javax.jms.IllegalStateException e) { // expected if connection already has a client ID. addressAssigned = true; } catch(javax.jms.InvalidClientIDException ex) { // duplicate... OK, let's try again } } local_addr = new JMSAddress(connection.getClientID(), false); mcast_addr = new JMSAddress(topicName, true); session = connection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); publisher = session.createPublisher(topic); publisher.setTimeToLive(timeToLive); subscriber = session.createSubscriber(topic); subscriber.setMessageListener(this); connection.start(); passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); } /** * Stops the work of the JMS protocol. This method closes JMS session and * connection and deregisters itself from the message notification. */ public void stop() { if(log.isInfoEnabled()) log.info("finishing JMS transport layer."); try { connection.stop(); subscriber.setMessageListener(null); session.close(); connection.close(); } catch(Throwable ex) { if(log.isErrorEnabled()) log.error("exception is " + ex); } } /** * Generate random local address. This method takes host name and appends * it with randomly generated integer. * * @return randomly generated local address. */ protected String generateLocalAddress() throws java.net.UnknownHostException { String hostName = java.net.InetAddress.getLocalHost().getHostName(); int rndPort = RND.nextInt(65535); return hostName + ':' + rndPort; } /** * Simple {@link Address} representing the JMS node ID or JMS topic group. */ public static class JMSAddress implements Address { private String address; private boolean isMCast; /** * Empty constructor to allow externalization work. */ public JMSAddress() { } /** * Create instance of this class for given address string. * * Current implementation uses a hash mark <code>'#'</code> to determine * if the address is a unicast or multicast. Therefore, this character is * considered as reserved and is not allowed in the <code>address</code> * parameter passed to this constructor. * * @param address string representing the address of the node connected * to the JMS topic, usually, a value of * <code>connection.getClientID()</code>, where the connection is * instance of <code>javax.jms.TopicConnection</code>. * * @param isMCast <code>true</code> if the address is multicast address, * otherwise - <code>false</code>. */ JMSAddress(String address, boolean isMCast) { this.address = address; this.isMCast = isMCast; } /** * Reconstruct the address from the string representation. If the * <code>str</code> starts with <code>'#'</code>, address is considered * as unicast, and node address is the substring after <code>'#'</code>. * Otherwise, address is multicast and address is <code>str</code> * itself. * * @param str string used to reconstruct the instance. */ JMSAddress(String str) { if (str.startsWith("#")) { address = str.substring(1); isMCast = false; } else { address = str; isMCast = true; } } /** * Get the node address. * * @return node address in the form passed to the constructor * {@link #JMS.JMSAddress(String, boolean)}. */ public String getAddress() { return address; } /** * Set the node address. * * @param address new node address. */ public void setAddress(String address) { this.address = address; } /** * Is the address a multicast address? * * @return <code>true</code> if the address is multicast address. */ public boolean isMulticastAddress() { return isMCast; } public int size() { return 22; } /** * Clone the object. */ protected Object clone() throws CloneNotSupportedException { return new JMSAddress(address, isMCast); } /** * Compare this object to <code>o</code>. It is possible to compare only * addresses of the same class. Also they both should be either * multicast or unicast addresses. * * @return value compliant with the {@link Comparable#compareTo(Object)} * specififaction. */ public int compareTo(Object o) throws ClassCastException { if (!(o instanceof JMSAddress)) throw new ClassCastException("Cannot compare different classes."); JMSAddress that = (JMSAddress)o; if (that.isMCast != this.isMCast) throw new ClassCastException( "Addresses are different: one is multicast, and one is not"); return this.address.compareTo(that.address); } /** * Test is this object is equal to <code>obj</code>. * * @return <code>true</code> iff the <code>obj</code> is * <code>JMSAddress</code>, node addresses are equal and they both are * either multicast or unicast addresses. */ public boolean equals(Object obj) { if (obj == this) return true; if (!(obj instanceof JMSAddress)) return false; JMSAddress that = (JMSAddress)obj; if (this.isMCast) return this.isMCast == that.isMCast; else if (this.address == null || that.address == null) return false; else return this.address.equals(that.address) && this.isMCast == that.isMCast; } /** * Get the hash code of this address. * * @return hash code of this object. */ public int hashCode() { return toString().hashCode(); } /** * Read object from external input. */ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { address = (String)in.readObject(); isMCast = in.readBoolean(); } /** * Get the string representation of the address. The following property * holds: <code>a2.equals(a1)</code> is always <code>true</code>, where * <code>a2</code> is * <code>JMSAddress a2 = new JMSAddress(a1.toString());</code> * * @return string representation of the address. */ public String toString() { return !isMCast ? '#' + address : address; } /** * Write the object to external output. */ public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(address); out.writeBoolean(isMCast); } public void writeTo(DataOutputStream outstream) throws IOException { outstream.writeUTF(address); outstream.writeBoolean(isMCast); } public void readFrom(DataInputStream instream) throws IOException, IllegalAccessException, InstantiationException { address=instream.readUTF(); isMCast=instream.readBoolean(); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -