📄 messagingservice.java
字号:
if (!msg.hasForeignReceiver()) {
deliverInLocalPlatfrom(msg, receiverID);
}
else {
// Dispatch it through the ACC
if (msg.getTraceID() != null) {
myLogger.log(Logger.INFO, msg.getTraceID() + " - Activating ACC delivery");
}
Iterator addresses = receiverID.getAllAddresses();
while (addresses.hasNext()) {
String address = (String) addresses.next();
try {
forwardMessage(msg, receiverID, address);
return;
}
catch (MTPException mtpe) {
if (myLogger.isLoggable(Logger.WARNING) && !isPersistentDeliveryRetry(msg))
myLogger.log(Logger.WARNING, "Cannot deliver message to address: " + address + " [" + mtpe.toString() + "]. Trying the next one...");
}
}
notifyFailureToSender(msg, receiverID, new InternalError("No valid address contained within the AID " + receiverID.getName()));
}
}
catch (NotFoundException nfe) {
// The receiver does not exist --> Send a FAILURE message
if (msg.getTraceID() != null) {
myLogger.log(Logger.WARNING, msg.getTraceID()+" - Receiver does not exist.", nfe);
}
notifyFailureToSender(msg, receiverID, new InternalError("Agent not found: " + nfe.getMessage()));
}
catch (IMTPException imtpe) {
// Can't reach the destination container --> Send a FAILURE message
myLogger.log(Logger.WARNING, msg.getTraceID()+" - Receiver unreachable.", imtpe);
notifyFailureToSender(msg, receiverID, new InternalError("Agent unreachable: " + imtpe.getMessage()));
}
catch (ServiceException se) {
// Service error during delivery --> Send a FAILURE message
myLogger.log(Logger.WARNING, msg.getTraceID()+" - Service error delivering message.", se);
notifyFailureToSender(msg, receiverID, new InternalError("Service error: " + se.getMessage()));
}
catch (JADESecurityException jse) {
// Delivery not authorized--> Send a FAILURE message
if (msg.getTraceID() != null) {
myLogger.log(Logger.WARNING, msg.getTraceID()+" - Not authorized.", jse);
}
notifyFailureToSender(msg, receiverID, new InternalError("Not authorized: " + jse.getMessage()));
}
}
/**
 * Tells whether the given message carries the persistent-delivery due-date
 * user-defined parameter.
 * <p>
 * When the code between the J2ME exclusion markers is stripped, this method
 * always returns <code>false</code>.
 *
 * @param msg the message being delivered
 * @return <code>true</code> if the message has an ACLMessage whose user-defined
 *         parameters contain the key
 *         <code>PersistentDeliveryService.ACL_USERDEF_DUE_DATE</code>
 */
private boolean isPersistentDeliveryRetry(GenericMessage msg) {
    boolean dueDatePresent = false;
    //#J2ME_EXCLUDE_BEGIN
    ACLMessage aclMessage = msg.getACLMessage();
    dueDatePresent = (aclMessage != null)
            && aclMessage.getAllUserDefinedParameters().containsKey(PersistentDeliveryService.ACL_USERDEF_DUE_DATE);
    //#J2ME_EXCLUDE_END
    return dueDatePresent;
}
/**
 * Delivers a message to a receiver that lives in the local platform.
 * <p>
 * On a main container the receiver location is looked up directly and
 * delivery is attempted until it succeeds (or the lookup/delivery throws).
 * Elsewhere the cached slice for the receiver, if any, is tried first; a
 * cache entry that turns out to be unreachable or no longer hosting the
 * receiver is dropped, and delivery falls back to {@link #deliverUntilOK}.
 *
 * @param msg the message to deliver
 * @param receiverID the intended receiver
 */
void deliverInLocalPlatfrom(GenericMessage msg, AID receiverID) throws IMTPException, ServiceException, NotFoundException, JADESecurityException {
    final String traceId = msg.getTraceID();
    if (traceId != null) {
        myLogger.log(Logger.INFO, traceId + " - Activating local-platform delivery");
    }

    MainContainer main = myContainer.getMain();
    if (main != null) {
        // We are on the main container: query the GADT directly and keep
        // trying until one attempt actually hands the message over.
        MessagingSlice deliveredVia;
        do {
            ContainerID location = main.getContainerID(receiverID);
            deliveredVia = oneShotDeliver(location, msg, receiverID);
        } while (deliveredVia == null);
        return;
    }

    // Peripheral container: look for a remembered <AgentID;MessagingSlice> pair first
    MessagingSlice remembered = (MessagingSlice) cachedSlices.get(receiverID);
    if (remembered != null) {
        try {
            if (traceId != null) {
                myLogger.log(Logger.INFO, traceId + " - Delivering message to cached slice "+remembered.getNode().getName());
            }
            remembered.dispatchLocally(msg.getSender(), msg, receiverID);
            if (traceId != null) {
                myLogger.log(Logger.INFO, traceId + " - Delivery OK.");
            }
            return;
        }
        catch (IMTPException imtpe) {
            if (traceId != null) {
                myLogger.log(Logger.INFO, traceId + " - Cached slice for receiver " + receiverID.getName() + " unreachable.");
            }
        }
        catch (NotFoundException nfe) {
            if (traceId != null) {
                myLogger.log(Logger.INFO, traceId + " - Receiver " + receiverID.getName() + " not found on cached slice container.");
            }
        }
        // The remembered slice did not work: forget it
        cachedSlices.remove(receiverID);
    }

    // Nothing usable in the cache: resolve the receiver through the main slice
    deliverUntilOK(msg, receiverID);
}
/**
 * Repeatedly resolves the receiver's container through the main slice and
 * attempts delivery until one attempt succeeds; the slice that accepted the
 * message is then stored in the slice cache under the receiver's AID.
 * <p>
 * The loop ends abnormally only when the lookup or the delivery attempt
 * throws one of the declared exceptions.
 *
 * @param msg the message to deliver
 * @param receiverID the intended receiver
 */
private void deliverUntilOK(GenericMessage msg, AID receiverID) throws IMTPException, NotFoundException, ServiceException, JADESecurityException {
    MessagingSlice deliveredVia;
    do {
        MessagingSlice main = (MessagingSlice) getSlice(MAIN_SLICE);
        ContainerID location;
        try {
            location = main.getAgentLocation(receiverID);
        }
        catch (IMTPException staleMainSlice) {
            // The main slice we hold is unreachable: fetch a fresh one and ask again
            main = (MessagingSlice) getFreshSlice(MAIN_SLICE);
            location = main.getAgentLocation(receiverID);
        }
        deliveredVia = oneShotDeliver(location, msg, receiverID);
    } while (deliveredVia == null);

    // Remember where the receiver was reached for subsequent deliveries
    cachedSlices.put(receiverID, deliveredVia);
}
/**
 * Performs one delivery attempt towards the container the receiver is
 * believed to live on.
 * <p>
 * If the slice for that container is unreachable, a fresh slice is fetched
 * and the dispatch is repeated once. If the receiver is not found on the
 * container, or no slice exists for it, the method pauses for 200 ms and
 * reports the failure by returning <code>null</code> so that the caller can
 * look the receiver up again.
 *
 * @param cid the container the receiver is supposed to live on
 * @param msg the message to deliver
 * @param receiverID the intended receiver
 * @return the slice the message was dispatched through, or <code>null</code>
 *         if this attempt failed and should be retried
 */
private MessagingSlice oneShotDeliver(ContainerID cid, GenericMessage msg, AID receiverID) throws IMTPException, ServiceException, JADESecurityException {
    final String traceId = msg.getTraceID();
    final String containerName = cid.getName();
    if (traceId != null) {
        myLogger.log(Logger.FINER, traceId+" - Receiver "+receiverID.getLocalName()+" lives on container "+containerName);
    }

    MessagingSlice slice = (MessagingSlice) getSlice(containerName);
    try {
        try {
            if (traceId != null) {
                myLogger.log(Logger.INFO, traceId+" - Delivering message to slice "+slice.getNode().getName());
            }
            slice.dispatchLocally(msg.getSender(), msg, receiverID);
        }
        catch (IMTPException unreachable) {
            // The slice we hold cannot be contacted: ask for a fresh one and dispatch again
            if (traceId != null) {
                myLogger.log(Logger.FINER, traceId+" - Messaging slice on container "+containerName+" unreachable. Try to get a fresh one.");
            }
            slice = (MessagingSlice) getFreshSlice(containerName);
            if (traceId != null && slice != null) {
                myLogger.log(Logger.FINER, traceId+" - Fresh slice for container "+containerName+" found.");
            }
            slice.dispatchLocally(msg.getSender(), msg, receiverID);
        }
        if (traceId != null) {
            myLogger.log(Logger.INFO, traceId+" - Delivery OK");
        }
        return slice;
    }
    catch (NotFoundException receiverGone) {
        // The GADT pointed to this container but the agent is not there;
        // it may have moved in the meanwhile, so the caller should retry.
        if (traceId != null) {
            myLogger.log(Logger.FINER, traceId+" - Receiver "+receiverID.getLocalName()+" not found on container "+containerName+". Possibly he moved elsewhere --> Retry");
        }
    }
    catch (NullPointerException sliceMissing) {
        // Raised when the slice reference is null, i.e. the container the GADT
        // pointed to does not exist anymore; the caller should retry.
        if (traceId != null) {
            myLogger.log(Logger.FINER, traceId+" - Container "+containerName+" for receiver "+receiverID.getLocalName()+" does not exist anymore. Possibly the receiver moved elsewhere --> Retry");
        }
    }

    // Short pause before the caller is allowed to try again
    try {
        Thread.sleep(200);
    }
    catch (InterruptedException ignored) {
    }
    return null;
}
/**
 * Routes a message out of the platform towards one transport address of a
 * foreign receiver.
 * <p>
 * If the sender AID found in the envelope carries no address, the addresses
 * of this platform are added to it. When the message has a trace ID, the ID
 * is also recorded as an envelope property so that it travels with the
 * message.
 *
 * @param msg the message to route out
 * @param receiver the foreign receiver
 * @param address the transport address to use for this attempt
 * @throws MTPException if the message has no envelope or the routing fails
 */
private void forwardMessage(GenericMessage msg, AID receiver, String address) throws MTPException {
    Envelope env = msg.getEnvelope();
    if (env == null) {
        // A message without envelope cannot be routed: report it as a regular
        // transport failure instead of letting a NullPointerException escape
        // the caller's MTPException handling.
        throw new MTPException("Cannot route out a message with no envelope to address " + address);
    }

    AID aid = env.getFrom();
    if (aid == null) {
        if (myLogger.isLoggable(Logger.SEVERE))
            myLogger.log(Logger.SEVERE,"ERROR: null message sender. Aborting message dispatch...");
        return;
    }

    // FIXME The message can no longer be updated
    // if has no address set, then adds the addresses of this platform
    if (!aid.getAllAddresses().hasNext()) {
        addPlatformAddresses(aid);
    }

    try {
        if (msg.getTraceID() != null) {
            myLogger.log(Logger.INFO, msg.getTraceID() + " - Routing message out to address "+address);
            // This method is invoked once per candidate address: tag the envelope
            // only if it is not tagged yet, so retries do not pile up duplicates.
            if (getTraceId(env) == null) {
                env.addProperties(new Property(ACLMessage.TRACE, msg.getTraceID()));
            }
        }
        localSlice.routeOut(env, msg.getPayload(), receiver, address);
    }
    catch (IMTPException imtpe) {
        throw new MTPException("Error during message routing", imtpe);
    }
}
/**
 * This method is used internally by the platform in order
 * to notify the sender of a message that a failure was reported by
 * the Message Transport Service.
 * <p>
 * If the message carries no decoded ACLMessage, an attempt is made to decode
 * it from envelope and payload. Nothing is sent when the (possibly decoded)
 * ACLMessage has the <code>ACLMessage.IGNORE_FAILURE</code> user-defined
 * parameter set to "true"; otherwise a <code>NOTIFY_FAILURE</code> command is
 * submitted.
 *
 * @param msg the message that could not be delivered
 * @param receiver the receiver the delivery failed for
 * @param ie the error describing the failure
 */
public void notifyFailureToSender(GenericMessage msg, AID receiver, InternalError ie) {
    ACLMessage acl = msg.getACLMessage();
    if (acl == null) {
        // ACLMessage can be null in case we get a failure delivering a message coming from an external platform (received by a local MTP).
        // In this case in fact the message is encoded. Try to decode it so that a suitable FAILURE response can be sent back.
        // If the payload is mangled in some way (e.g. encrypted) decoding will fail and no suitable FAILURE response will be sent
        try {
            acl = ((IncomingEncodingFilter) encInFilter).decodeMessage(msg.getEnvelope(), msg.getPayload());
            acl.setEnvelope(msg.getEnvelope());
            msg.setACLMessage(acl);
        }
        catch (Exception e) {
            // Best effort only: go on without the decoded message, but report
            // the problem through the service logger rather than on stderr.
            myLogger.log(Logger.WARNING, "Cannot decode undelivered message for receiver " + receiver.getName() + " while preparing the failure notification", e);
        }
    }

    if (acl != null && "true".equals(acl.getUserDefinedParameter(ACLMessage.IGNORE_FAILURE))) {
        // The sender explicitly asked not to be notified about failures
        return;
    }

    GenericCommand cmd = new GenericCommand(MessagingSlice.NOTIFY_FAILURE, MessagingSlice.NAME, null);
    cmd.addParam(msg);
    cmd.addParam(receiver);
    cmd.addParam(ie);

    try {
        submit(cmd);
    }
    catch (ServiceException se) {
        // It should never happen
        myLogger.log(Logger.SEVERE, "Unexpected error submitting failure notification for receiver " + receiver.getName(), se);
    }
}
/*
 * Adds every address currently known to the routing table of this
 * platform to the given AID. The AID is modified in place.
 **/
private void addPlatformAddresses(AID id) {
    for (Iterator platformAddresses = routes.getAddresses(); platformAddresses.hasNext(); ) {
        id.addAddresses((String) platformAddresses.next());
    }
}
/**
 * Tells whether the agent identified by the given AID lives in this platform.
 * <p>
 * Package scoped since it is accessed by the OutgoingEncoding filter.
 *
 * @param id the AID to check
 * @return <code>true</code> if the agent is considered local to this platform
 */
final boolean livesHere(AID id) {
    if (!acceptForeignAgents) {
        // All agents in the platform must have a name with the form
        // <local-name>@<platform-name>, so comparing the platform part is enough
        return CaseInsensitiveString.equalsIgnoreCase(id.getHap(), platformID);
    }

    // Foreign-named agents are accepted: an AID with no address, or with
    // addresses of this platform only, is considered local.
    String[] aidAddresses = id.getAddressesArray();
    boolean foreignAddressFound = false;
    for (int k = 0; k < aidAddresses.length && !foreignAddressFound; k++) {
        foreignAddressFound = !isPlatformAddress(aidAddresses[k]);
    }
    if (!foreignAddressFound) {
        return true;
    }

    // At least one address does not belong to this platform: the agent is
    // local only if the GADT knows it.
    try {
        MainContainer main = myContainer.getMain();
        if (main != null) {
            // Directly use the GADT on the main container
            main.getContainerID(id);
        }
        else {
            // Ask the main slice, refreshing it once if it is unreachable
            MessagingSlice mainSlice = (MessagingSlice) getSlice(MAIN_SLICE);
            try {
                mainSlice.getAgentLocation(id);
            }
            catch (IMTPException imtpe) {
                mainSlice = (MessagingSlice) getFreshSlice(MAIN_SLICE);
                mainSlice.getAgentLocation(id);
            }
        }
        return true;
    }
    catch (NotFoundException nfe) {
        // The agent does not live in the platform
        return false;
    }
    catch (Exception e) {
        // Intra-platform delivery would fail, so try inter-platform
        return false;
    }
}
/**
 * Checks, ignoring case, whether the given address is one of the addresses
 * known to the routing table of this platform.
 *
 * @param addr the address to look for
 * @return <code>true</code> if a matching platform address exists
 */
private final boolean isPlatformAddress(String addr) {
    boolean matched = false;
    for (Iterator known = routes.getAddresses(); !matched && known.hasNext(); ) {
        String candidate = (String) known.next();
        matched = CaseInsensitiveString.equalsIgnoreCase(candidate, addr);
    }
    return matched;
}
// Work-around for PJAVA compilation: this method adds no behavior of its own,
// it simply forwards the request to the superclass implementation and returns
// whatever that returns.
protected Service.Slice getFreshSlice(String name) throws ServiceException {
return super.getFreshSlice(name);
}
// Only for debugging: progressive number appended to generated trace IDs
private volatile int traceCnt = 0;

/**
 * Assigns a trace ID to the given message when tracing applies, i.e. when
 * the logger is enabled at FINE level or the ACLMessage carries the
 * <code>ACLMessage.TRACE</code> user-defined parameter set to "true".
 * Messages without an ACLMessage are left untouched.
 * <p>
 * The ID has the form <code>performative-senderLocalName-counter</code>.
 * NOTE(review): the counter increment is not atomic, so concurrent callers
 * may presumably obtain the same number; acceptable for debugging only.
 *
 * @param msg the message to be possibly tagged
 */
private void checkTracing(GenericMessage msg) {
    ACLMessage acl = msg.getACLMessage();
    if (acl == null) {
        return;
    }

    boolean tracingWanted = myLogger.isLoggable(Logger.FINE)
            || "true".equals(acl.getAllUserDefinedParameters().get(ACLMessage.TRACE));
    if (tracingWanted) {
        String performativeName = ACLMessage.getPerformative(acl.getPerformative());
        String senderLocalName = msg.getSender().getLocalName();
        msg.setTraceID(performativeName+"-"+senderLocalName+"-"+traceCnt);
        traceCnt++;
    }
}
/**
 * Extracts the trace ID stored in an envelope, if any.
 *
 * @param env the envelope to inspect
 * @return the value of the first property named <code>ACLMessage.TRACE</code>,
 *         or <code>null</code> if the envelope has no such property
 */
private String getTraceId(Envelope env) {
    for (Iterator props = env.getAllProperties(); props.hasNext(); ) {
        Property current = (Property) props.next();
        boolean isTraceProperty = current.getName().equals(ACLMessage.TRACE);
        if (isTraceProperty) {
            return (String) current.getValue();
        }
    }
    return null;
}
/**
 * For debugging purpose.
 *
 * @return the queue status as reported by the message manager
 */
public String[] getMessageManagerQueueStatus() {
return myMessageManager.getQueueStatus();
}
/**
 * For debugging purpose.
 *
 * @return the thread pool status as reported by the message manager
 */
public String[] getMessageManagerThreadPoolStatus() {
return myMessageManager.getThreadPoolStatus();
}
/**
 * For debugging purpose.
 *
 * @return the global information string as reported by the message manager
 */
public String getMessageManagerGlobalInfo() {
return myMessageManager.getGlobalInfo();
}
/**
 * For debugging purpose.
 *
 * @return the thread pool as returned by the message manager
 */
public Thread[] getMessageManagerThreadPool() {
return myMessageManager.getThreadPool();
}
/**
 * Empties the whole receiver-to-slice cache of this service (regardless of
 * the given name) and then lets the superclass clear its own cached slice
 * for that name.
 *
 * @param name the name of the slice the superclass has to forget
 */
protected void clearCachedSlice(String name) {
    final boolean receiverCacheAvailable = (cachedSlices != null);
    if (receiverCacheAvailable) {
        cachedSlices.clear();
        myLogger.log(Logger.INFO, "Clearing cache");
    }
    super.clearCachedSlice(name);
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -