📄 queueimpl.java
字号:
ClientMessages deadMessages = cleanPendingMessage(System.currentTimeMillis()); // Sending the dead messages to the DMQ, if needed: if (deadMessages != null) sendToDMQ(deadMessages, null); Channel.sendTo(from, new Monit_GetNumberRep(not, messages.size())); } /** * Method implementing the reaction to a * <code>Monit_GetPendingRequests</code> notification requesting the * number of pending requests. * * @exception AccessException If the requester is not the administrator. */ protected void doReact(AgentId from, Monit_GetPendingRequests not) throws AccessException { if (! isAdministrator(from)) throw new AccessException("ADMIN right not granted"); Channel.sendTo(from, new Monit_GetNumberRep(not, getWaitingRequestCount())); } /** * Method implementing the reaction to a * <code>Monit_GetNbMaxMsg</code> notification requesting the * number max of messages in this queue. * * @exception AccessException If the requester is not the administrator. */ protected void doReact(AgentId from, Monit_GetNbMaxMsg not) throws AccessException { if (! isAdministrator(from)) throw new AccessException("ADMIN right not granted"); Channel.sendTo(from, new Monit_GetNbMaxMsgRep(not,nbMaxMsg)); } /** * Method implementing the reaction to a <code>ReceiveRequest</code> * instance, requesting a message. * <p> * This method stores the request and launches a delivery sequence. * * @exception AccessException If the sender is not a reader. */ protected void doReact(AgentId from, ReceiveRequest not) throws AccessException { // If client is not a reader, sending an exception. if (! isReader(from)) throw new AccessException("READ right not granted"); String[] toAck = not.getMessageIds(); if (toAck != null) { for (int i = 0; i < toAck.length; i++) { acknowledge(toAck[i]); } } long current = System.currentTimeMillis(); cleanWaitingRequest(current); // Storing the request: not.requester = from; not.setExpiration(current); if (not.isPersistent()) { // state change, so save. setSave(); } requests.add(not); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> requests count = " + requests.size()); // Launching a delivery sequence for this request: int reqIndex = requests.size() - 1; deliverMessages(reqIndex); // If the request has not been answered and if it is an immediate // delivery request, sending a null: if ((requests.size() - 1) == reqIndex && not.getTimeOut() == -1) { requests.remove(reqIndex); QueueMsgReply reply = new QueueMsgReply(not); if (isLocal(from)) { reply.setPersistent(false); } Channel.sendTo(from, reply); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Receive answered by a null."); } } /** * Method implementing the queue reaction to a <code>BrowseRequest</code> * instance, requesting an enumeration of the messages on the queue. * <p> * The method sends a <code>BrowseReply</code> back to the client. Expired * messages are sent to the DMQ. * * @exception AccessException If the requester is not a reader. */ protected void doReact(AgentId from, BrowseRequest not) throws AccessException { // If client is not a reader, sending an exception. if (! isReader(from)) throw new AccessException("READ right not granted"); // Building the reply: BrowseReply rep = new BrowseReply(not); // Cleaning the possible expired messages. ClientMessages deadMessages = cleanPendingMessage(System.currentTimeMillis()); // Adding the deliverable messages to it: int i = 0; Message message; while (i < messages.size()) { message = (Message) messages.get(i); if (Selector.matches(message, not.getSelector())) { // Matching selector: adding the message: rep.addMessage(message); } i++; } // Sending the dead messages to the DMQ, if needed: if (deadMessages != null) sendToDMQ(deadMessages, null); // Delivering the reply: Channel.sendTo(from, rep); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Request answered."); } /** * Method implementing the reaction to an <code>AcknowledgeRequest</code> * instance, requesting messages to be acknowledged. */ protected void doReact(AgentId from, AcknowledgeRequest not) { for (Enumeration ids = not.getIds(); ids.hasMoreElements();) { String msgId = (String) ids.nextElement(); acknowledge(msgId); } } private void acknowledge(String msgId) { Message msg = (Message) deliveredMsgs.remove(msgId); if ((msg != null) && msg.getPersistent()) { // state change, so save. setSave(); } consumers.remove(msgId); contexts.remove(msgId); if (msg != null) { msg.delete(); if (logger.isLoggable(BasicLevel.DEBUG)) { logger.log(BasicLevel.DEBUG, "Message " + msgId + " acknowledged."); } } else if (logger.isLoggable(BasicLevel.WARN)) { logger.log(BasicLevel.WARN, "Message " + msgId + " not found for acknowledgement."); } } /** * Method implementing the reaction to a <code>DenyRequest</code> * instance, requesting messages to be denied. * <p> * This method denies the messages and launches a delivery sequence. * Messages considered as undeliverable are sent to the DMQ. */ protected void doReact(AgentId from, DenyRequest not) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "QueueImpl.doReact(" + from + ',' + not + ')'); Enumeration ids = not.getIds(); String msgId; Message msg; AgentId consId; int consCtx; ClientMessages deadMessages = null; // If the deny request is empty, the denying is a contextual one: it // requests the denying of all the messages consumed by the denier in // the denying context: if (! ids.hasMoreElements()) { // Browsing the delivered messages: for (Enumeration delIds = deliveredMsgs.keys(); delIds.hasMoreElements();) { msgId = (String) delIds.nextElement(); msg = (Message) deliveredMsgs.get(msgId); consId = (AgentId) consumers.get(msgId); consCtx = ((Integer) contexts.get(msgId)).intValue(); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> deny msg " + msgId + "(consId = " + consId + ')'); // If the current message has been consumed by the denier in the same // context: denying it. if (consId.equals(from) && consCtx == not.getClientContext()) { // state change, so save. setSave(); consumers.remove(msgId); contexts.remove(msgId); deliveredMsgs.remove(msgId); msg.denied = true; // If message considered as undeliverable, adding // it to the vector of dead messages: if (isUndeliverable(msg)) { msg.delete(); msg.undeliverable = true; if (deadMessages == null) deadMessages = new ClientMessages(); deadMessages.addMessage(msg); } else { // Else, putting the message back into the deliverables vector: storeMessage(msg); } if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Message " + msgId + " denied."); } } } // For a non empty request, browsing the denied messages: for (ids = not.getIds(); ids.hasMoreElements();) { msgId = (String) ids.nextElement(); msg = (Message) deliveredMsgs.remove(msgId); // Message may have already been denied. For example, a proxy may deny // a message twice, first when detecting a connection failure - and // in that case it sends a contextual denying -, then when receiving // the message from the queue - and in that case it also sends an // individual denying. if (msg == null) { if (logger.isLoggable(BasicLevel.ERROR)) logger.log(BasicLevel.ERROR, " -> already denied message " + msgId); break; } msg.denied = true; if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> deny " + msgId); // state change, so save. setSave(); consumers.remove(msgId); contexts.remove(msgId); // If message considered as undeliverable, adding it // to the vector of dead messages: if (isUndeliverable(msg)) { msg.delete(); msg.undeliverable = true; if (deadMessages == null) deadMessages = new ClientMessages(); deadMessages.addMessage(msg); } // Else, putting the message back into the deliverables vector: else storeMessage(msg); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Message " + msgId + " denied."); } // Sending the dead messages to the DMQ, if needed: if (deadMessages != null) sendToDMQ(deadMessages, null); // Lauching a delivery sequence: deliverMessages(0); } protected void doReact(AgentId from, AbortReceiveRequest not) { for (int i = 0; i < requests.size(); i++) { ReceiveRequest request = (ReceiveRequest) requests.get(i); if (request.requester.equals(from) && request.getClientContext() == not.getClientContext() && request.getRequestId() == not.getAbortedRequestId()) { if (not.isPersistent()) { // state change, so save. setSave(); } requests.remove(i); break; } } } private void doReact(AgentId from, DestinationAdminRequestNot not) { org.objectweb.joram.shared.admin.AdminRequest adminRequest = not.getRequest(); if (adminRequest instanceof GetQueueMessageIds) { doReact((GetQueueMessageIds)adminRequest, not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId()); } else if (adminRequest instanceof GetQueueMessage) { doReact((GetQueueMessage)adminRequest, not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId()); } else if (adminRequest instanceof DeleteQueueMessage) { doReact((DeleteQueueMessage)adminRequest, not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId()); } else if (adminRequest instanceof ClearQueue) { doReact((ClearQueue)adminRequest, not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId()); } } private void doReact(GetQueueMessageIds request, AgentId replyTo, String requestMsgId, String replyMsgId) { String[] res = new String[messages.size()]; for (int i = 0; i < messages.size(); i++) { Message msg = (Message)messages.elementAt(i); res[i] = msg.getIdentifier(); } GetQueueMessageIdsRep reply = new GetQueueMessageIdsRep(res); replyToTopic(reply, replyTo, requestMsgId, replyMsgId); } private void doReact(GetQueueMessage request, AgentId replyTo, String requestMsgId, String replyMsgId) { Message msg = null; for (int i = 0; i < messages.size(); i++) { msg = (Message)messages.elementAt(i); if (msg.getIdentifier().equals(request.getMessageId())) { break; } } if (msg != null) { replyToTopic( new GetQueueMessageRep(msg), replyTo, requestMsgId, replyMsgId); } else { replyToTopic( new org.objectweb.joram.shared.admin.AdminReply( false, "Message not found: " + msg.getIdentifier()), replyTo, requestMsgId, replyMsgId); } } private void doReact(DeleteQueueMessage request, AgentId replyTo, String requestMsgId, String replyMsgId) { for (int i = 0; i < messages.size(); i++) { Message msg = (Message)messages.elementAt(i); if (msg.getIdentifier().equals(request.getMessageId())) { messages.removeElementAt(i); ClientMessages deadMessages = new ClientMessages(); deadMessages.addMessage(msg); sendToDMQ(deadMessages, null); break; } } replyToTopic( new org.objectweb.joram.shared.admin.AdminReply( true, null), replyTo, requestMsgId, replyMsgId); } private void doReact(ClearQueue request, AgentId replyTo, String requestMsgId, String replyMsgId) { if (messages.size() > 0) { ClientMessages deadMessages = new ClientMessages(); for (int i = 0; i < messages.size(); i++) { Message msg = (Message)messages.elementAt(i); deadMessages.addMessage(msg); } sendToDMQ(deadMessages, null); messages.clear(); } replyToTopic( new org.objectweb.joram.shared.admin.AdminReply( true, null), replyTo, requestMsgId, replyMsgId); } private void replyToTopic( org.objectweb.joram.shared.admin.AdminReply reply, AgentId replyTo, String requestMsgId, String replyMsgId) { Message message = Message.create(); message.setCorrelationId(requestMsgId); message.setTimestamp(System.currentTimeMillis()); message.setDestination(replyTo.toString(), Topic.TOPIC_TYPE); message.setIdentifier(replyMsgId); try { message.setObject(reply); Vector messages = new Vector(); messages.add(message); ClientMessages clientMessages = new ClientMessages(-1, -1, messages); Channel.sendTo(replyTo, clientMessages); } catch (Exception exc) { if (logger.isLoggable(BasicLevel.ERROR)) logger.log(BasicLevel.ERROR, "", exc); throw new Error(exc.getMessage()); } } /** * The <code>DestinationImpl</code> class calls this method for passing * notifications which have been partly processed, so that they are * specifically processed by the <code>QueueImpl</code> class. */ protected void specialProcess(Notification not) { if (not instanceof SetRightRequest) doProcess((SetRightRequest) not); else if (not instanceof ClientMessages) doProcess((ClientMessages) not); else if (not instanceof UnknownAgent) doProcess((UnknownAgent) not); else if (not instanceof DeleteNot) doProcess((DeleteNot) not);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -