📄 clientsubscription.java
字号:
deniedMsgs.remove(id); save(); } } // Putting the kept message in the vector. if (keptMsg != null) { messageIds.remove(keptMsg.getIdentifier()); deliveredIds.put(keptMsg.getIdentifier(), keptMsg.getIdentifier()); save(); // Setting the message's deliveryCount and denied fields. deliveryAttempts = (Integer) deniedMsgs.get(keptMsg.getIdentifier()); if (deliveryAttempts == null) keptMsg.deliveryCount = 1; else { keptMsg.deliveryCount = deliveryAttempts.intValue() + 1; keptMsg.denied = true; } deliverables.add(keptMsg.clone()); if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ": message " + keptMsg.getIdentifier() + " added for delivery."); } else { i++; } } // Sending the dead messages to the DMQ, if any: if (deadMessages != null) sendToDMQ(deadMessages); // Finally, returning the reply or null: if (! deliverables.isEmpty()) { ConsumerMessages consM = new ConsumerMessages(requestId, deliverables, name, false); if (! toListener) requestId = -1; return consM; } return null; } /** * Acknowledges messages. */ void acknowledge(Enumeration acks) { while (acks.hasMoreElements()) { String id = (String) acks.nextElement(); acknowledge(id); } } void acknowledge(String id) { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ": acknowledges message: " + id); deliveredIds.remove(id); deniedMsgs.remove(id); save(); Message msg = (Message) messagesTable.get(id); // Message may be null if it is not valid anymore if (msg != null) { msg.acksCounter--; if (msg.acksCounter == 0) messagesTable.remove(id); if (durable) { msg.durableAcksCounter--; if (msg.durableAcksCounter == 0) msg.delete(); } } } /** * Denies messages. */ void deny(Enumeration denies) { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ".deny(" + denies + ')'); String id; Message msg; ClientMessages deadMessages = null; int deliveryAttempts = 1; int i; String currentId; long currentO; denyLoop: while (denies.hasMoreElements()) { id = (String) denies.nextElement(); String deliveredMsgId = (String)deliveredIds.remove(id); if (deliveredMsgId == null) { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ": cannot denies message: " + id); continue denyLoop; } save(); if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ": denies message: " + id); msg = (Message) messagesTable.get(id); // Message may be null if it is not valid anymore if (msg == null) continue denyLoop; Integer value = (Integer) deniedMsgs.get(id); if (value != null) deliveryAttempts = value.intValue() + 1; // If maximum delivery attempts reached, the message is no more // deliverable to this sbscriber. if (isUndeliverable(deliveryAttempts)) { deniedMsgs.remove(id); msg.deliveryCount = deliveryAttempts; msg.undeliverable = true; if (deadMessages == null) deadMessages = new ClientMessages(); deadMessages.addMessage(msg); msg.acksCounter--; if (msg.acksCounter == 0) messagesTable.remove(id); if (durable) { msg.durableAcksCounter--; if (msg.durableAcksCounter == 0) msg.delete(); } } // Else, putting it back to the deliverables vector according to its // original delivery order, and adding a new entry for it in the // denied messages table. else { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, " -> put back to the messages to deliver"); i = 0; insertLoop: while (i < messageIds.size()) { currentId = (String) messageIds.elementAt(i); Message currentMessage = (Message) messagesTable.get(currentId); // Message may be null if it is not valid anymore if (currentMessage != null) { currentO = currentMessage.order; if (currentO > msg.order) { break insertLoop; } else { i++; } } else { // Remove the invalid message messageIds.removeElementAt(i); } } messageIds.insertElementAt(id, i); deniedMsgs.put(id, new Integer(deliveryAttempts)); } } // Sending dead messages to the DMQ, if needed: if (deadMessages != null) sendToDMQ(deadMessages); } /** * Decreases the subscription's messages acknowledgement expectations, * deletes those not to be consumed anymore. */ void delete() { for (Enumeration e = deliveredIds.keys(); e.hasMoreElements();) messageIds.add(e.nextElement()); save(); String id; Message msg; for (Enumeration allMessageIds = messageIds.elements(); allMessageIds.hasMoreElements();) { id = (String) allMessageIds.nextElement(); msg = (Message) messagesTable.get(id); if (msg != null) { msg.acksCounter--; if (msg.acksCounter == 0) messagesTable.remove(id); if (durable) { msg.durableAcksCounter--; if (msg.durableAcksCounter == 0) msg.delete(); } } } } /** * Returns <code>true</code> if a given value matches the threshold value * for this user. */ private boolean isUndeliverable(int deliveryAttempts) { if (threshold != null) return deliveryAttempts == threshold.intValue(); else if (DeadMQueueImpl.getDefaultThreshold() != null) return deliveryAttempts == DeadMQueueImpl.getDefaultThreshold().intValue(); return false; } /** * Method used for sending messages to the appropriate dead message queue. */ private void sendToDMQ(ClientMessages messages) { if (dmqId != null) Channel.sendTo(dmqId, messages); else if (DeadMQueueImpl.getId() != null) Channel.sendTo(DeadMQueueImpl.getId(), messages); } Message getMessage(String msgId) { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log( BasicLevel.DEBUG, "ClientSubscription.getMessage(" + msgId + ')'); int index = messageIds.indexOf(msgId); if (index < 0) { // The message has been delivered if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log( BasicLevel.DEBUG, " -> message not found"); return null; } else { return (Message) messagesTable.get(msgId); } } void deleteMessage(String msgId) { messageIds.remove(msgId); save(); Message msg = removeMessage(msgId); if (msg != null) { ClientMessages deadMessages = new ClientMessages(); deadMessages.addMessage(msg); sendToDMQ(deadMessages); } } void clear() { ClientMessages deadMessages = null; for (int i = 0; i < messageIds.size(); i++) { String msgId = (String)messageIds.elementAt(i); Message msg = removeMessage(msgId); if (msg != null) { if (deadMessages == null) { deadMessages = new ClientMessages(); } deadMessages.addMessage(msg); } } if (deadMessages != null) { sendToDMQ(deadMessages); } messageIds.clear(); save(); } /** * Removes a particular pending message in the subscription. * The message is pointed out through its unique identifier. * * @param msgId The unique message's identifier. */ Message removeMessage(String msgId) { Message msg = (Message) messagesTable.get(msgId); if (msg != null) { msg.acksCounter--; if (msg.acksCounter == 0) messagesTable.remove(msgId); if (durable) { msg.durableAcksCounter--; if (msg.durableAcksCounter == 0) msg.delete(); } } return msg; } private void save() { if (durable) proxy.setSave(); } public void readBag(ObjectInputStream in) throws IOException, ClassNotFoundException { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log( BasicLevel.DEBUG, "ClientSubscription[" + proxyId + "].readbag()"); contextId = in.readInt(); subRequestId = in.readInt(); noLocal = in.readBoolean(); noFiltering = in.readBoolean(); active = in.readBoolean(); requestId = in.readInt(); toListener = in.readBoolean(); requestExpTime = in.readLong(); } public void writeBag(ObjectOutputStream out) throws IOException { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log( BasicLevel.DEBUG, "ClientSubscription[" + proxyId + "].writeBag()"); out.writeInt(contextId); out.writeInt(subRequestId); out.writeBoolean(noLocal); out.writeBoolean(noFiltering); out.writeBoolean(active); out.writeInt(requestId); out.writeBoolean(toListener); out.writeLong(requestExpTime); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -