⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 distributor.java

📁 piweurrrrq i o fhsadhfka fd dskajc zxkjcnkjsahc
💻 JAVA
字号:
/* * Copyright (c) 2000 Lyrisoft Solutions, Inc. * Used by permission */package com.lyrisoft.chat.server.remote;import java.util.Properties;import java.util.Hashtable;import java.util.List;import java.util.LinkedList;import java.util.Collections;import javax.naming.*;import javax.jms.*;import com.lyrisoft.util.properties.*;/** * Handles sending and receiving messages over JMS * * TODO: connect should create an initial context and try to connect, and rotate */public class Distributor implements MessageListener, Runnable {    protected ChatServer _server;    // queue for outbound messages    protected List _queue = Collections.synchronizedList(new LinkedList());    protected Hashtable _env;    // our round-robin choices    protected String[] _providerUrls;    protected int _providerUrl = 0;    protected Context _jndiContext;    protected String _jmsUser;    protected String _jmsPass;    protected String _topicName;        protected TopicPublisher _topicPublisher;    protected TopicSession _topicSession;    protected boolean _keepGoing = true;    protected boolean _connected = false;    private int _failedAttempts = 0; // how many attempts have failed so far    public Distributor(ChatServer server, Properties p) throws PropertyException {        _server = server;        _topicName = PropertyTool.getString("jms.topic", p);        String jndiInitialContextFactory = PropertyTool.getString("jndi.initial", p);        _providerUrls = PropertyTool.getStringArray("jms.provider", p);        _jmsUser = p.getProperty("jms.user");        _jmsPass = p.getProperty("jms.pass");        _env = new Hashtable();        _env.put(Context.INITIAL_CONTEXT_FACTORY, jndiInitialContextFactory);    }    synchronized void createNewJndiContext() throws NamingException {        _env.put (Context.PROVIDER_URL, getProvider());        _jndiContext = new InitialContext(_env);    }    synchronized void rotateProviders() {        _providerUrl++;        if (_providerUrl >= _providerUrls.length) {            _providerUrl = 0;        }    }        synchronized String getProvider() {        return _providerUrls[_providerUrl];    }	public Object jndiLookup(String name) throws NamingException {        if (_jndiContext == null) {            createNewJndiContext();        }        if (_failedAttempts == 0) {            ChatServer.log("doing JNDI lookup of '" + name + "'");        }        Object obj = _jndiContext.lookup(name);        return obj;	}    synchronized boolean isConnected() {        return _connected;    }    synchronized void connect() throws JMSException, NamingException {        if (_failedAttempts == 0) {            ChatServer.log("Attempting to connect to JMS provider " +                            _providerUrls[_providerUrl]);        }        TopicConnectionFactory topicConnectionFactory =                 (TopicConnectionFactory)jndiLookup("TopicConnectionFactory");        TopicConnection topicConnection = null;        if (_jmsUser == null) {            topicConnection = topicConnectionFactory.createTopicConnection();        } else {            topicConnection = topicConnectionFactory.createTopicConnection(_jmsUser, _jmsPass);        }        Topic topic = (Topic)jndiLookup(_topicName);        _topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);        // subscriber        TopicSubscriber topicSubscriber = _topicSession.createSubscriber(topic, null, true);        topicSubscriber.setMessageListener(this);        topicConnection.start(); // start the subscriber thread        ChatServer.log("JMS Subscriber started.");        // publisher        _topicPublisher = _topicSession.createPublisher(topic);        _connected = true;        ChatServer.log("JMS Publisher initialized.");        _server.distributorConnected(this);    }    public void run() {        try {            while (_keepGoing) {                if (_connected) {                    if (_queue.size() > 0) {                        flushQueue();                    }                    Thread.sleep(25);                } else {                    try {                        connect();                        _failedAttempts = 0;                    }                    catch (JMSException e) {                        if (_failedAttempts++ == 0) {                            ChatServer.logError("Could not make JMS connection to " +                                                 getProvider() +                                                 " [" + e.getMessage() + "]");                            ChatServer.logError("There will be no more JMS related log messages until I get a connection....");                        }                        rotateProviders();                        Thread.sleep(30000);                    }                    catch (NamingException e) {                        if (_failedAttempts++ == 0) {                            ChatServer.logError("JNDI lookup failed.");                            ChatServer.logError("There will be no more JMS related log messages until I get a connection....");                        }                        rotateProviders();                        try {                            createNewJndiContext();                        }                        catch (NamingException ex) {                            if (_failedAttempts++ == 0) {                                ChatServer.logError("Could not create initial JNDI context.");                                ChatServer.logError("There will be no more JMS related log messages until I get a connection....");                            }                        }                        Thread.sleep(30000);                    }                }            }        }        catch (InterruptedException e) {            ChatServer.log(e);        }    }    void flushQueue() {        while (_queue.size() > 0) {            javax.jms.Message message = (javax.jms.Message)_queue.remove(0);            try {                if (ChatServer.DEBUG) {                    try {                        ChatServer.DEBUG("<< " + ((TextMessage)message).getText());                    }                    catch (JMSException e) {                        e.printStackTrace();                    }                }                _topicPublisher.publish(message);            }            catch (JMSException e) {                ChatServer.log(e);                _connected = false;                _server.distributorDisconnected(this);            }        }    }    TextMessage createTextMessage() throws JMSException {        try {            return _topicSession.createTextMessage();        }        catch (JMSException e) {            _connected = false;            _server.distributorDisconnected(this);            throw e;        }    }    public void push(javax.jms.Message msg) {        _queue.add(msg);    }        public void onMessage(javax.jms.Message message) {        if (ChatServer.DEBUG) {            try {                ChatServer.DEBUG(">> " + ((TextMessage)message).getText());                ChatServer.DEBUG(">>     origin = " + message.getStringProperty("origin"));                ChatServer.DEBUG(">>     client = " + message.getStringProperty("client"));            }            catch (JMSException e) {}        }        _server.handleIncoming(message);    }    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -