📄 destinationimpl.java
字号:
* Method implementing the reaction to a <code>Monit_GetDMQSettings</code> * notification requesting the destination's DMQ settings. * * @exception AccessException If the requester is not the administrator. */ protected void doReact(AgentId from, Monit_GetDMQSettings not) throws AccessException { if (! isAdministrator(from)) throw new AccessException("ADMIN right not granted"); String id = null; if (dmqId != null) id = dmqId.toString(); Channel.sendTo(from, new Monit_GetDMQSettingsRep(not, id, null)); } /** * Method implementing the reaction to a <code>Monit_GetStat</code> * notification requesting to get statistic of this destination. * * @exception AccessException If the requester is not the administrator. */ protected void doReact(AgentId from, Monit_GetStat not) throws AccessException { if (! isAdministrator(from)) throw new AccessException("ADMIN right not granted"); Hashtable stats = new Hashtable(); stats.put("nbMsgsReceiveSinceCreation", new Long(nbMsgsReceiveSinceCreation)); stats.put("nbMsgsDeliverSinceCreation", new Long(nbMsgsDeliverSinceCreation)); stats.put("nbMsgsSendToDMQSinceCreation", new Long(nbMsgsSendToDMQSinceCreation)); stats.put("creationDate", new Long(creationDate)); Channel.sendTo(from, new Monit_GetStatRep(not, stats)); } /** * Method implementing the reaction to a <code>ClientMessages</code> * notification holding messages sent by a client. * <p> * If the sender is not a writer on the destination the messages are * sent to the DMQ and an exception is thrown. Otherwise, the processing of * the received messages is performed in subclasses. * * @exception AccessException If the sender is not a WRITER on the * destination. */ protected void doReact(AgentId from, ClientMessages not) throws AccessException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "DestinationImpl.doReact(" + from + ',' + not + ')'); // If sender is not a writer, sending the messages to the DMQ, and // throwing an exception: if (! isWriter(from)) { ClientMessages deadM; deadM = new ClientMessages(not.getClientContext(), not.getRequestId()); Message msg; for (Enumeration msgs = not.getMessages().elements(); msgs.hasMoreElements();) { msg = (Message) msgs.nextElement(); msg.notWriteable = true; deadM.addMessage(msg); } sendToDMQ(deadM, not.getDMQId()); throw new AccessException("WRITE right not granted"); } specialProcess(not); // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! // for topic performance : must send reply after process ClientMessage // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! if (! not.getPersistent() && !not.getAsyncSend()) { Channel.sendTo(from, new SendReplyNot(not.getClientContext(), not.getRequestId())); } } /** * Method implementing the reaction to an <code>UnknownAgent</code> * notification. * <p> * If the unknown agent is the DMQ, its identifier is set to null. If it * is a client of the destination, it is removed. Specific processing is * also done in subclasses. */ protected void doReact(AgentId from, UnknownAgent not) { if (isAdministrator(not.agent)) { if (logger.isLoggable(BasicLevel.ERROR)) logger.log(BasicLevel.ERROR, "Admin of dest " + destId + " does not exist anymore."); } else if (not.agent.equals(dmqId)) { // state change, so save. setSave(); dmqId = null; } else { // state change, so save. setSave(); clients.remove(from); specialProcess(not); } } /** * Method implementing the reaction to a <code>DeleteNot</code> * notification requesting the deletion of the destination. * <p> * The processing is done in subclasses if the sender is an administrator. */ protected void doReact(AgentId from, DeleteNot not) { if (! isAdministrator(from)) { if (logger.isLoggable(BasicLevel.WARN)) logger.log(BasicLevel.WARN, "Unauthorized eletion request from " + from); } else { specialProcess(not); // state change, so save. setSave(); deletable = true; } } /** * Method implementing the reaction to a <code>SpecialAdminRequest</code> * notification requesting the special administration of the destination. * <p> */ protected void doReact(AgentId from, SpecialAdminRequest not) { String info; Object obj = null; // state change, so save. setSave(); try { if (!isAdministrator(from)) { if (logger.isLoggable(BasicLevel.WARN)) logger.log(BasicLevel.WARN, "Unauthorized SpecialAdminRequest request from " + from); throw new RequestException("ADMIN right not granted"); } obj = specialAdminProcess(not); info = strbuf.append("Request [") .append(not.getClass().getName()) .append("], sent to Destination [") .append(destId) .append("], successful [true] ").toString(); strbuf.setLength(0); Channel.sendTo(from, new AdminReply(not, true, info, obj)); } catch (RequestException exc) { info = strbuf.append("Request [") .append(not.getClass().getName()) .append("], sent to Destination [") .append(destId) .append("], successful [false]: ") .append(exc.getMessage()).toString(); strbuf.setLength(0); Channel.sendTo(from, new AdminReply(not, false, info, obj)); } if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, info); } protected void doReact(AgentId from, RequestGroupNot not) { Enumeration en = not.getClientMessages(); ClientMessages theCM = (ClientMessages) en.nextElement(); Vector replies = new Vector(); replies.addElement(new SendReplyNot( theCM.getClientContext(), theCM.getRequestId())); while (en.hasMoreElements()) { ClientMessages cm = (ClientMessages) en.nextElement(); Vector msgs = cm.getMessages(); for (int i = 0; i < msgs.size(); i++) { theCM.addMessage((Message) msgs.elementAt(i)); } if (! cm.getAsyncSend()) { replies.addElement(new SendReplyNot( cm.getClientContext(), cm.getRequestId())); } } specialProcess(theCM); // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! // for topic performance : must send reply after process ClientMessage // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! if (! not.getPersistent() && replies.size() > 0) { Channel.sendTo( from, new SendRepliesNot(replies)); } } protected Object specialAdminProcess(SpecialAdminRequest not) throws RequestException { return null; } /** * Checks the reading permission of a given client agent. * * @param client AgentId of the client requesting a reading permission. */ protected boolean isReader(AgentId client) { if (isAdministrator(client) || freeReading) return true; Integer clientRight = (Integer) clients.get(client); if (clientRight == null) return false; else return ((clientRight.intValue() == READ) || (clientRight.intValue() == READWRITE)); } /** * Checks the writing permission of a given client agent. * * @param client AgentId of the client requesting a writing permission. */ protected boolean isWriter(AgentId client) { if (isAdministrator(client) || freeWriting) return true; Integer clientRight = (Integer) clients.get(client); if (clientRight == null) return false; else return ((clientRight.intValue() == WRITE) || (clientRight.intValue() == READWRITE)); } /** * Checks the administering permission of a given client agent. * * @param client AgentId of the client requesting an admin permission. */ protected boolean isAdministrator(AgentId client) { return client.equals(adminId) || client.equals(AdminTopic.getDefault()); } /** * Sends dead messages to the appropriate dead message queue. * * @param deadMessages The dead messages. * @param dmqId Identifier of the dead message queue to use, * <code>null</code> if not provided. */ protected void sendToDMQ(ClientMessages deadMessages, AgentId dmqId) { Vector messages = deadMessages.getMessages(); nbMsgsSendToDMQSinceCreation = nbMsgsSendToDMQSinceCreation + messages.size(); AgentId destDmqId = null; if (dmqId != null) { // Sending the dead messages to the provided DMQ destDmqId = dmqId; } else if (this.dmqId != null) { // Sending the dead messages to the destination's DMQ destDmqId = this.dmqId; } else if (DeadMQueueImpl.id != null) { // Sending the dead messages to the server's default DMQ destDmqId = DeadMQueueImpl.id; } if (destDmqId != null && ! destDmqId.equals(destId)) { Channel.sendTo(destDmqId, deadMessages); } // Else it means that the dead message queue is // the queue itself: drop the messages. } /** * Abstract method to be implemented by subclasses for specifically * processing notifications. */ protected abstract void specialProcess(Notification not); private void writeObject(java.io.ObjectOutputStream out) throws IOException { out.writeBoolean(deletable); out.writeObject(adminId); out.writeObject(destId); out.writeBoolean(freeReading); out.writeBoolean(freeWriting); out.writeObject(clients); out.writeObject(dmqId); out.writeLong(creationDate); out.writeLong(nbMsgsReceiveSinceCreation); out.writeLong(nbMsgsDeliverSinceCreation); out.writeLong(nbMsgsSendToDMQSinceCreation); out.writeObject(agent); } private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { deletable = in.readBoolean(); adminId = (AgentId)in.readObject(); destId = (AgentId)in.readObject(); freeReading = in.readBoolean(); freeWriting = in.readBoolean(); clients = (Hashtable)in.readObject(); dmqId = (AgentId)in.readObject(); strbuf = new StringBuffer(); creationDate = in.readLong(); nbMsgsReceiveSinceCreation = in.readLong(); nbMsgsDeliverSinceCreation = in.readLong(); nbMsgsSendToDMQSinceCreation = in.readLong(); agent = (Destination)in.readObject(); } // DestinationMBean interface /** * Returns the unique identifier of the destination. * * @return the unique identifier of the destination. */ public String getDestinationId() { return destId.toString(); } /** * Tests if this destination is free for reading. * * @return true if anyone can receive messages from this destination; * false otherwise. */ public boolean isFreeReading() { return freeReading; } /** * Sets the <code>FreeReading</code> attribute for this destination. * * @param on if true anyone can receive message from this destination. */ public void setFreeReading(boolean on) { // state change, so save. setSave(); freeReading = on; } /** * Tests if this destination is free for writing. * * @return true if anyone can send messages to this destination; * false otherwise. */ public boolean isFreeWriting() { return freeWriting; } /** * Sets the <code>FreeWriting</code> attribute for this destination. * * @param on if true anyone can send message to this destination. */ public void setFreeWriting(boolean on) { // state change, so save. setSave(); freeWriting = on; } /** * Return the unique identifier of DMQ set for this destnation if any. * * @return the unique identifier of DMQ set for this destnation if any; * null otherwise. */ public String getDMQId() { if (dmqId != null) return dmqId.toString(); return null; } /** * Returns this destination creation time as a long. * * @return the destination creation time as UTC milliseconds from the epoch. */ public long getCreationTimeInMillis() { return creationDate; } /** * Returns this destination creation time through a <code>String</code> of * the form: <code>dow mon dd hh:mm:ss zzz yyyy</code>. * * @return the destination creation time. */ public String getCreationDate() { Calendar cal = Calendar.getInstance(); cal.setTimeInMillis(creationDate); return cal.getTime().toString(); } /** * Returns the number of messages received since creation time of this * destination. * * @return the number of messages received since creation time. */ public long getNbMsgsReceiveSinceCreation() { return nbMsgsReceiveSinceCreation; } /** * Returns the number of messages delivered since creation time of this * destination. It includes messages all delivered messages to a consumer, * already acknowledged or not. * * @return the number of messages delivered since creation time. */ public long getNbMsgsDeliverSinceCreation() { return nbMsgsDeliverSinceCreation; } /** * Returns the number of erroneous messages forwarded to the DMQ since * creation time of this destination.. * * @return the number of erroneous messages forwarded to the DMQ. */ public long getNbMsgsSendToDMQSinceCreation() { return nbMsgsSendToDMQSinceCreation; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -