📄 messagingservice.java
字号:
/**
* Access the command filter this service needs to perform its
* tasks. This filter will be installed within the local command
* processing engine.
* @param direction One of the two constants
* <code>Filter.INCOMING</code> and <code>Filter.OUTGOING</code>,
* distinguishing between the two filter chains managed by the
* command processor.
* @return A <code>Filter</code> object, used by this service to
* intercept and process kernel-level commands.
*/
public Filter getCommandFilter(boolean direction){
if (direction == Filter.OUTGOING){
return encOutFilter;
} else {
return encInFilter;
}
}
/**
* Access the command sink this service uses to handle its own
* vertical commands.
*/
public Sink getCommandSink(boolean side) {
if(side == Sink.COMMAND_SOURCE) {
return senderSink;
}
else {
return receiverSink;
}
}
/**
* Access the names of the vertical commands this service wants to
* handle as their final destination. This set must not overlap
* with the owned commands set of any previously installed
* service, or an exception will be raised and service
* activation will fail.
*
* @see jade.core.Service#getCommandSink()
*/
public String[] getOwnedCommands() {
return OWNED_COMMANDS;
}
void notifyLocalMTPs() {
Iterator it = routes.getLocalMTPs();
while (it.hasNext()) {
RoutingTable.MTPInfo info = (RoutingTable.MTPInfo) it.next();
MTPDescriptor mtp = info.getDescriptor();
ContainerID cid = myContainer.getID();
try {
MessagingSlice mainSlice = (MessagingSlice)getSlice(MAIN_SLICE);
try {
mainSlice.newMTP(mtp, cid);
}
catch(IMTPException imtpe) {
mainSlice = (MessagingSlice)getFreshSlice(MAIN_SLICE);
mainSlice.newMTP(mtp, cid);
}
}
catch (Exception e) {
myLogger.log(Logger.WARNING, "Error notifying local MTP "+mtp.getName()+" to Main Container.", e);
}
}
}
/**
* Inner class CommandSourceSink
* This inner class handles the messaging commands on the command
* issuer side, turning them into horizontal commands and
* forwarding them to remote slices when necessary.
*/
private class CommandSourceSink implements Sink {
public void consume(VerticalCommand cmd) {
try {
String name = cmd.getName();
if(name.equals(MessagingSlice.SEND_MESSAGE)) {
handleSendMessage(cmd);
}
else if(name.equals(MessagingSlice.NOTIFY_FAILURE)) {
handleNotifyFailure(cmd);
}
else if(name.equals(MessagingSlice.INSTALL_MTP)) {
Object result = handleInstallMTP(cmd);
cmd.setReturnValue(result);
}
else if(name.equals(MessagingSlice.UNINSTALL_MTP)) {
handleUninstallMTP(cmd);
}
else if(name.equals(MessagingSlice.NEW_MTP)) {
handleNewMTP(cmd);
}
else if(name.equals(MessagingSlice.DEAD_MTP)) {
handleDeadMTP(cmd);
}
else if(name.equals(MessagingSlice.SET_PLATFORM_ADDRESSES)) {
handleSetPlatformAddresses(cmd);
}
}
catch(IMTPException imtpe) {
imtpe.printStackTrace();
}
catch(NotFoundException nfe) {
nfe.printStackTrace();
}
catch(ServiceException se) {
se.printStackTrace();
}
catch(MTPException mtpe) {
mtpe.printStackTrace();
}
catch(Throwable t) {
cmd.setReturnValue(t);
}
}
// Vertical command handler methods
private void handleSendMessage(VerticalCommand cmd) {
Object[] params = cmd.getParams();
AID sender = (AID)params[0];
GenericMessage msg = (GenericMessage)params[1];
AID dest = (AID)params[2];
// Since message delivery is asynchronous we use the GenericMessage
// as a temporary holder for the sender principal and credentials
msg.setSenderPrincipal(cmd.getPrincipal());
msg.setSenderCredentials(cmd.getCredentials());
checkTracing(msg);
if (msg.getTraceID() != null) {
myLogger.log(Logger.INFO, "MessagingService source sink handling message "+MessageManager.stringify(msg)+" for receiver "+dest.getName()+". TraceID = "+msg.getTraceID());
}
myMessageManager.deliver(msg, dest, MessagingService.this);
if (msg.getTraceID() != null) {
myLogger.log(Logger.INFO, msg.getTraceID()+" - Message enqueued to MessageManager.");
}
}
private void handleNotifyFailure(VerticalCommand cmd) {
Object[] params = cmd.getParams();
GenericMessage msg = (GenericMessage)params[0];
AID receiver = (AID)params[1];
InternalError ie = (InternalError)params[2];
// If (the sender is not the AMS)
// The acl message contained inside the GenericMessage should never
// be null (it is used to generate the failure message)
ACLMessage aclmsg = msg.getACLMessage();
if((aclmsg.getSender()==null) || (aclmsg.getSender().equals(myContainer.getAMS()))) // sanity check to avoid infinite loops
return;
// Send back a failure message
final ACLMessage failure = aclmsg.createReply();
failure.setPerformative(ACLMessage.FAILURE);
final AID theAMS = myContainer.getAMS();
failure.setSender(theAMS);
failure.setLanguage(FIPANames.ContentLanguage.FIPA_SL);
// FIXME: the content is not completely correct, but that should
// also avoid creating wrong content
String content = "( (action " + msg.getSender().toString();
content = content + " (ACLMessage) ) (MTS-error "+receiver+" "+ie.getMessage() + ") )";
failure.setContent(content);
try {
GenericCommand command = new GenericCommand(MessagingSlice.SEND_MESSAGE, MessagingSlice.NAME, null);
command.addParam(theAMS);
GenericMessage gm = new GenericMessage(failure);
gm.setAMSFailure(true);
command.addParam(gm);
command.addParam((AID)(failure.getAllReceiver().next()));
// FIXME: We should set the AMS principal and credentials
submit(command);
}
catch(ServiceException se) {
// It should never happen
se.printStackTrace();
}
}
private MTPDescriptor handleInstallMTP(VerticalCommand cmd) throws IMTPException, ServiceException, NotFoundException, MTPException {
Object[] params = cmd.getParams();
String address = (String)params[0];
ContainerID cid = (ContainerID)params[1];
String className = (String)params[2];
MessagingSlice targetSlice = (MessagingSlice)getSlice(cid.getName());
try {
return targetSlice.installMTP(address, className);
}
catch(IMTPException imtpe) {
targetSlice = (MessagingSlice)getFreshSlice(cid.getName());
return targetSlice.installMTP(address, className);
}
}
private void handleUninstallMTP(VerticalCommand cmd) throws IMTPException, ServiceException, NotFoundException, MTPException {
Object[] params = cmd.getParams();
String address = (String)params[0];
ContainerID cid = (ContainerID)params[1];
MessagingSlice targetSlice = (MessagingSlice)getSlice(cid.getName());
try {
targetSlice.uninstallMTP(address);
}
catch(IMTPException imtpe) {
targetSlice = (MessagingSlice)getFreshSlice(cid.getName());
targetSlice.uninstallMTP(address);
}
}
private void handleNewMTP(VerticalCommand cmd) throws IMTPException, ServiceException {
Object[] params = cmd.getParams();
MTPDescriptor mtp = (MTPDescriptor)params[0];
ContainerID cid = (ContainerID)params[1];
MessagingSlice mainSlice = (MessagingSlice)getSlice(MAIN_SLICE);
try {
mainSlice.newMTP(mtp, cid);
}
catch(IMTPException imtpe) {
mainSlice = (MessagingSlice)getFreshSlice(MAIN_SLICE);
mainSlice.newMTP(mtp, cid);
}
}
private void handleDeadMTP(VerticalCommand cmd) throws IMTPException, ServiceException {
Object[] params = cmd.getParams();
MTPDescriptor mtp = (MTPDescriptor)params[0];
ContainerID cid = (ContainerID)params[1];
MessagingSlice mainSlice = (MessagingSlice)getSlice(MAIN_SLICE);
try {
mainSlice.deadMTP(mtp, cid);
}
catch(IMTPException imtpe) {
mainSlice = (MessagingSlice)getFreshSlice(MAIN_SLICE);
mainSlice.deadMTP(mtp, cid);
}
}
private void handleSetPlatformAddresses(VerticalCommand cmd) {
Object[] params = cmd.getParams();
AID id = (AID)params[0];
id.clearAllAddresses();
addPlatformAddresses(id);
}
} // END of inner class CommandSourceSink
/**
* Inner class CommandTargetSink
*/
private class CommandTargetSink implements Sink {
public void consume(VerticalCommand cmd) {
try {
String name = cmd.getName();
if(name.equals(MessagingSlice.SEND_MESSAGE)) {
handleSendMessage(cmd);
}
else if(name.equals(MessagingSlice.INSTALL_MTP)) {
Object result = handleInstallMTP(cmd);
cmd.setReturnValue(result);
}
else if(name.equals(MessagingSlice.UNINSTALL_MTP)) {
handleUninstallMTP(cmd);
}
else if(name.equals(MessagingSlice.NEW_MTP)) {
handleNewMTP(cmd);
}
else if(name.equals(MessagingSlice.DEAD_MTP)) {
handleDeadMTP(cmd);
}
else if(name.equals(MessagingSlice.SET_PLATFORM_ADDRESSES)) {
handleSetPlatformAddresses(cmd);
}
else if(name.equals(Service.NEW_SLICE)) {
handleNewSlice(cmd);
}
}
catch(IMTPException imtpe) {
cmd.setReturnValue(imtpe);
}
catch(NotFoundException nfe) {
cmd.setReturnValue(nfe);
}
catch(ServiceException se) {
cmd.setReturnValue(se);
}
catch(MTPException mtpe) {
cmd.setReturnValue(mtpe);
}
}
private void handleSendMessage(VerticalCommand cmd) throws NotFoundException {
Object[] params = cmd.getParams();
AID senderID = (AID)params[0];
GenericMessage msg = (GenericMessage)params[1];
AID receiverID = (AID)params[2];
if (msg.getTraceID() != null) {
myLogger.log(Logger.INFO, msg.getTraceID()+" - MessagingService target sink posting message to receiver "+receiverID.getLocalName());
}
postMessage(msg.getACLMessage(), receiverID);
if (msg.getTraceID() != null) {
myLogger.log(Logger.INFO, msg.getTraceID()+" - Message posted");
}
}
private MTPDescriptor handleInstallMTP(VerticalCommand cmd) throws IMTPException, ServiceException, MTPException {
Object[] params = cmd.getParams();
String address = (String)params[0];
String className = (String)params[1];
return installMTP(address, className);
}
private void handleUninstallMTP(VerticalCommand cmd) throws IMTPException, ServiceException, NotFoundException, MTPException {
Object[] params = cmd.getParams();
String address = (String)params[0];
uninstallMTP(address);
}
private void handleNewMTP(VerticalCommand cmd) throws IMTPException, ServiceException {
Object[] params = cmd.getParams();
MTPDescriptor mtp = (MTPDescriptor)params[0];
ContainerID cid = (ContainerID)params[1];
newMTP(mtp, cid);
}
private void handleDeadMTP(VerticalCommand cmd) throws IMTPException, ServiceException {
Object[] params = cmd.getParams();
MTPDescriptor mtp = (MTPDescriptor)params[0];
ContainerID cid = (ContainerID)params[1];
deadMTP(mtp, cid);
}
private void handleSetPlatformAddresses(VerticalCommand cmd) {
}
private void handleNewSlice(VerticalCommand cmd) {
MainContainer impl = myContainer.getMain();
if(impl != null) {
Object[] params = cmd.getParams();
String newSliceName = (String) params[0];
try {
// Be sure to get the new (fresh) slice --> Bypass the service cache
MessagingSlice newSlice = (MessagingSlice) getFreshSlice(newSliceName);
// Send all possible routes to the new slice
ContainerID[] cids = impl.containerIDs();
for(int i = 0; i < cids.length; i++) {
ContainerID cid = cids[i];
try {
List mtps = impl.containerMTPs(cid);
Iterator it = mtps.iterator();
while(it.hasNext()) {
MTPDescriptor mtp = (MTPDescriptor)it.next();
newSlice.addRoute(mtp, cid.getName());
}
}
catch(NotFoundException nfe) {
// Should never happen
nfe.printStackTrace();
}
}
}
catch (ServiceException se) {
// Should never happen since getSlice() should always work on the Main container
se.printStackTrace();
}
catch (IMTPException imtpe) {
// Should never happen since the new slice should be always valid at this time
imtpe.printStackTrace();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -