📄 admintopicimpl.java
字号:
request = (AdminRequest) msg.getObject(); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + ": got " + msg.getObject()); } catch (ClassCastException exc) { MomTracing.dbgDestination.log(BasicLevel.ERROR, "--- " + this + ": got bad object"); if (request == null) { info = strbuf.append("Unexpected request to AdminTopic on server [") .append(serverId).append("]: ").append(exc.getMessage()).toString(); strbuf.setLength(0); } else { info = strbuf.append("Request [").append(request.getClass().getName()) .append("], sent to AdminTopic on server [").append(serverId) .append("], successful [false]: ") .append(exc.getMessage()).toString(); strbuf.setLength(0); } distributeReply(replyTo, msgId, new AdminReply(false, info)); } catch (Exception exc) {} processAdminRequests(replyTo, msgId, request, null); } } /** * Method getting the administration requests from messages, and * distributing them to the appropriate reactions. */ private void processAdminRequests(AgentId replyTo, String msgId, AdminRequest request, AgentId from) { String info = null; // state change, so save. setSave(); try { if (request instanceof StopServerRequest) doProcess((StopServerRequest) request, replyTo, msgId); else if (request instanceof CreateDestinationRequest) doProcess((CreateDestinationRequest) request, replyTo, msgId); else if (request instanceof DeleteDestination) doProcess((DeleteDestination) request, replyTo, msgId); else if (request instanceof SetCluster) doProcess((SetCluster) request, replyTo, msgId); else if (request instanceof UnsetCluster) doProcess((UnsetCluster) request, replyTo, msgId); else if (request instanceof SetFather) doProcess((SetFather) request, replyTo, msgId); else if (request instanceof UnsetFather) doProcess((UnsetFather) request, replyTo, msgId); else if (request instanceof CreateUserRequest) doProcess((CreateUserRequest) request, replyTo, msgId); else if (request instanceof UpdateUser) doProcess((UpdateUser) request, replyTo, msgId); else if (request instanceof DeleteUser) doProcess((DeleteUser) request, replyTo, msgId); else if (request instanceof SetRight) doProcess((SetRight) request, replyTo, msgId); else if (request instanceof SetDefaultDMQ) doProcess((SetDefaultDMQ) request, replyTo, msgId); else if (request instanceof SetDestinationDMQ) doProcess((SetDestinationDMQ) request, replyTo, msgId); else if (request instanceof SetUserDMQ) doProcess((SetUserDMQ) request, replyTo, msgId); else if (request instanceof SetNbMaxMsg) doProcess((SetNbMaxMsg) request, replyTo, msgId); else if (request instanceof SetDefaultThreshold) doProcess((SetDefaultThreshold) request, replyTo, msgId); else if (request instanceof SetQueueThreshold) doProcess((SetQueueThreshold) request, replyTo, msgId); else if (request instanceof SetUserThreshold) doProcess((SetUserThreshold) request, replyTo, msgId); else if (request instanceof UnsetDefaultDMQ) doProcess((UnsetDefaultDMQ) request, replyTo, msgId); else if (request instanceof UnsetDestinationDMQ) doProcess((UnsetDestinationDMQ) request, replyTo, msgId); else if (request instanceof UnsetUserDMQ) doProcess((UnsetUserDMQ) request, replyTo, msgId); else if (request instanceof UnsetDefaultThreshold) doProcess((UnsetDefaultThreshold) request, replyTo, msgId); else if (request instanceof UnsetQueueThreshold) doProcess((UnsetQueueThreshold) request, replyTo, msgId); else if (request instanceof UnsetUserThreshold) doProcess((UnsetUserThreshold) request, replyTo, msgId); else if (request instanceof Monitor_GetServersIds) doProcess((Monitor_GetServersIds) request, replyTo, msgId); else if (request instanceof GetDomainNames) doProcess((GetDomainNames) request, replyTo, msgId); else if (request instanceof GetLocalServer) doProcess((GetLocalServer) request, replyTo, msgId); else if (request instanceof Monitor_GetDestinations) doProcess((Monitor_GetDestinations) request, replyTo, msgId); else if (request instanceof Monitor_GetUsers) doProcess((Monitor_GetUsers) request, replyTo, msgId); else if (request instanceof Monitor_GetReaders) doProcess((Monitor_GetReaders) request, replyTo, msgId); else if (request instanceof Monitor_GetWriters) doProcess((Monitor_GetWriters) request, replyTo, msgId); else if (request instanceof Monitor_GetFreeAccess) doProcess((Monitor_GetFreeAccess) request, replyTo, msgId); else if (request instanceof Monitor_GetDMQSettings) doProcess((Monitor_GetDMQSettings) request, replyTo, msgId); else if (request instanceof Monitor_GetFather) doProcess((Monitor_GetFather) request, replyTo, msgId); else if (request instanceof Monitor_GetCluster) doProcess((Monitor_GetCluster) request, replyTo, msgId); else if (request instanceof Monitor_GetPendingMessages) doProcess((Monitor_GetPendingMessages) request, replyTo, msgId); else if (request instanceof Monitor_GetPendingRequests) doProcess((Monitor_GetPendingRequests) request, replyTo, msgId); else if (request instanceof Monitor_GetStat) doProcess((Monitor_GetStat) request, replyTo, msgId); else if (request instanceof Monitor_GetNbMaxMsg) doProcess((Monitor_GetNbMaxMsg) request, replyTo, msgId); else if (request instanceof Monitor_GetSubscriptions) doProcess((Monitor_GetSubscriptions) request, replyTo, msgId); else if (request instanceof SpecialAdmin) doProcess((SpecialAdmin) request, replyTo, msgId); else if (request instanceof AddServerRequest) doProcess((AddServerRequest) request, replyTo, msgId, from); else if (request instanceof AddDomainRequest) doProcess((AddDomainRequest) request, replyTo, msgId, from); else if (request instanceof RemoveServerRequest) doProcess((RemoveServerRequest) request, replyTo, msgId, from); else if (request instanceof RemoveDomainRequest) doProcess((RemoveDomainRequest) request, replyTo, msgId, from); else if (request instanceof GetConfigRequest) doProcess((GetConfigRequest) request, replyTo, msgId); else if (request instanceof UserAdminRequest) doProcess((UserAdminRequest) request, replyTo, msgId); else if (request instanceof GetSubscriberIds) doProcess((GetSubscriberIds) request, replyTo, msgId); else if (request instanceof QueueAdminRequest) doProcess((QueueAdminRequest) request, replyTo, msgId); } catch (UnknownServerException exc) { // Caught when a target server is invalid. info = strbuf.append("Request [").append(request.getClass().getName()) .append("], successful [false]: ") .append(exc.getMessage()).toString(); strbuf.setLength(0); distributeReply(replyTo, msgId, new AdminReply(false, info)); } catch (MomException exc) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.WARN)) MomTracing.dbgDestination.log(BasicLevel.WARN, exc); if (request == null) { info = strbuf.append("Unexpected request to AdminTopic on server [") .append(serverId).append("]: ").append(exc.getMessage()).toString(); strbuf.setLength(0); } else { info = strbuf.append("Request [").append(request.getClass().getName()) .append("], sent to AdminTopic on server [").append(serverId) .append("], successful [false]: ") .append(exc.getMessage()).toString(); strbuf.setLength(0); } distributeReply(replyTo, msgId, new AdminReply(false, info)); } } /** * Processes a <code>StopServerRequest</code> instance requesting to stop * a given server. */ private void doProcess(StopServerRequest request, AgentId replyTo, String msgId) throws UnknownServerException { if (checkServerId(request.getServerId())) { // It's the local server, process the request. distributeReply(replyTo, msgId, new AdminReply(true, "Server stopped")); AgentServer.stop(false, 500L, true); } else { // Forward the request to the right AdminTopic agent. Channel.sendTo(AdminTopic.getDefault((short) request.getServerId()), new AdminRequestNot(replyTo, msgId, request)); } } /** * Processes a <code>CreateDestinationRequest</code> instance * requesting the creation of a destination. * * @exception UnknownServerException If the target server does not exist. * @exception RequestException If the destination deployement fails. */ private void doProcess(CreateDestinationRequest request, AgentId replyTo, String msgId) throws UnknownServerException, RequestException { if (checkServerId(request.getServerId())) { // The destination is local, process the request. String destName = request.getDestinationName(); boolean destNameActivated = (destName != null && ! destName.equals("")); DestinationDesc destDesc; Agent dest = null; String info; Properties properties = request.getProperties(); // Retrieving an existing destination: if (destNameActivated && destinationsTable.containsKey(destName)) { destDesc = (DestinationDesc) destinationsTable.get(destName); if (! destDesc.isAssignableTo(request.getExpectedType())) { throw new RequestException("Destination type not compliant"); } info = strbuf.append("Request [").append(request.getClass().getName()) .append("], processed by AdminTopic on server [").append(serverId) .append("], successful [true]: destination [") .append(destName).append("] has been retrieved").toString(); strbuf.setLength(0); } else { // Instanciating the destination class. String className = request.getClassName(); Class clazz; String destType; try { clazz = Class.forName(className); dest = (Agent) clazz.newInstance(); if (destName != null) { dest.name = destName; } ((AdminDestinationItf) dest).init(this.destId, properties); Method getTypeM = clazz.getMethod("getDestinationType", new Class[0]); destType = (String)getTypeM.invoke(null, new Object[0]); } catch (Exception exc) { if (exc instanceof ClassCastException) { throw new RequestException( "Class [" + className + "] is not a Destination class."); } else { throw new RequestException( "Could not instanciate Destination class [" + className + "]: " + exc); } } AgentId createdDestId = dest.getId(); if (! destNameActivated) destName = createdDestId.toString(); destDesc = new DestinationDesc(createdDestId, destName, className, destType); if (! destDesc.isAssignableTo(request.getExpectedType())) { throw new RequestException("Destination type not compliant"); } try { dest.deploy(); destinationsTable.put(destName, destDesc); info = strbuf.append("Request [").append(request.getClass().getName()) .append("], processed by AdminTopic on server [").append(serverId) .append("], successful [true]: ").append(className).append(" [") .append(createdDestId.toString()).append("] has been created and deployed") .toString(); strbuf.setLength(0); } catch (Exception exc) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) MomTracing.dbgDestination.log(BasicLevel.ERROR, "xxx", exc); throw new RequestException("Error while deploying Destination [" + clazz + "]: " + exc); } } distributeReply( replyTo, msgId, new CreateDestinationReply( destDesc.getId().toString(), destDesc.getName(), destDesc.getType(), info)); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, info); } else { // Forward the request to the right AdminTopic agent. Channel.sendTo(AdminTopic.getDefault((short) request.getServerId()), new AdminRequestNot(replyTo, msgId, request)); } } /** * Processes a <code>DeleteDestination</code> instance requesting the * deletion of a destination. */ private void doProcess(DeleteDestination request, AgentId replyTo, String msgId) throws UnknownServerException { AgentId destId = AgentId.fromString(request.getId()); // If the destination is not local, doing nothing: if (checkServerId(destId.getTo())) { // The destination is local, process the request. String info; Enumeration destinations = destinationsTable.elements(); while (destinations.hasMoreElements()) { DestinationDesc destDesc = (DestinationDesc)destinations.nextElement(); if (destDesc.getId().equals(destId)) { destinationsTable.remove(destDesc.getName()); break; } } Channel.sendTo(destId, new DeleteNot()); info = strbuf.append("Request [").append(request.getClass().getName()) .append("], sent to AdminTopic on server [").append(serverId) .append("], successful [true]: destination [").append(destId) .append("], successfuly notified for deletion").toString(); strbuf.setLength(0); distributeReply(replyTo, msgId, new AdminReply(true, info)); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, info); } else { // Forward the request to the right AdminTopic agent. Channel.sendTo(AdminTopic.getDefault(destId.getTo()), new AdminRequestNot(replyTo, msgId, request)); } } /** * Processes a <code>SetCluster<code> instance requesting to link two topics * in a cluster relationship. */ private void doProcess(SetCluster request, AgentId replyTo, String msgId) throws UnknownServerException
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -