⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmsserversession.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    public void acknowledgeMessage(long clientId, String id)
        throws JMSException {
        // wrap the raw identifier, then hand the acknowledgement over to the
        // sent message cache
        final MessageId messageId = new MessageId(id);
        _sentMessageCache.acknowledgeMessage(messageId, clientId);
    }

    /**
     * Send the specified message to the server
     *
     * @param message the message to send
     * @throws JMSException if the message can't be sent
     */
    public void sendMessage(Message message) throws JMSException {
        if (message == null) {
            throw new JMSException("Message is null");
        }

        try {
            // cast once, inside the try block, so that a message of an
            // unexpected implementation is reported as a JMSException
            MessageImpl impl = (MessageImpl) message;

            // check the delivery mode of the message
            checkDeliveryMode(impl);

            // set the connection identity and then let the message manager
            // process it
            impl.setConnectionId(_connection.hashCode());

            // if there is a global transaction currently in process then
            // we must send the message to the resource manager, otherwise
            // send it directly to the message manager
            if (_xid != null) {
                ResourceManager.instance().logPublishedMessage(_xid, impl);
            } else {
                MessageMgr.instance().add(impl);
                _publishCount++;
            }
        } catch (JMSException exception) {
            _log.error("Failed to process message", exception);
            throw exception;
        } catch (OutOfMemoryError exception) {
            // the error is logged but not linked: setLinkedException only
            // accepts an Exception
            String msg =
                "Failed to process message due to out-of-memory error";
            _log.error(msg, exception);
            throw new JMSException(msg);
        } catch (Exception exception) {
            // keep the root cause attached so that callers can inspect it
            String msg = "Failed to process message";
            _log.error(msg, exception);
            JMSException error = new JMSException(msg);
            error.setLinkedException(exception);
            throw error;
        }
    }

    /**
     * Send the specified messages to the server.
     *
     * @param messages the messages to send
     * @throws JMSException if the messages can't be sent
     */
    public void sendMessages(Vector messages) throws JMSException {
        if (messages == null) {
            throw new JMSException("No messages to send");
        }

        // messages are removed from the head of the vector as they are
        // processed, so if a failure occurs the vector is left holding only
        // the entries that were not yet attempted
        while (!messages.isEmpty()) {
            Object next = messages.remove(0);
            if (next == null) {
                // a null entry ends processing of the batch
                break;
            }

            try {
                // cast inside the try block so that an entry of an
                // unexpected type is reported as a JMSException rather than
                // escaping as a ClassCastException
                MessageImpl message = (MessageImpl) next;

                // check the delivery mode of the message
                checkDeliveryMode(message);

                // set the connection identity and then let the message manager
                // process it
                message.setConnectionId(_connection.hashCode());

                // if there is a global transaction in progress then send the
                // message to the resource manager, otherwise send it to the
                // message manager
                if (_xid != null) {
                    ResourceManager.instance().logPublishedMessage(_xid,
                        message);
                } else {
                    MessageMgr.instance().add(message);
                    _publishCount++;
                }
            } catch (JMSException exception) {
                _log.error("Failed to process message", exception);
                throw exception;
            } catch (OutOfMemoryError exception) {
                // the error is logged but not linked: setLinkedException
                // only accepts an Exception
                String msg =
                    "Failed to process message due to out-of-memory error";
                _log.error(msg, exception);
                throw new JMSException(msg);
            } catch (Exception exception) {
                // keep the root cause attached so that callers can inspect it
                String msg = "Failed to process messages";
                _log.error(msg, exception);
                JMSException error = new JMSException(msg);
                error.setLinkedException(exception);
                throw error;
            }
        }
    }

    /**
     * Return the next message for the specified client. The <code>wait</code>
     * parameter indicates how many milliseconds to wait for a message
     * before returning. If <code>wait</code> is 0 then do not wait at all. If
     * <code>wait</code> is -1 then wait indefinitely for the next message
     *
     * @param clientId the client identity
     * @param wait number of ms to wait
     * @return the next message or <code>null</code> if there is no message
     * @throws JMSException if the message can't be received
     */
    public Message receiveMessage(long clientId, long wait)
        throws JMSException {
        ConsumerEndpoint consumer = getConsumerEndpoint(clientId);
        if (consumer == null) {
            throw new JMSException(
                "Can't receive message: no consumer registered with "
                + "identifier " + clientId + " on session " + _sessionId);
        }

        // ask the consumer for the next handle; without one there is
        // nothing to return and nothing to record
        MessageHandle handle = consumer.receiveMessage(wait);
        if (handle == null) {
            return null;
        }

        // hand the client a copy of the underlying message, stamped with the
        // delivery state held by the handle. If the copy can't be made the
        // failure is logged and the client receives null.
        MessageImpl result = null;
        MessageImpl source = handle.getMessage();
        if (source != null) {
            try {
                MessageImpl copy = (MessageImpl) source.clone();
                copy.setJMSRedelivered(handle.getDelivered());
                copy.setClientId(handle.getClientId());
                _consumeCount++;
                result = copy;
            } catch (Exception exception) {
                _log.error(exception);
            }
        }

        // every non-null handle is added to the sent message cache, whether
        // or not a copy of its message was produced
        _sentMessageCache.process(handle);

        // when part of a global transaction the handle must also be sent to
        // the ResourceManager for recovery
        if (_xid != null) {
            try {
                ResourceManager.instance().logReceivedMessage(
                    _xid, consumer.getId(), handle);
            } catch (Exception exception) {
                _log.error(exception);
                JMSException jms_exception = new JMSException(
                    "Error in receiveMessage");
                jms_exception.setLinkedException(exception);
                throw jms_exception;
            }
        }

        return result;
    }

    /**
     * Return up to count messages from the endpoint with the specified
     * client identity. The client must be a QueueBrowser.
     *
     * @param       clientId            the client identity
     * @param       count               the maximum number of messages to retrieve
     * @return      Vector              the messages retrieved, empty if none
     * @throws JMSException if the endpoint does not exist, or is not a
     * {@link QueueBrowserEndpoint}
     */
    public Vector receiveMessages(long clientId, int count)
        throws JMSException {

        ConsumerEndpoint consumer = getConsumerEndpoint(clientId);
        if (consumer == null) {
            throw new JMSException(
                "Can't receive messages: no consumer registered with "
                + "identifier " + clientId + " on session " + _sessionId);
        }

        if (!(consumer instanceof QueueBrowserEndpoint)) {
            throw new JMSException(
                "Can't receive messages: consumer with identifier "
                + clientId + " is not a QueueBrowser");
        }

        // we have a valid consumer, now we need to retrieve up to count
        // handles
        Vector handles = ((QueueBrowserEndpoint) consumer).receiveMessages(
            count);
        Vector messages = new Vector();

        // copy the message behind each handle, stamping the copy with the
        // delivery state held by the handle
        int max = handles.size();
        for (int index = 0; index < max; index++) {
            MessageHandle handle = (MessageHandle) handles.elementAt(index);
            MessageImpl orig = handle.getMessage();
            if (orig != null) {
                try {
                    MessageImpl message = (MessageImpl) orig.clone();
                    message.setJMSRedelivered(handle.getDelivered());
                    message.setClientId(handle.getClientId());
                    messages.addElement(message);
                } catch (Exception exception) {
                    // a message that can't be copied is logged and left out
                    // of the result
                    _log.error(exception);
                }
            }
        }

        return messages;
    }

    /**
     * Create an administered queue, through the message manager admin
     * interface.
     *
     * @param queue administered queue to create
     * @throws JMSException if the queue can't be created
     */
    public void createQueue(JmsQueue queue) throws JMSException {
        // delegate to the destination manager; a false result is reported
        // to the caller as a failure
        boolean created =
            DestinationManager.instance().createAdministeredDestination(queue);
        if (created) {
            return;
        }

        String name = queue.getName();
        throw new JMSException("Failed to create queue: " + name);
    }

    /**
     * Create an administered topic, through the message manager admin
     * interface.
     *
     * @param topic administered topic to create
     * @throws JMSException if the topic can't be created
     */
    public void createTopic(JmsTopic topic) throws JMSException {
        // delegate to the destination manager; a false result is reported
        // to the caller as a failure
        boolean created =
            DestinationManager.instance().createAdministeredDestination(topic);
        if (created) {
            return;
        }

        String name = topic.getName();
        throw new JMSException("Failed to create topic: " + name);
    }

    /**
     * Create a receiver endpoint for this session. A receiver is a message
     * consumer specific to the queue message model. The receiver is
     * associated with a queue.
     * <p>
     * You cannot create more than one receiver for the same destination
     *
     * @param queue the receiver destination
     * @param clientId the client session allocated identifier of the
     * consumer
     * @param selector the selector to filter messages. May be
     * <code>null</code>
     * @throws JMSException if the receiver can't be created
     */
    public void createReceiver(JmsQueue queue, long clientId, String selector)
        throws JMSException {
        if (_log.isDebugEnabled()) {
            String trace = "createReceiver(queue=" + queue
                + ", clientId=" + clientId
                + ", selector=" + selector
                + ") [sessionId=" + _sessionId + "]";
            _log.debug(trace);
        }

        if (queue == null) {
            throw new JMSException("Cannot create receiver for null queue");
        }

        // have the consumer manager build the endpoint for this queue
        ConsumerManager manager = ConsumerManager.instance();
        ConsumerEndpoint endpoint =
            manager.createConsumerEndpoint(this, clientId, queue, selector);

        // carry the session's settings over to the new endpoint
        endpoint.setAckMode(_ackMode);
        endpoint.setConnectionId(_connection.hashCode());
        endpoint.setTransacted(_transacted);

        // the endpoint takes on the session's stopped state, so that it
        // doesn't deliver messages while the session is stopped
        endpoint.setStopped(_stopped);

        // cache the endpoint for future reference, keyed on the client
        // identifier
        String key = String.valueOf(clientId);
        _consumers.put(key, endpoint);
    }

    /**
     * This is a no-op
     */
    public void createSender(JmsQueue queue) throws JMSException {
        // intentionally empty: nothing is done here when a sender is created
    }

    /**
     * Create a queue browser for this session. This allows clients to browse
     * a queue without removing any messages.
     * <p>
     *
     * You cannot create more than one queue browser for the same queue
     * in a single session.
     *
     * @param       queue               queue to browse
     * @param       clientId            the client identity
     * @param       selector            message selector. This may be null
     * @throws   JMSException if the queue is null or the browser can't be created
     */
    public void createBrowser(JmsQueue queue, long clientId, String selector)
        throws JMSException {
        if (_log.isDebugEnabled()) {
            _log.debug("createBrowser(queue=" + queue + ", clientId="
                + clientId + ", selector=" + selector
                + ") [sessionId=" + _sessionId + "]");
        }

        // check to see that we have a valid queue
        if (queue == null) {
            throw new JMSException("Cannot create browser for null queue");
        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -