📄 messengerimpl.java
字号:
/* * @<#>MessengerImpl.java version 0.0.1, 1/1/2000 * * THIS PROGRAM IS FREE SOFTWARE; YOU CAN DISTRIBUTE IT AND/OR * MODIFY IT UNDER THE TERMS OF THE GNU GENERAL PUBLIC LICENSE * AS PUBLISHED BY THE FREE SOFTWARE FOUNDATION. * * THIS PROGRAM IS DISTRIBUTED IN THE HOPE THAT IT WILL BE USEFUL, * BUT WITHOUT ANY WARRANTY; WITHOUT EVEN THE IMPLIED WARRANTY OF * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. SEE THE * GNU GENERAL PUBLIC LICENSE FOR MORE DETAILS. * * Copyright (c) 2000-2002 Wayne State University. All Rights Reserved. */package naplet.message;import java.rmi.*;import java.rmi.server.UnicastRemoteObject;import java.io.*;import java.net.*;import java.util.*;import naplet.server.*;import naplet.tracing.*;import naplet.*;/** * The <code>MessengerImpl</code> class implements the methods * of <code>Messenger</code>. * * @version 0.0.1, 1/1/2000 * @authro C. Xu */public class MessengerImpl implements Messenger, Runnable { static private int DefaultTimeOut = 5000; static private int DefaultDelay = 100; static private int InitialMboxSize = 10; static private int InitialUnsortedBoxSize = 10; private static MessengerImpl instance = null; private ThreadGroup msgThreadGroup; private ServerProperty property; private MailBoxCabinet mboxCabinet; /** * Constructor * * @param property Naplet property */ protected MessengerImpl( ServerProperty property ) { this.property = property; mboxCabinet = new MailBoxCabinet(); } public static MessengerImpl getInstance(ServerProperty property) { if (instance==null) instance = new MessengerImpl( property ); return instance; } public void run() { msgThreadGroup = new ThreadGroup("Message Handler Main"); } public MailBox getMailBox(NapletID nid) { return mboxCabinet.get( nid ); } public MailBox openMailBox( NapletID nid ) { return mboxCabinet.open( nid ); } public void removeMailBox( NapletID nid ) { mboxCabinet.remove( nid ); } public Message checkMailBox( NapletID nid ) { Message msg = null; MailBox mbox = getMailBox( nid ); if (mbox != null) { try { msg = mbox.getMessage(); } catch (InterruptedException ie) {} } return msg; } public void receive(Message msg, InetAddress src) { NapletID recv = msg.getReceiver(); MailBox mbox = (MailBoxImpl)mboxCabinet.get( recv ); if (mbox!=null) { /* The receiving naplet is in this server */ int type = msg.getType(); try { mbox.putMessage(msg); } catch (InterruptedException ie) {} if (type!=Message.USER){ property.getNapletMonitor().interrupt( recv ); } } else { /** The receiving naplet is not in this server * Check with the server log to see if the naplet has * already visited this server or not. * If visited, forward the message * Otherwise, put it in UnsortedMailBox **/ System.out.println("Cann't find a mailbox for the naplet"); ArrayList fpList = property.getNapletServerLog().checkLog(msg.getReceiver()); if (fpList==null) { /* The naplet has never visited */ try { mboxCabinet.unsortedBox.putMessage(msg); } catch (InterruptedException ie) {} System.out.println("Message is stored in unsorted mail box"); } else { /* Forwarding */ FootPrint fp = (FootPrint) fpList.get(fpList.size()-1); if (fp!=null && fp.getEvent()==NapletEvent.DEPART) { URN dest = fp.getServer(); try { send( msg.getSender(), dest, msg); System.out.println("Message forwarded to " + dest.toString()); } catch (NapletCommunicationException nce) { System.err.println("Forwarding message error"); } } else { throw new NapletInternalError("Naplet Visiting Log Error"); } } } } /** * To post a message for a resident naplet. * * @param src NapletID of the sender * @param dest NapletID of the receiver * @param message a <code>Message</code> instance to be posted * @throws NapletCommunicationException */ public void send(NapletID src, NapletID dest, Message message) throws NapletCommunicationException { send( src, dest, message, DefaultTimeOut); } /** * To post a message for a resident naplet. * * @param src NapletID of the sender * @param dest NapletID of the receiver * @param message a <code>Message</code> instance to be posted * @param timeout Time limit to locate the receving naplet server * @throws NapletCommunicationException */ public void send (NapletID src, NapletID dest, Message message, long timeout) throws NapletCommunicationException { int delay = DefaultDelay; URN server = null; try { server = property.getLocator().lookup( dest ); } catch (NapletLocateException nte) { throw new NapletCommunicationException( "Can't locate the naplet"); } if (server==null) { throw new NapletCommunicationException( "Can't locate the destination"); } else { send(src, server, message); } } /** * Post a message to a server for a resident naplet. * * @param src NapletID of the sender * @param dest Destination server * @param message a <code>Message</code> instance to be posted * @throw NapletCommunicationException */ public void send(NapletID src, URN dest, Message msg) throws NapletCommunicationException { if (dest==null) throw new NapletCommunicationException("Null destination"); else { Thread proxy = new Thread( msgThreadGroup, new PostProxy( dest, msg ) ); proxy.start(); } } private class PostProxy implements Runnable { private URN dest; private Message message; public PostProxy( URN dest, Message msg) { this.dest = dest; this.message = msg; } public void run() { try { NapletServer ns = (NapletServer) Naming.lookup("rmi://"+dest.toString() ); ns.post(message); } catch (Exception e) { System.out.println("Unable to post message to "+dest.toString()); e.printStackTrace(); } } } /** * The private <code>MailBoxCabinet</code> class implements a cabinet of * mailboxes. One mailbox is created for each long lived naplet. * In addition, a special mailbox is created for proxy of all * outstanding naplets which are created locally. * The special naplet proxy has a NapletID like "czxu@galileo:0". * Another special mailbox is used to temporarily store messages that * arrive earlier than their receiving naplets. * * @see MailBox MailBoxImpl * @version 0.0.1, 1/1/2000 * @author C. Xu */ private class MailBoxCabinet { /** * This is a special mailbox in support of message forwarding. * Since the naplet servers assume non-FIFO communication channels, * a message can arrive at its destination server before its receiving * naplet. This mailbox is used to store such messages temporarily. */ private MailBox unsortedBox; /** * Cabinet is a collection of mail boxes, each assigned to * a long-lived visiting naplet. */ private HashMap cabinet; /** * Constructor * @param size Mailbox cabinet size */ public MailBoxCabinet( int size ) { unsortedBox = new MailBoxImpl( InitialUnsortedBoxSize ); if (size >0) cabinet = new HashMap( size ); else cabinet = new HashMap( InitialMboxSize ); } /** * Constructor with a default mailbox cabinet size */ public MailBoxCabinet() { this ( InitialMboxSize ); } /** * Create a mailbox for a naplet or proxy, check unsorted mails * for those arrived earlier, and move them into the newly created * mailbox, if exists. If the mailbox is already existed, reference * to the mailbox is returned. * * @param nid Naplet ID */ public synchronized MailBox open( NapletID nid ) { MailBox mbox = null; String key = nid.toString(); if (cabinet.containsKey( key ) ) mbox = (MailBoxImpl) cabinet.get(key); else { mbox = new MailBoxImpl(); // Check unsorted mails and trasfer related mails in the box Message msg = unsortedBox.tryGetMessage( nid ); while ( msg!=null ) { try { mbox.putMessage( msg ); msg = unsortedBox.tryGetMessage( nid ); } catch (InterruptedException ie) {} } cabinet.put(key, mbox); } return mbox; } /** * Get a reference to mailbox to which <code>key</code> is mapped, * or <code>null</code> if no map exists. */ public synchronized MailBox get( NapletID nid ) { String key = nid.toString(); return (MailBox) cabinet.get(key); } /** * Remove a mailbox from the cabinet */ public synchronized void remove( NapletID nid ) { String key = nid.toString(); cabinet.remove(key); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -