JmsTransport.java (JGroups source code, Java)

package org.jgroups.tests.perf.transports;

import org.jgroups.tests.perf.Receiver;
import org.jgroups.tests.perf.Transport;

import javax.jms.*;
import javax.naming.InitialContext;
import java.util.Properties;
import java.util.Map;

/**
 * @author Bela Ban Jan 22
 * @author 2004
 * @version $Id: JmsTransport.java,v 1.7 2006/04/25 11:55:18 belaban Exp $
 */
public class JmsTransport implements Transport, MessageListener {
    Receiver          receiver=null;
    Properties        config=null;
    Object            local_addr=null;
    ConnectionFactory factory;
    InitialContext    ctx;
    TopicConnection   conn;
    TopicSession      session;
    TopicPublisher    pub;
    TopicSubscriber   sub;
    Topic             topic;
    String            topic_name="topic/testTopic";

    public JmsTransport() {
    }

    public Object getLocalAddress() {
        return local_addr;
    }

    public void create(Properties properties) throws Exception {
        this.config=properties;
        String tmp=config.getProperty("topic");
        if(tmp != null)
            topic_name=tmp;

        ctx=new InitialContext();
        factory=(ConnectionFactory)ctx.lookup("ConnectionFactory");

        // local_addr=new IpAddress(ucast_sock.getLocalAddress(), ucast_sock.getLocalPort());
    }

    public void start() throws Exception {
        conn=((TopicConnectionFactory)factory).createTopicConnection();
        session=conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        topic=(Topic)ctx.lookup(topic_name);
        pub=session.createPublisher(topic);
        sub=session.createSubscriber(topic);
        sub.setMessageListener(this);
        conn.start();
        this.local_addr=conn.getClientID();
        System.out.println("-- local_addr is " + local_addr);
    }

    public void stop() {
        try {
            conn.stop();
        }
        catch(JMSException e) {
            e.printStackTrace();
        }
    }

    public void destroy() {
    }

    public void setReceiver(Receiver r) {
        this.receiver=r;
    }

    public Map dumpStats() {
        return null;
    }

    public void send(Object destination, byte[] payload) throws Exception {
        if(destination != null)
            throw new Exception("JmsTransport.send(): unicast destination is not supported");

        ObjectMessage msg=session.createObjectMessage(payload);
        msg.setObjectProperty("sender", local_addr);
        // msg.setObjectProperty("size", new Integer(payload.length));

        //todo: write the sender (maybe use ObjectMessage instead of BytesMessage)

        // msg.writeInt(payload.length);
        // msg.writeBytes(payload, 0, payload.length);
        pub.publish(topic, msg);
    }

    public void onMessage(Message message) {
        Object sender=null;
        if(message == null || !(message instanceof ObjectMessage)) {
            System.err.println("JmsTransport.onMessage(): received a non ObjectMessage (" + message + "), discarding");
            return;
        }
        ObjectMessage tmp=(ObjectMessage)message;
        try {
            //  todo: read the sender
            sender=tmp.getObjectProperty("sender");
            // int len=tmp.readInt();
            // int len=((Integer)tmp.getObjectProperty("size")).intValue();
            byte[] payload=(byte[])tmp.getObject();
            if(receiver != null)
                receiver.receive(sender, payload);
        }
        catch(JMSException e) {
            e.printStackTrace();
        }
    }
}
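
The send/receive contract of the class above can be tried without a JMS provider. The following is a minimal sketch, not part of JGroups: `InMemoryTopic` and `InMemoryTransport` are hypothetical stand-ins for the JMS topic and for `JmsTransport`, and the `Receiver` interface is only inferred from the `receiver.receive(sender, payload)` call in the listing. It assumes the same behavior as the original: multicast only (a non-null destination is rejected), the sender's local address travels with each message, and every subscriber on the topic is called back. Unlike the original, the sketch throws an unchecked `IllegalArgumentException` rather than `Exception`.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class InMemoryTransportSketch {

    /** Callback shape inferred from the receiver.receive(sender, payload) call in the listing. */
    interface Receiver {
        void receive(Object sender, byte[] payload);
    }

    /** Hypothetical stand-in for a JMS topic: every published message reaches every subscriber. */
    static class InMemoryTopic {
        private final List<InMemoryTransport> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(InMemoryTransport transport) {
            subscribers.add(transport);
        }

        void publish(Object sender, byte[] payload) {
            for (InMemoryTransport transport : subscribers) {
                transport.onMessage(sender, payload);
            }
        }
    }

    /** Hypothetical transport mirroring the send/onMessage logic of JmsTransport. */
    static class InMemoryTransport {
        private final InMemoryTopic topic;
        private final Object localAddr;
        private Receiver receiver;

        InMemoryTransport(InMemoryTopic topic, Object localAddr) {
            this.topic = topic;
            this.localAddr = localAddr;
        }

        void setReceiver(Receiver r) {
            this.receiver = r;
        }

        /** Subscribes to the topic, like createSubscriber + setMessageListener. */
        void start() {
            topic.subscribe(this);
        }

        /** Multicast only: a non-null destination is rejected, as in the original send(). */
        void send(Object destination, byte[] payload) {
            if (destination != null) {
                throw new IllegalArgumentException("unicast destination is not supported");
            }
            topic.publish(localAddr, payload);
        }

        /** Hands the message to the receiver, if one is set; otherwise drops it. */
        void onMessage(Object sender, byte[] payload) {
            if (receiver != null) {
                receiver.receive(sender, payload);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        InMemoryTransport a = new InMemoryTransport(topic, "A");
        InMemoryTransport b = new InMemoryTransport(topic, "B");
        b.setReceiver((sender, payload) ->
                System.out.println(sender + " -> " + new String(payload, StandardCharsets.UTF_8)));
        a.start();
        b.start();
        a.send(null, "hello".getBytes(StandardCharsets.UTF_8));
    }
}
```

Running `main` prints `A -> hello`: transport `a` also receives its own message, but it has no receiver set, so it drops the message silently.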
