📄 msgreceive.java
字号:
import net.jxta.credential.AuthenticationCredential;
import net.jxta.credential.Credential;
import net.jxta.discovery.DiscoveryService;
import net.jxta.document.AdvertisementFactory;
import net.jxta.document.StructuredDocument;
import net.jxta.platform.NetworkManager;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.Message.ElementIterator;
import net.jxta.endpoint.MessageElement;
import net.jxta.endpoint.WireFormatMessage;
import net.jxta.endpoint.WireFormatMessageFactory;
import net.jxta.exception.PeerGroupException;
import net.jxta.id.IDFactory;
import net.jxta.membership.Authenticator;
import net.jxta.membership.MembershipService;
import net.jxta.peergroup.PeerGroup;
import net.jxta.peergroup.PeerGroupFactory;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.PipeID;
import net.jxta.pipe.PipeMsgEvent;
import net.jxta.pipe.PipeMsgListener;
import net.jxta.pipe.PipeService;
import net.jxta.protocol.PeerAdvertisement;
import net.jxta.protocol.PeerGroupAdvertisement;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.util.CountingOutputStream;
import net.jxta.util.DevNullOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Enumeration;
import java.io.File;
import javax.security.cert.CertificateException;
public class MsgReceive implements PipeMsgListener {
private PeerGroup netPeerGroup = null;
static NetworkManager manager;
static PeerGroup myPeerGroup = null;
static PipeService pipeService;
static PipeAdvertisement pipeAdv;
private InputPipe inputPipe = null;
private PeerAdvertisement padv;
// 初始化为NetPeerGroup的查找服务
private DiscoveryService discoSvc;
// 新建点组的查找服务
private DiscoveryService mydiscoSvc;
// 指定的管道的ID
public final static String PIPEIDSTR = "urn:jxta:uuid-59616261646162614E50472050325033C0C1DE89719B456691A596B983BA0E1004";
/**
* 初始化JXTA 创建默认的点组,再新建一个点组myGroup,加入到这个点组,获取管道服务,创建管道通告
*/
public MsgReceive() {
manager = null;
try {
manager = new net.jxta.platform.NetworkManager(
NetworkManager.ConfigMode.EDGE, "leihl", new File(new File(
".cache"), "leihl").toURI());
manager.setUseDefaultSeeds(true);
//manager.getConfigurator().setHttpOutgoing(true);
//manager.getConfigurator().setHttpIncoming(true);
//manager.getConfigurator().setTcpIncoming(true);
//manager.getConfigurator().setTcpOutgoing(true);
manager.startNetwork();
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
netPeerGroup = manager.getNetPeerGroup();
// 获取默认点组的搜索服务
discoSvc = netPeerGroup.getDiscoveryService();
// 查找并加入到点组chaolu,如果没有找到,就创建并加入点组chaolu
DiscAndJoin discAndJoin = new DiscAndJoin();
try {
myPeerGroup = discAndJoin.JoinPeerGroup(netPeerGroup, discoSvc);
myPeerGroup.getRendezVousService().startRendezVous();
} catch (Exception e) {
e.printStackTrace();
}
// 获得默认的点组的管道服务对象
pipeService = myPeerGroup.getPipeService();
mydiscoSvc = myPeerGroup.getDiscoveryService();
pipeAdv = getPipeAdvertisement();
DiscoveryPeers();
}
public static void main(String args[]) {
MsgReceive server = new MsgReceive();
// 创建输入管道,并监听
server.start();
// 创建输出管道,并监听
String value = System.getProperty("RDVWAIT", "false");
boolean waitForRendezvous = Boolean.valueOf(value);
MsgSend Msgsend = new MsgSend(waitForRendezvous, pipeService, pipeAdv,
myPeerGroup.getPeerName(), myPeerGroup);
Msgsend.run();
}
/**
* 创建输入管道
*/
public void start() {
try {
inputPipe = pipeService.createInputPipe(pipeAdv, this);
} catch (IOException io) {
io.printStackTrace();
return;
}
if (inputPipe == null) {
System.out.println(" cannot open InputPipe");
System.exit(-1);
}
}
/**
* 当监听到有消息到达时候,就触发以下事件
*/
public void pipeMsgEvent(PipeMsgEvent event) {
Message msg;
try {
// Obtain the message from the event
msg = event.getMessage();
} catch (Exception e) {
e.printStackTrace();
return;
}
Message.ElementIterator en = msg.getMessageElements();
if (!en.hasNext()) {
return;
}
MessageElement msgElement = msg.getMessageElement(null,
MsgSend.MESSAGE_NAME_SPACE);
MessageElement MsgPeerN = msg.getMessageElement(null, "peerName");
if (msgElement.toString() == null) {
System.out.println("null msg received");
} else {
System.out.println(MsgPeerN.toString() + "说:"
+ msgElement.toString());
}
}
public void DiscoveryPeers() {
int count = 3;
Enumeration ae = null;
// 查找点组通告
while (count-- > 0) {
try {
mydiscoSvc.getRemoteAdvertisements(null, DiscoveryService.PEER,
null, null, 10, null);
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
ae = mydiscoSvc.getLocalAdvertisements(DiscoveryService.PEER,
null, null);
if ((ae != null) && ae.hasMoreElements()) {
break;
}
} catch (IOException e) {
}
}
// 如果没有找到,就创建点组
if (ae == null || !ae.hasMoreElements()) {
System.out.println("没有找到别的对等点");
} else {
// 如果找到,就加入
while (ae.hasMoreElements()) {
padv = (PeerAdvertisement) ae.nextElement();
System.out.println("查找到在线的用户:" + padv.getName());
}
}
}
/**
* 创建输出管道通告,管道ID是指定的,类型是多播型的
*
* @return
*/
public static PipeAdvertisement getPipeAdvertisement() {
PipeID pipeID = null;
try {
pipeID = (PipeID) IDFactory.fromURI(new URI(PIPEIDSTR));
} catch (URISyntaxException use) {
use.printStackTrace();
}
PipeAdvertisement advertisement = (PipeAdvertisement) AdvertisementFactory
.newAdvertisement(PipeAdvertisement.getAdvertisementType());
advertisement.setPipeID(pipeID);
advertisement.setType(PipeService.PropagateType);
advertisement.setName("PipeDemo");
return advertisement;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -