📄 messagingservice.java
字号:
/**
 * Posts an ACL message to an agent living on the local container.
 *
 * @param msg the message to post
 * @param receiverID the identifier of the agent the message is posted to
 * @throws NotFoundException if the local container reports that the
 * receiver could not be found
 */
private void postMessage(ACLMessage msg, AID receiverID) throws NotFoundException {
    if (myContainer.postMessageToLocalAgent(msg, receiverID)) {
        // Delivered: nothing else to do
        return;
    }
    throw new NotFoundException("Messaging service slice failed to find " + receiverID);
}
/**
 * Loads, instantiates and activates the MTP implemented by
 * <code>className</code>, then records it in the local routing table, adds
 * its address(es) to the local agents and submits a <code>NEW_MTP</code>
 * command carrying the resulting descriptor and the local container ID.
 *
 * @param address the transport address to activate the MTP on; when
 * <code>null</code> the MTP chooses its own address
 * @param className the name of the MTP implementation class, loaded
 * through <code>Class.forName()</code>
 * @return the descriptor of the newly installed MTP
 * @throws MTPException if the MTP class cannot be found, instantiated or
 * accessed; in all three cases the original exception is nested
 */
private MTPDescriptor installMTP(String address, String className) throws IMTPException, ServiceException, MTPException {
    try {
        // Create the MTP
        Class c = Class.forName(className);
        MTP proto = (MTP)c.newInstance();

        // Dispatcher invoked by the MTP for each message coming from a remote platform
        InChannel.Dispatcher dispatcher = new InChannel.Dispatcher() {
            public void dispatchMessage(Envelope env, byte[] payload) {
                if (myLogger.isLoggable(Logger.FINE)) {
                    myLogger.log(Logger.FINE,"Message from remote platform received");
                }

                // To avoid message loops, make sure that the ID of this ACC does
                // not appear in a previous 'received' stamp
                ReceivedObject[] stamps = env.getStamps();
                for(int i = 0; i < stamps.length; i++) {
                    String id = stamps[i].getBy();
                    if(CaseInsensitiveString.equalsIgnoreCase(id, accID)) {
                        System.err.println("ERROR: Message loop detected !!!");
                        System.err.println("Route is: ");
                        for(int j = 0; j < stamps.length; j++) {
                            System.err.println("[" + j + "]" + stamps[j].getBy());
                        }
                        System.err.println("Message dispatch aborted.");
                        return;
                    }
                }

                // Put a 'received-object' stamp in the envelope
                ReceivedObject ro = new ReceivedObject();
                ro.setBy(accID);
                ro.setDate(new Date());
                env.setReceived(ro);

                Iterator it = env.getAllIntendedReceiver();
                // FIXME: There is a problem if no 'intended-receiver' is present,
                // but this should not happen
                while (it.hasNext()) {
                    AID rcv = (AID)it.next();
                    // A separate GenericMessage is created for each intended receiver
                    GenericMessage msg = new GenericMessage(env,payload);
                    String traceId = getTraceId(env);
                    if (traceId != null) {
                        myLogger.log(Logger.INFO, "MTP In-Channel handling message from the outside for receiver "+rcv.getName()+". TraceID = "+traceId);
                        msg.setTraceID(traceId);
                    }
                    myMessageManager.deliver(msg, rcv, MessagingService.this);
                }
            }
        };

        if(address == null) {
            // Let the MTP choose the address
            TransportAddress ta = proto.activate(dispatcher, myProfile);
            address = proto.addrToStr(ta);
        }
        else {
            // Convert the given string into a TransportAddress object and use it
            TransportAddress ta = proto.strToAddr(address);
            proto.activate(dispatcher, ta, myProfile);
        }

        MTPDescriptor result = new MTPDescriptor(proto.getName(), className, new String[] {address}, proto.getSupportedProtocols());
        routes.addLocalMTP(address, proto, result);

        String[] pp = result.getSupportedProtocols();
        for (int i = 0; i < pp.length; ++i) {
            if (myLogger.isLoggable(Logger.CONFIG)) {
                myLogger.log(Logger.CONFIG,"Added Route-Via-MTP for protocol "+pp[i]);
            }
        }

        String[] addresses = result.getAddresses();
        for(int i = 0; i < addresses.length; i++) {
            myContainer.addAddressToLocalAgents(addresses[i]);
        }

        // Issue a NEW_MTP command carrying the descriptor and the local container ID
        GenericCommand gCmd = new GenericCommand(MessagingSlice.NEW_MTP, MessagingSlice.NAME, null);
        gCmd.addParam(result);
        gCmd.addParam(myContainer.getID());
        submit(gCmd);

        return result;
    }
    /*#DOTNET_INCLUDE_BEGIN
     catch(System.TypeLoadException tle)
     {
     ClassNotFoundException cnfe = new ClassNotFoundException(tle.get_Message());
     throw new MTPException("The class " + className + " raised IllegalAccessException (see nested exception)", cnfe);
     }
     catch(System.TypeInitializationException tie)
     {
     InstantiationException ie = new InstantiationException(tie.get_Message());
     throw new MTPException("The class " + className + " raised InstantiationException (see nested exception)", ie);
     }
     #DOTNET_INCLUDE_END*/
    catch(ClassNotFoundException cnfe) {
        // Keep the original exception nested, consistently with the handlers below,
        // so that the class loading failure is not lost
        throw new MTPException("ERROR: The class " + className + " for the " + address + " MTP was not found", cnfe);
    }
    catch(InstantiationException ie) {
        throw new MTPException("The class " + className + " raised InstantiationException (see nested exception)", ie);
    }
    catch(IllegalAccessException iae) {
        throw new MTPException("The class " + className + " raised IllegalAccessException (see nested exception)", iae);
    }
}
/**
 * Removes the local MTP bound to the given address from the routing table,
 * deactivates it, removes its address(es) from the local agents and submits
 * a <code>DEAD_MTP</code> command carrying the MTP descriptor and the local
 * container ID.
 *
 * @param address the address of the MTP to uninstall
 * @throws MTPException if no local MTP is registered for the given address
 */
private void uninstallMTP(String address) throws IMTPException, ServiceException, NotFoundException, MTPException {
    RoutingTable.MTPInfo removed = routes.removeLocalMTP(address);
    if (removed == null) {
        throw new MTPException("No such address was found on this container: " + address);
    }

    // Shut the MTP down on the given address
    MTP mtp = removed.getMTP();
    mtp.deactivate(mtp.strToAddr(address));

    // Local agents must no longer advertise the addresses of this MTP
    MTPDescriptor descriptor = removed.getDescriptor();
    String[] mtpAddresses = descriptor.getAddresses();
    for (int k = 0; k < mtpAddresses.length; k++) {
        myContainer.removeAddressFromLocalAgents(mtpAddresses[k]);
    }

    GenericCommand deadMtpCmd = new GenericCommand(MessagingSlice.DEAD_MTP, MessagingSlice.NAME, null);
    deadMtpCmd.addParam(descriptor);
    deadMtpCmd.addParam(myContainer.getID());
    submit(deadMtpCmd);
}
/**
 * Handles the activation of an MTP on container <code>cid</code>. When a
 * main container is available locally, a route to the new MTP is added on
 * every slice except the one named after <code>cid</code>, and the main
 * container is then notified. Otherwise nothing is done.
 *
 * @param mtp the descriptor of the MTP that was activated
 * @param cid the container the MTP was activated on
 * @throws IMTPException if raised while updating a slice
 * @throws ServiceException if raised while updating a slice
 */
private void newMTP(MTPDescriptor mtp, ContainerID cid) throws IMTPException, ServiceException {
    MainContainer mainContainer = myContainer.getMain();
    if (mainContainer == null) {
        // Do nothing for now, but could also route the command to the main slice, thus enabling e.g. AMS replication
        return;
    }

    // Update the routing tables of all the other slices
    Service.Slice[] allSlices = getAllSlices();
    for (int idx = 0; idx < allSlices.length; idx++) {
        try {
            MessagingSlice target = (MessagingSlice) allSlices[idx];
            String targetName = target.getNode().getName();
            String originName = cid.getName();
            boolean isOrigin = targetName.equals(originName);
            if (!isOrigin) {
                target.addRoute(mtp, originName);
            }
        }
        catch (Throwable t) {
            // Re-throw allowed exceptions
            if (t instanceof IMTPException) {
                throw (IMTPException) t;
            }
            if (t instanceof ServiceException) {
                throw (ServiceException) t;
            }
            // Anything else is only logged, so that the remaining slices are still updated
            myLogger.log(Logger.WARNING,"### addRoute() threw " + t + " ###");
        }
    }

    mainContainer.newMTP(mtp, cid);
}
/**
 * Handles the deactivation of an MTP on container <code>cid</code>. When a
 * main container is available locally, the route to the dead MTP is removed
 * from every slice except the one named after <code>cid</code>, and the
 * main container is then notified. Otherwise nothing is done.
 *
 * @param mtp the descriptor of the MTP that was deactivated
 * @param cid the container the MTP was deactivated on
 * @throws IMTPException if raised while updating a slice
 * @throws ServiceException if raised while updating a slice
 */
private void deadMTP(MTPDescriptor mtp, ContainerID cid) throws IMTPException, ServiceException {
    MainContainer mainContainer = myContainer.getMain();
    if (mainContainer == null) {
        // Do nothing for now, but could also route the command to the main slice, thus enabling e.g. AMS replication
        return;
    }

    // Update the routing tables of all the other slices
    Service.Slice[] allSlices = getAllSlices();
    for (int idx = 0; idx < allSlices.length; idx++) {
        try {
            MessagingSlice target = (MessagingSlice) allSlices[idx];
            String targetName = target.getNode().getName();
            String originName = cid.getName();
            boolean isOrigin = targetName.equals(originName);
            if (!isOrigin) {
                target.removeRoute(mtp, originName);
            }
        }
        catch (Throwable t) {
            // Re-throw allowed exceptions
            if (t instanceof IMTPException) {
                throw (IMTPException) t;
            }
            if (t instanceof ServiceException) {
                throw (ServiceException) t;
            }
            // Anything else is only logged, so that the remaining slices are still updated
            myLogger.log(Logger.WARNING,"### removeRoute() threw " + t + " ###");
        }
    }

    mainContainer.deadMTP(mtp, cid);
}
} // END of inner class CommandTargetSink
/**
 Inner class for this service: it is the local <code>Service.Slice</code>
 implementation. Incoming horizontal commands are either converted into
 the vertical command to be processed locally or served directly by the
 private methods of this class.
 */
private class ServiceComponent implements Service.Slice {

    // Implementation of the Service.Slice interface

    /**
     * @return the enclosing <code>MessagingService</code>
     */
    public Service getService() {
        return MessagingService.this;
    }

    /**
     * @return the local node of the enclosing service
     * @throws ServiceException wrapping the <code>IMTPException</code>
     * raised while retrieving the local node
     */
    public Node getNode() throws ServiceException {
        try {
            return MessagingService.this.getLocalNode();
        }
        catch (IMTPException imtpe) {
            throw new ServiceException("Problem in contacting the IMTP Manager", imtpe);
        }
    }

    /**
     * Serves a horizontal command. Some commands are translated into a
     * vertical command that is returned to the caller; the others are
     * served in place and <code>null</code> is returned. Whatever is
     * thrown while serving is stored as the return value of
     * <code>cmd</code>.
     *
     * @param cmd the horizontal command to serve
     * @return the vertical command to process, or <code>null</code>
     */
    public VerticalCommand serve(HorizontalCommand cmd) {
        VerticalCommand toProcess = null;
        try {
            String cmdName = cmd.getName();
            Object[] args = cmd.getParams();

            if (cmdName.equals(MessagingSlice.H_DISPATCHLOCALLY)) {
                GenericCommand sendCmd = new GenericCommand(MessagingSlice.SEND_MESSAGE, MessagingSlice.NAME, null);
                AID sender = (AID) args[0];
                GenericMessage message = (GenericMessage) args[1];
                AID receiver = (AID) args[2];
                if (message.getTraceID() != null) {
                    myLogger.log(Logger.INFO, "MessagingService slice received message "+MessageManager.stringify(message)+" for receiver "+receiver.getLocalName()+". Trace ID = "+message.getTraceID());
                }
                sendCmd.addParam(sender);
                sendCmd.addParam(message);
                sendCmd.addParam(receiver);
                toProcess = sendCmd;
            }
            else if (cmdName.equals(MessagingSlice.H_ROUTEOUT)) {
                Envelope envelope = (Envelope) args[0];
                byte[] body = (byte[]) args[1];
                AID receiver = (AID) args[2];
                String destination = (String) args[3];
                routeOut(envelope, body, receiver, destination);
            }
            else if (cmdName.equals(MessagingSlice.H_GETAGENTLOCATION)) {
                AID agent = (AID) args[0];
                cmd.setReturnValue(getAgentLocation(agent));
            }
            else if (cmdName.equals(MessagingSlice.H_INSTALLMTP)) {
                GenericCommand installCmd = new GenericCommand(MessagingSlice.INSTALL_MTP, MessagingSlice.NAME, null);
                String mtpAddress = (String) args[0];
                String mtpClass = (String) args[1];
                installCmd.addParam(mtpAddress);
                installCmd.addParam(mtpClass);
                toProcess = installCmd;
            }
            else if (cmdName.equals(MessagingSlice.H_UNINSTALLMTP)) {
                GenericCommand uninstallCmd = new GenericCommand(MessagingSlice.UNINSTALL_MTP, MessagingSlice.NAME, null);
                String mtpAddress = (String) args[0];
                uninstallCmd.addParam(mtpAddress);
                toProcess = uninstallCmd;
            }
            else if (cmdName.equals(MessagingSlice.H_NEWMTP)) {
                MTPDescriptor descriptor = (MTPDescriptor) args[0];
                ContainerID container = (ContainerID) args[1];
                GenericCommand newMtpCmd = new GenericCommand(MessagingSlice.NEW_MTP, MessagingSlice.NAME, null);
                newMtpCmd.addParam(descriptor);
                newMtpCmd.addParam(container);
                toProcess = newMtpCmd;
            }
            else if (cmdName.equals(MessagingSlice.H_DEADMTP)) {
                MTPDescriptor descriptor = (MTPDescriptor) args[0];
                ContainerID container = (ContainerID) args[1];
                GenericCommand deadMtpCmd = new GenericCommand(MessagingSlice.DEAD_MTP, MessagingSlice.NAME, null);
                deadMtpCmd.addParam(descriptor);
                deadMtpCmd.addParam(container);
                toProcess = deadMtpCmd;
            }
            else if (cmdName.equals(MessagingSlice.H_ADDROUTE)) {
                MTPDescriptor descriptor = (MTPDescriptor) args[0];
                String viaSlice = (String) args[1];
                addRoute(descriptor, viaSlice);
            }
            else if (cmdName.equals(MessagingSlice.H_REMOVEROUTE)) {
                MTPDescriptor descriptor = (MTPDescriptor) args[0];
                String viaSlice = (String) args[1];
                removeRoute(descriptor, viaSlice);
            }
        }
        catch (Throwable t) {
            // Report the failure to the requester through the command itself
            cmd.setReturnValue(t);
        }
        return toProcess;
    }

    // Private methods

    /**
     * Routes a message out through the port the routing table associates
     * with the given address.
     *
     * @throws MTPException if the routing table has no port for the address
     */
    private void routeOut(Envelope env, byte[] payload, AID receiverID, String address) throws IMTPException, MTPException {
        RoutingTable.OutPort port = routes.lookup(address);
        if (myLogger.isLoggable(Logger.FINE)) {
            myLogger.log(Logger.FINE,"Routing message to "+receiverID.getName()+" towards port "+port);
        }
        if (port == null) {
            throw new MTPException("No suitable route found for address " + address + ".");
        }
        port.route(env, payload, receiverID, address);
    }

    /**
     * Asks the main container, if available locally, for the container
     * hosting the given agent.
     *
     * @return the container ID, or <code>null</code> when there is no
     * main container on this node
     */
    private ContainerID getAgentLocation(AID agentID) throws IMTPException, NotFoundException {
        MainContainer mainContainer = myContainer.getMain();
        if (mainContainer == null) {
            // Do nothing for now, but could also have a local GADT copy, thus enabling e.g. Main Container replication
            return null;
        }
        return mainContainer.getContainerID(agentID);
    }

    /**
     * Registers a remote MTP reachable through the slice with the given
     * name. Only when the routing table reports the MTP as actually new,
     * its addresses are added to all local agents.
     */
    private void addRoute(MTPDescriptor mtp, String sliceName) throws IMTPException, ServiceException {
        // Be sure the slice is fresh --> bypass the service cache
        MessagingSlice freshSlice = (MessagingSlice) getFreshSlice(sliceName);
        if (!routes.addRemoteMTP(mtp, sliceName, freshSlice)) {
            // Already known MTP.
            // NOTE that a notification about a remote MTP can be received more than once in case of fault and successive recovery of the Main Container
            return;
        }

        // This is actually a new MTP --> Add the new address to all local agents.
        String[] protocols = mtp.getSupportedProtocols();
        for (int p = 0; p < protocols.length; ++p) {
            if (myLogger.isLoggable(Logger.CONFIG)) {
                myLogger.log(Logger.CONFIG,"Added Route-Via-Slice("+sliceName+") for protocol "+protocols[p]);
            }
        }
        String[] mtpAddresses = mtp.getAddresses();
        for (int a = 0; a < mtpAddresses.length; a++) {
            myContainer.addAddressToLocalAgents(mtpAddresses[a]);
        }
    }

    /**
     * Unregisters a remote MTP reachable through the slice with the given
     * name and removes its addresses from all local agents.
     */
    private void removeRoute(MTPDescriptor mtp, String sliceName) throws IMTPException, ServiceException {
        // Don't care about whether or not the slice is fresh. Only the name matters.
        MessagingSlice namedSlice = (MessagingSlice) getSlice(sliceName);
        routes.removeRemoteMTP(mtp, sliceName, namedSlice);

        String[] protocols = mtp.getSupportedProtocols();
        for (int p = 0; p < protocols.length; ++p) {
            if (myLogger.isLoggable(Logger.CONFIG)) {
                myLogger.log(Logger.CONFIG,"Removed Route-Via-Slice("+sliceName+") for protocol "+protocols[p]);
            }
        }
        String[] mtpAddresses = mtp.getAddresses();
        for (int a = 0; a < mtpAddresses.length; a++) {
            myContainer.removeAddressFromLocalAgents(mtpAddresses[a]);
        }
    }
} // End of ServiceComponent class
///////////////////////////////////////////////
// Message delivery
///////////////////////////////////////////////
// Entry point for the ACL message delivery
public void deliverNow(GenericMessage msg, AID receiverID) {
if (msg.getTraceID() != null) {
myLogger.log(Logger.INFO, msg.getTraceID()+" - Serving message delivery");
}
try {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -