📄 listener.java
字号:
import net.jxta.peergroup.PeerGroup;
import net.jxta.peergroup.PeerGroupFactory;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.jms.QueueConnectionFactory;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import net.jxta.id.ID;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.OutputPipe;
import net.jxta.pipe.PipeService;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.document.MimeMediaType;
import net.jxta.document.AdvertisementFactory;
import net.jxta.discovery.DiscoveryService;
import java.io.IOException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.Hashtable;
public class Listener extends Thread
{
private Router router = null;
private PeerGroup netPeerGroup = null;
private InputPipe inputPipe;
private OutputPipe outputPipe;
private QueueReceiver queueReciver = null;
private QueueConnectionFactory queueConnFactory = null;
public Listener ( String userName,
String connectionFactoryName ) {
try {
//Preparing for JMS.
javax.naming.InitialContext jndiContxt = new InitialContext();
queueConnFactory = (QueueConnectionFactory) jndiContxt.lookup ( connectionFactoryName );
Queue incomingQueue = (Queue) jndiContxt.lookup ( "jms/"+userName );
QueueConnection queueConnection = queueConnFactory.createQueueConnection ();
QueueSession queueSession = queueConnection.createQueueSession ( false, Session.AUTO_ACKNOWLEDGE );
queueReciver = queueSession.createReceiver ( incomingQueue );
queueConnection.start();
//Preparing for JXTA.
netPeerGroup = PeerGroupFactory.newNetPeerGroup ();
router = new Router( userName, netPeerGroup );
createInputPipe (userName + "JMS");
searchAndCreateOutputPipe(userName + "J2ME");
}//try
catch ( Exception ex ) {
ex.printStackTrace ();
}//catch
}//Listener
private void createInputPipe (String peerName) {
PipeAdvertisement pipeAd = createPipeAdvertisement (peerName);
if (pipeAd!= null) {
boolean published = publishPipeAdvertisement ( pipeAd );
if (published) {
PipeService pipeSvc = netPeerGroup.getPipeService ();
try {
inputPipe = pipeSvc.createInputPipe ( pipeAd );
}//try
catch (IOException io)
{
io.printStackTrace ();
}//catch
}//if(published)
}//if (pipeAd!= null)
}//createInputPipe()
private void searchAndCreateOutputPipe(String peerName) {
try {
JXTASearch jxtaSearch = new JXTASearch (netPeerGroup);
System.out.println ("Listner >> getting outputPipe");
PipeAdvertisement pipeAdvert = jxtaSearch.getPipeAdvertisement (peerName);
if (pipeAdvert != null) {
System.out.println("Listener >> outputpipe Id = "+(pipeAdvert.getPipeID()).toString());
outputPipe = jxtaSearch.getOutputPipe(pipeAdvert);
}//if (pipeAdvert != null)
}//try
catch ( Exception ex )
{
System.out.println ( "\nException in searchAndCreateOutputPipe()......\n" );
ex.printStackTrace ();
}//catch
}//searchAndCreateOutputPipe()
private PipeAdvertisement createPipeAdvertisement (String peerName) {
PipeAdvertisement pipeAd = null;
try
{
String fileName = peerName+".xml";
File file = new File ( fileName );
if ( file.exists() )
{
FileInputStream is = new FileInputStream (file);
if ( is.available() > 0 )
{
pipeAd =(PipeAdvertisement) AdvertisementFactory.newAdvertisement(
new MimeMediaType( "text/xml" ), is
);
}//if ( is.available() > 0)
}
else
{
pipeAd = (PipeAdvertisement) AdvertisementFactory.newAdvertisement (
pipeAd.getAdvertisementType ()
);
pipeAd.setName (peerName);
pipeAd.setType ( "JxtaUnicast" );
pipeAd.setPipeID ( (ID)net.jxta.id.IDFactory.newPipeID(
netPeerGroup.getPeerGroupID()
)
);
FileOutputStream os = new FileOutputStream ( fileName );
os.write ( pipeAd.toString().getBytes() );
os.close();
}//end of else
System.out.println ("Listener>> Inputpipe Id : "+(pipeAd.getPipeID()).toString());
return pipeAd;
}//try
catch (Exception ex)
{
ex.printStackTrace ();
}//catch
return null;
}//createPipeAdvertisement()
private boolean publishPipeAdvertisement (PipeAdvertisement pipeAd)
{
DiscoveryService discSvc = netPeerGroup.getDiscoveryService ();
discSvc.remotePublish (pipeAd, DiscoveryService.ADV, DiscoveryService.NO_EXPIRATION);
return true;
}//publishPipeAdvertisement()
public void run()
{
System.out.println ("Listening for JXTA and JMS Messages...");
while( true )
{
int counter = 0;
while ( counter <= 10 )
{
try
{
javax.jms.Message message = queueReciver.receive (1000);
if ( message != null )
{
if ( message instanceof TextMessage )
{
System.out.println ("Got JMS Message...");
TextMessage txtMsg = (TextMessage) message;
router.sendMessageToMobile( txtMsg.getStringProperty("Sender"), outputPipe, txtMsg );
}
}
counter = counter + 1;
}
catch ( Exception ex )
{
ex.printStackTrace ();
}
}//while (counter <= 10)
counter = 0;
while ( counter <= 5 )
{
net.jxta.endpoint.Message msg = null;
try
{
msg = inputPipe.poll ( 1000 );
if ( msg != null ){
System.out.println ("Got JXTA Message...");
router.sendMessageToJMS( queueConnFactory, msg );
}
counter = counter + 1;
}
catch ( InterruptedException iEx )
{
iEx.printStackTrace ();
}
}//while (counter <= 5)
counter = 0;
}//while(true )
}//run()
public static void main(String argv[])
{
if ( argv.length < 2 )
{
System.out.println ( "The arguments are less then 2.");
usage();
return;
}//if ( argv.length < 2 )
Listener listener = new Listener ( new String (argv[0]),
new String (argv[1])
);
Thread listenerThread = new Thread (listener);
listenerThread.start();
}//main()
private static void usage()
{
System.out.println ( "Usage:" +
"\nappclient -client Router.jar " +
" userName connectionFactoryName");
}//usage()
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -