⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 consumers.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    }

    /**
     * Look up the identity of a durable consumer in the cache.
     *
     * @param name - consumer name
     * @return the consumer identity, or 0 if no consumer is cached under
     *         that name
     */
    public synchronized long getConsumerId(String name) {
        Consumer entry = (Consumer) _consumers.get(name);
        if (entry == null) {
            // nothing cached under this name
            return 0;
        }
        return entry.consumerId;
    }

    /**
     * Check whether a consumer is cached under the given name.
     *
     * @param name - the consumer name
     * @return true if a non-null entry is registered under the name
     */
    public synchronized boolean exists(String name) {
        Object entry = _consumers.get(name);
        return entry != null;
    }

    /**
     * Collect the names of all cached consumers whose destination identity
     * matches that of the named destination.
     *
     * @param destination - the name of the destination to query
     * @return the matching consumer names; empty when the destination
     *         resolves to identity 0 or no cached consumer matches
     */
    public synchronized Vector getDurableConsumers(String destination) {
        Vector names = new Vector(); // Vector retained for legacy reasons
        long targetId = Destinations.instance().getId(destination);
        if (targetId == 0) {
            // an identity of 0 is never matched against the cache
            return names;
        }

        for (Iterator entries = _consumers.values().iterator();
             entries.hasNext();) {
            Consumer entry = (Consumer) entries.next();
            if (entry.destinationId == targetId) {
                names.add(entry.name);
            }
        }

        return names;
    }

    /**
     * Build a map of consumer names to destination names. Only consumers
     * whose destination resolves to a {@code JmsTopic} are included.
     *
     * @return HashMap - consumer name to topic name, for every cached
     *         consumer subscribed to a topic
     */
    public synchronized HashMap getAllDurableConsumers() {
        HashMap namesToTopics = new HashMap();

        for (Iterator entries = _consumers.values().iterator();
             entries.hasNext();) {
            Consumer entry = (Consumer) entries.next();
            JmsDestination target = Destinations.instance().get(
                entry.destinationId);

            if (!(target instanceof JmsTopic)) {
                // unresolved (null) or non-topic destinations are skipped
                continue;
            }
            namesToTopics.put(entry.name, target.getName());
        }

        return namesToTopics;
    }

    /**
     * Find the name of the cached consumer with the specified identity.
     * The cache is scanned linearly and the first match is returned.
     *
     * @param id - the consumer identity
     * @return the consumer name, or null if no cached consumer has that
     *         identity
     */
    public synchronized String getConsumerName(long id) {
        for (Iterator entries = _consumers.values().iterator();
             entries.hasNext();) {
            Consumer entry = (Consumer) entries.next();
            if (entry.consumerId == id) {
                return entry.name;
            }
        }

        return null;
    }

    /**
     * Deallocates resources owned or referenced by the instance: empties
     * and releases the consumer cache, then clears the {@code _instance}
     * reference (presumably the shared singleton; it is declared outside
     * this section of the file).
     * <p>
     * Calling this method on an instance that has already been closed has
     * no effect.
     */
    public synchronized void close() {
        if (_consumers == null) {
            // already closed: clear() below would otherwise throw a
            // NullPointerException on a repeated call
            return;
        }

        _consumers.clear();
        _consumers = null;

        _instance = null;
    }

    /**
     * Evicts every cached consumer that is bound to the given destination.
     *
     * @param       destinationId       the Id of the destination
     */
    protected synchronized void removeCached(long destinationId) {
        // work from a snapshot so entries can be removed from the map
        // while it is being walked
        Object[] snapshot = _consumers.values().toArray();
        int index = 0;
        while (index < snapshot.length) {
            Consumer entry = (Consumer) snapshot[index++];
            if (entry.destinationId != destinationId) {
                continue;
            }
            _consumers.remove(entry.name);
        }
    }

    /**
     * Private constructor; starts with an empty consumer cache.
     */
    private Consumers() {
        this._consumers = new HashMap();
    }

    /**
     * Populate the cache from the <code>consumers</code> table using the
     * supplied connection. <b>Every row</b> is read into memory and stored
     * under the consumer's name.
     * <p>
     * The statement and result set are closed via SQLHelper whether or not
     * the query succeeds.
     *
     * @param connection - the connection to use
     * @throws PersistenceException - if the query or row retrieval fails
     */
    private void load(Connection connection)
        throws PersistenceException {

        PreparedStatement query = null;
        ResultSet rows = null;
        try {
            query = connection.prepareStatement("select * from consumers");
            rows = query.executeQuery();

            while (rows.next()) {
                // arguments are evaluated left to right, so the columns
                // are read in the order name, consumerId, destinationId,
                // created
                Consumer entry = new Consumer(
                    rows.getString("name"),
                    rows.getLong("consumerId"),
                    rows.getLong("destinationId"),
                    rows.getLong("created"));
                _consumers.put(entry.name, entry);
            }
        } catch (SQLException exception) {
            throw new PersistenceException("Failed to retrieve consumers",
                exception);
        } finally {
            SQLHelper.close(rows);
            SQLHelper.close(query);
        }
    }

    /**
     * Delete every row of the named table whose consumerId column matches
     * the given consumer identity.
     *
     * @param table - the table to delete rows from; its name is spliced
     *                directly into the SQL text
     * @param consumerId - the target consumerId
     * @param connection - the database connection to use
     * @throws SQLException - thrown on any error
     */
    private void remove(String table, long consumerId, Connection connection)
        throws SQLException {

        // only the consumer id is bound as a statement parameter
        String sql = "delete from " + table + " where consumerId=?";
        PreparedStatement statement = null;
        try {
            statement = connection.prepareStatement(sql);
            statement.setLong(1, consumerId);
            statement.executeUpdate();
        } finally {
            SQLHelper.close(statement);
        }
    }

    /**
     * Always throws a PersistenceException describing a failed operation
     * on a consumer and its destination.
     *
     * @param operation - operation that failed
     * @param name - corresponding consumer name
     * @param destination - corresponding destination
     * @param reason - the reason for the exception
     * @throws PersistenceException - always
     */
    private void raise(String operation, String name, String destination,
                       String reason)
        throws PersistenceException {
        String message = "Cannot " + operation + " consumer=" + name
            + ", destination=" + destination + ": " + reason;
        throw new PersistenceException(message);
    }

    /**
     * Always throws a PersistenceException describing a failed operation
     * on a consumer.
     *
     * @param operation - operation that failed
     * @param name - corresponding consumer name
     * @param reason - the reason for the exception
     * @throws PersistenceException - always
     */
    private void raise(String operation, String name, String reason)
        throws PersistenceException {
        String message = "Cannot " + operation + " consumer=" + name
            + ": " + reason;
        throw new PersistenceException(message);
    }

    /**
     * This is an internal class that is used to store consumer entries
     */
    private class Consumer {

        /**
         * The name of the consumer
         */
        public String name;

        /**
         * The unique consumer identity
         */
        public long consumerId;

        /**
         * The identity of the destination that this durable consumer is
         * subscribed too
         */
        public long destinationId;

        /**
         * The time that this durable consumer was created
         */
        public long created;


        public Consumer(String name, long consumerId, long destinationId,
                        long created) {

            this.name = name;
            this.consumerId = consumerId;
            this.destinationId = destinationId;
            this.created = created;
        }

        public String getKey() {
            return name;
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -