📄 queueimpl.java
字号:
} /** * Method specifically processing a <code>SetRightRequest</code> instance. * <p> * When a reader is removed, and receive requests of this reader are still * on the queue, they are replied to by an <code>ExceptionReply</code>. */ protected void doProcess(SetRightRequest not) { // If the request does not unset a reader, doing nothing. if (not.getRight() != -READ) return; AgentId user = not.getClient(); ReceiveRequest request; AccessException exc; ExceptionReply reply; // Free reading right has been removed; replying to the non readers // requests. if (user == null) { for (int i = 0; i < requests.size(); i++) { request = (ReceiveRequest) requests.get(i); if (! isReader(request.requester)) { exc = new AccessException("Free READ access removed"); reply = new ExceptionReply(request, exc); Channel.sendTo(request.requester, reply); // state change, so save. setSave(); requests.remove(i); i--; } } } // Reading right of a given user has been removed; replying to its // requests. else { for (int i = 0; i < requests.size(); i++) { request = (ReceiveRequest) requests.get(i); if (user.equals(request.requester)) { exc = new AccessException("READ right removed"); reply = new ExceptionReply(request, exc); Channel.sendTo(request.requester, reply); // state change, so save. setSave(); requests.remove(i); i--; } } } } /** * Method specifically processing a <code>ClientMessages</code> instance. * <p> * This method stores the messages and launches a delivery sequence. */ protected void doProcess(ClientMessages not) { receiving = true; Message msg; // Storing each received message: for (Enumeration msgs = not.getMessages().elements(); msgs.hasMoreElements();) { if (arrivalsCounter == Long.MAX_VALUE) arrivalsCounter = 0; msg = (Message) msgs.nextElement(); if (not.isPersistent()) { // state change, so save. setSave(); } msg.order = arrivalsCounter++; storeMessage(msg); } // Lauching a delivery sequence: deliverMessages(0); receiving = false; } /** * Method specifically processing an <code>UnknownAgent</code> instance. * <p> * The specific processing is done when a <code>QueueMsgReply</code> was * sent to a requester which does not exist anymore. In that case, the * messages sent to this requester and not yet acknowledged are marked as * "denied" for delivery to an other requester, and a new delivery sequence * is launched. Messages considered as undeliverable are removed and sent to * the DMQ. */ protected void doProcess(UnknownAgent uA) { AgentId client = uA.agent; Notification not = uA.not; // If the notification is not a delivery, doing nothing. if (! (not instanceof QueueMsgReply)) return; String msgId; Message msg; AgentId consId; ClientMessages deadMessages = null; for (Enumeration e = deliveredMsgs.keys(); e.hasMoreElements();) { msgId = (String) e.nextElement(); msg = (Message) deliveredMsgs.get(msgId); consId = (AgentId) consumers.get(msgId); // Delivered message has been delivered to the deleted client: // denying it. if (consId.equals(client)) { deliveredMsgs.remove(msgId); msg.denied = true; // state change, so save. setSave(); consumers.remove(msgId); contexts.remove(msgId); // If message considered as undeliverable, adding it to the // vector of dead messages: if (isUndeliverable(msg)) { msg.delete(); msg.undeliverable = true; if (deadMessages == null) deadMessages = new ClientMessages(); deadMessages.addMessage(msg); } // Else, putting it back into the deliverables vector: else storeMessage(msg); if (logger.isLoggable(BasicLevel.WARN)) logger.log(BasicLevel.WARN, "Message " + msg.getIdentifier() + " denied."); } } // Sending dead messages to the DMQ, if needed: if (deadMessages != null) sendToDMQ(deadMessages, null); // Launching a delivery sequence: deliverMessages(0); } /** * Method specifically processing a * <code>fr.dyade.aaa.agent.DeleteNot</code> instance. * <p> * <code>ExceptionReply</code> replies are sent to the pending receivers, * and the remaining messages are sent to the DMQ and deleted. */ protected void doProcess(DeleteNot not) { // Building the exception to send to the pending receivers: DestinationException exc = new DestinationException("Queue " + destId + " is deleted."); ReceiveRequest rec; ExceptionReply excRep; // Sending it to the pending receivers: cleanWaitingRequest(System.currentTimeMillis()); for (int i = 0; i < requests.size(); i++) { rec = (ReceiveRequest) requests.elementAt(i); excRep = new ExceptionReply(rec, exc); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Requester " + rec.requester + " notified of the queue deletion."); Channel.sendTo(rec.requester, excRep); } // Sending the remaining messages to the DMQ, if needed: if (! messages.isEmpty()) { Message msg; ClientMessages deadMessages = new ClientMessages(); while (! messages.isEmpty()) { msg = (Message) messages.remove(0); msg.deletedDest = true; deadMessages.addMessage(msg); } sendToDMQ(deadMessages, null); } // Deleting the messages: MessagePersistenceModule.deleteAll(getDestinationId()); } /** * Actually stores a message in the deliverables vector. * * @param message The message to store. */ protected final synchronized void storeMessage(Message message) { addMessage(message); // Persisting the message. message.save(getDestinationId()); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Message " + message.getIdentifier() + " stored."); } protected final synchronized void addMessage(Message message) { nbMsgsReceiveSinceCreation++; if (nbMaxMsg > -1 && nbMaxMsg <= messages.size()) { ClientMessages deadMessages = new ClientMessages(); deadMessages.addMessage(message); sendToDMQ(deadMessages, null); return; } if (messages.isEmpty()) { samePriorities = true; priority = message.getPriority(); } else if (samePriorities && priority != message.getPriority()) { samePriorities = false; } if (samePriorities) { // Constant priorities: no need to insert the message according to // its priority. if (receiving) { // Message being received: adding it at the end of the queue. messages.add(message); } else { // Denying or recovery: adding the message according to its original // arrival order. long currentO; int i = 0; for (Enumeration e = messages.elements(); e.hasMoreElements();) { currentO = ((Message) e.nextElement()).order; if (currentO > message.order) break; i++; } messages.insertElementAt(message, i); } } else { // Non constant priorities: inserting the message according to its // priority. Message currentMsg; int currentP; long currentO; int i = 0; for (Enumeration e = messages.elements(); e.hasMoreElements();) { currentMsg = (Message) e.nextElement(); currentP = currentMsg.getPriority(); currentO = currentMsg.order; if (! receiving && currentP == message.getPriority()) { // Message denied or recovered, priorities are equal: inserting the // message according to its original arrival order. if (currentO > message.order) break; } else if (currentP < message.getPriority()) { // Current priority lower than the message to store: inserting it. break; } i++; } messages.insertElementAt(message, i); } } /** * Actually tries to answer the pending "receive" requests. * <p> * The method may send <code>QueueMsgReply</code> replies to clients. * * @param index Index where starting to "browse" the requests. */ protected void deliverMessages(int index) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "QueueImpl.deliverMessages(" + index + ')'); ReceiveRequest notRec = null; boolean replied; int j = 0; Message msg; QueueMsgReply notMsg; ClientMessages deadMessages = null; if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> requests = " + requests + ')'); long current = System.currentTimeMillis(); cleanWaitingRequest(current); // Cleaning the possible expired messages. deadMessages = cleanPendingMessage(current); // Processing each request as long as there are deliverable messages: while (! messages.isEmpty() && index < requests.size()) { notRec = (ReceiveRequest) requests.get(index); replied = false; notMsg = new QueueMsgReply(notRec); // Checking the deliverable messages: while (j < messages.size()) { msg = (Message) messages.get(j); // If selector matches, sending the message: if (Selector.matches(msg, notRec.getSelector()) && checkDelivery(msg)) { messages.remove(j); msg.deliveryCount++; notMsg.addMessage(msg); if (isLocal(notRec.requester)) { notMsg.setPersistent(false); } nbMsgsDeliverSinceCreation++; // use in sub class see ClusterQueueImpl messageDelivered(msg.getIdentifier()); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Message " + msg.getIdentifier() + " to " + notRec.requester + " as reply to " + notRec.getRequestId()); // Removing the message if request in auto ack mode: if (notRec.getAutoAck()) msg.delete(); // Else, putting the message in the delivered messages table: else { if (notMsg.isPersistent()) { // state change, so save. setSave(); } consumers.put(msg.getIdentifier(), notRec.requester); contexts.put(msg.getIdentifier(), new Integer(notRec.getClientContext())); deliveredMsgs.put(msg.getIdentifier(), msg); } if (notMsg.getSize() == notRec.getMessageCount()) { break; } } else { // If message delivered or selector does not match: going on j++; } } // Next request: if (notMsg.getSize() > 0) { requests.remove(index); Channel.sendTo(notRec.requester, notMsg); } else { index++; } j = 0; } // If needed, sending the dead messages to the DMQ: if (deadMessages != null) sendToDMQ(deadMessages, null); } protected boolean checkDelivery(Message msg) { return true; } /** * call in deliverMessages just after channel.sendTo(msg), * overload this methode to process a specific treatment. */ protected void messageDelivered(String msgId) {} /** * call in deliverMessages just after a remove message (invalid), * overload this methode to process a specific treatment. */ protected void messageRemoved(String msgId) {} /** * Returns <code>true</code> if a given message is considered as * undeliverable, because its delivery count matches the queue's * threshold, if any, or the server's default threshold value (if any). */ protected boolean isUndeliverable(Message message) { if (threshold != null) return message.deliveryCount == threshold.intValue(); else if (DeadMQueueImpl.threshold != null) return message.deliveryCount == DeadMQueueImpl.threshold.intValue(); return false; } /** Deserializes a <code>QueueImpl</code> instance. */ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "QueueImpl.readObject()"); in.defaultReadObject(); receiving = false; messages = new Vector(); deliveredMsgs = new Hashtable(); // Retrieving the persisted messages, if any. Vector persistedMsgs = MessagePersistenceModule.loadAll(getDestinationId()); if (persistedMsgs != null) { Message persistedMsg; AgentId consId; while (! persistedMsgs.isEmpty()) { persistedMsg = (Message) persistedMsgs.remove(0); consId = (AgentId) consumers.get(persistedMsg.getIdentifier()); if (consId == null) { addMessage(persistedMsg); } else if (isLocal(consId)) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> deny " + persistedMsg.getIdentifier()); consumers.remove(persistedMsg.getIdentifier()); contexts.remove(persistedMsg.getIdentifier()); addMessage(persistedMsg); } else { deliveredMsgs.put(persistedMsg.getIdentifier(), persistedMsg); } } } } public void readBag(ObjectInputStream in) throws IOException, ClassNotFoundException { receiving = in.readBoolean(); messages = (Vector)in.readObject(); deliveredMsgs = (Hashtable)in.readObject(); for (int i = 0; i < messages.size(); i++) { Message message = (Message)messages.elementAt(i); message.save(getDestinationId()); } } public void writeBag(ObjectOutputStream out) throws IOException { out.writeBoolean(receiving); out.writeObject(messages); out.writeObject(deliveredMsgs); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -