📄 clientsubscription.java
字号:
/** * Reactivates the subscription. * * @param context Re-activation context. * @param reqId Re-activation request identifier. * @param topicId Topic identifier. * @param selector Selector for filtering messages. * @param noLocal <code>true</code> for not consuming messages published * within the same proxy's context. */ void reactivate(int contextId, int reqId, AgentId topicId, String selector, boolean noLocal) { this.contextId = contextId; this.subRequestId = reqId; this.topicId = topicId; this.selector = selector; this.noLocal = noLocal; noFiltering = (! noLocal) && (selector == null || selector.equals("")); active = true; requestId = -1; toListener = false; // Some updated attributes are persistent save(); if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ": reactivated."); } /** De-activates the subscription, denies the non acknowledgded messages. */ void deactivate() { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, "ClientSubscription.deactivate()"); unsetListener(); unsetReceiver(); active = false; // Denying all delivered messages: deny(deliveredIds.keys()); deliveredIds.clear(); // deliveredIds is persistent save(); if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ": deactivated."); } void setActive(boolean active) { this.active = active; } /** * Sets a listener. * * @param requestId Identifier of the listener request. */ void setListener(int requestId) { this.requestId = requestId; toListener = true; if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ": listener set."); } /** Unsets the listener. */ void unsetListener() { requestId = -1; toListener = false; if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ": listener unset."); } /** * Sets a receiver request. * * @param requestId Identifier of the "receive" request. * @param timeToLive Request's time to live value. */ void setReceiver(int requestId, long timeToLive) { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ".setReceiver(" + requestId + "," + timeToLive + ")"); this.requestId = requestId; toListener = false; if (timeToLive > 0) requestExpTime = System.currentTimeMillis() + timeToLive; else requestExpTime = 0; } /** Unsets a receiver request. */ void unsetReceiver() { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ".unsetReceiver()"); requestId = -1; requestExpTime = 0; } /** Sets the subscription's dead message queue identifier. */ void setDMQId(AgentId dmqId) { this.dmqId = dmqId; save(); } /** Sets the subscription's threshold value. */ void setThreshold(Integer threshold) { this.threshold = threshold; save(); } /** * Browses messages and keeps those which will have to be delivered * to the subscriber. */ void browseNewMessages(Vector newMessages) { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ".browseNewMessages(" + newMessages + ')'); // Browsing the messages one by one. Message message; String msgId; for (Enumeration e = newMessages.elements(); e.hasMoreElements();) { message = (Message) e.nextElement(); msgId = message.getIdentifier(); // test nbMaxMsg if (nbMaxMsg > -1 && nbMaxMsg <= messageIds.size()) { ClientMessages deadMessages = new ClientMessages(); deadMessages.addMessage(message); sendToDMQ(deadMessages); continue; } // Keeping the message if filtering is successful. if (noFiltering || (Selector.matches(message, selector) && (! noLocal || ! msgId.startsWith(proxyId.toString().substring(1) + "c" + contextId + "m", 3)))) { // It's the first delivery, adds the message to the proxy's table if (message.acksCounter == 0) messagesTable.put(msgId, message); message.acksCounter++; if (durable) message.durableAcksCounter++; messageIds.add(msgId); save(); if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ": added msg " + msgId + " for delivery."); } } } /** * Launches a delivery sequence, either for a listener, or for a receiver. */ ConsumerMessages deliver() { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log( BasicLevel.DEBUG, "ClientSubscription[" + proxyId + ',' + topicId + ',' + name + "].deliver()"); // Returning null if no request exists: if (requestId == -1) return null; // Returning null if a "receive" request has expired: if (! toListener && requestExpTime > 0 && System.currentTimeMillis() >= requestExpTime) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, this + ": receive request " + requestId + " expired."); requestId = -1; requestExpTime = 0; return null; } String id; Message message; Integer deliveryAttempts = null; int lastPrior = -1; int insertionIndex = -1; int prior; Vector deliverables = new Vector(); ClientMessages deadMessages = null; if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, " -> messageIds.size() = " + messageIds.size()); // Delivering to a listener. if (toListener) { // Browsing the identifiers of the messages to deliver. while (! messageIds.isEmpty()) { id = (String) messageIds.remove(0); save(); message = (Message) messagesTable.get(id); // Message still exists. if (message != null) { // Delivering it if valid. if (message.isValid(System.currentTimeMillis())) { deliveredIds.put(id, id); // Setting the message's deliveryCount and denied fields. deliveryAttempts = (Integer) deniedMsgs.get(id); if (deliveryAttempts == null) message.deliveryCount = 1; else { message.deliveryCount = deliveryAttempts.intValue() + 1; message.denied = true; } // Inserting it according to its priority. if (lastPrior == -1 || message.getPriority() == lastPrior) insertionIndex++; else { insertionIndex = 0; while (insertionIndex < deliverables.size()) { prior = ((Message) deliverables.get(insertionIndex)).getPriority(); if (prior >= message.getPriority()) insertionIndex++; else break; } } lastPrior = message.getPriority(); deliverables.insertElementAt(message.clone(), insertionIndex); if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ": message " + id + " added for delivery."); } // Invalid message: removing and adding it to the vector of dead // messages. else { messagesTable.remove(id); // Deleting the message, if needed. if (durable) message.delete(); // Setting the message's deliveryCount, denied and expired fields. deliveryAttempts = (Integer) deniedMsgs.remove(id); if (deliveryAttempts != null) { message.deliveryCount = deliveryAttempts.intValue(); message.denied = true; } message.expired = true; if (deadMessages == null) deadMessages = new ClientMessages(); deadMessages.addMessage(message); } } // Message has already been deleted. else deniedMsgs.remove(id); } } // Delivering to a receiver: getting the highest priority message. else { int highestP = -1; Message keptMsg = null; // Browsing the non delivered messages. int i = 0; while (i < messageIds.size()) { id = (String) messageIds.elementAt(i); message = (Message) messagesTable.get(id); if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log( BasicLevel.DEBUG, " -> message = " + message); // Message still exists. if (message != null) { // Checking valid message. if (message.isValid(System.currentTimeMillis())) { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log( BasicLevel.DEBUG, " -> valid message"); // Higher priority: keeping the message. if (message.getPriority() > highestP) { highestP = message.getPriority(); keptMsg = message; } // get next message i++; } // Invalid message: removing and adding it to the vector of dead // messages. else { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log( BasicLevel.DEBUG, " -> invalid message"); messageIds.remove(id); save(); messagesTable.remove(id); // Deleting the message, if needed. if (durable) message.delete(); // Setting the message's deliveryCount, denied and expired fields. deliveryAttempts = (Integer) deniedMsgs.remove(id); if (deliveryAttempts != null) { message.deliveryCount = deliveryAttempts.intValue(); message.denied = true; } message.expired = true; deadMessages = new ClientMessages(); deadMessages.addMessage(message); } } // Message has already been deleted. else { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log( BasicLevel.DEBUG, " -> deleted message " + id); messageIds.remove(id);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -