// p2psocket.java
// (Web code-viewer residue, not part of the program: "Font size:")
import java.net.*;
import java.io.*;
import net.jxta.pipe.*;
import net.jxta.protocol.*;
import net.jxta.peergroup.*;
import net.jxta.document.*;
import net.jxta.discovery.*;
import net.jxta.exception.*;
import net.jxta.endpoint.*;
import net.jxta.impl.endpoint.*;
/**
 * Socket-like convenience wrapper around a pair of JXTA pipes.
 *
 * <p>Typical use, following the step numbers of the original comments:
 * (1) {@link #bind()} creates the input pipe and starts listening,
 * (2) {@link #run()} keeps the input pipe advertisement published,
 * (3) {@link #connect()} resolves the output pipe,
 * (4) {@link #send(Message)} sends a message through the output pipe,
 * (5) {@link #setOutListener(OutputListener)} and
 * (6) {@link #setInListener(PipeMsgListener)} install custom listeners.
 *
 * <p>Pipe advertisements are loaded from <code>adv/&lt;pipeName&gt;.xml</code>.
 * When no custom listeners are installed this object acts as its own
 * listener for both pipes.
 *
 * <p>NOTE(review): several failure paths call {@code System.exit}; that
 * pre-existing behavior is kept so callers see no change.
 */
public class P2PSocket
extends Thread
implements OutputListener,PipeMsgListener
{
    /** Lifetime of the published advertisement and pause between re-publications (ms). */
    private static final int PUBLISH_INTERVAL_MS = 10*60*1000;
    /** Number of attempts made by {@link #bind()} to create the input pipe. */
    private static final int BIND_ATTEMPTS = 5;
    /** Number of attempts made by {@link #connect()} to resolve the output pipe. */
    private static final int CONNECT_ATTEMPTS = 10;
    /** Pause between output pipe polls in {@link #connect()} and {@link #send(Message)} (ms). */
    private static final int RETRY_DELAY_MS = 5*1000;

    private String inputPipeName = null;            // name of the input pipe
    private String outputPipeName = null;           // name of the output pipe
    private PipeAdvertisement inputPipeAdv = null;  // input pipe advertisement
    private PipeAdvertisement outputPipeAdv = null; // output pipe advertisement
    private InputPipe ip = null;                    // input pipe
    // Written from outputPipeEvent(), which the pipe service may invoke on
    // another thread, and polled by connect()/send(); hence volatile.
    private volatile OutputPipe op = null;          // output pipe
    private PipeMsgListener pml = null;             // custom input pipe listener (optional)
    private OutputListener opl = null;              // custom output pipe listener (optional)
    private PeerGroup pg = null;                    // peer group this socket belongs to
    private PipeService ps = null;                  // pipe service of the peer group
    private DiscoveryService disc = null;           // discovery service of the peer group
    private boolean publisherStarted = false;       // true once the publisher thread was started

    /**
     * Creates a socket in a freshly created net peer group.
     * Pipe names must be supplied later through the setters.
     */
    public P2PSocket()
    {
        this.newPeerGroup();
        ps = pg.getPipeService();
        disc = pg.getDiscoveryService();
    }

    /**
     * Creates a socket in a freshly created net peer group and loads both
     * pipe advertisements.
     *
     * @param inputPipeName  name of the input pipe (advertisement file base name)
     * @param outputPipeName name of the output pipe (advertisement file base name)
     */
    public P2PSocket(String inputPipeName,String outputPipeName)
    {
        this();
        this.setInputPipeName(inputPipeName);
        this.setOutputPipeName(outputPipeName);
    }

    /**
     * Binds this peer to its input pipe and starts listening. (1)
     *
     * <p>Pipe creation is attempted up to {@value #BIND_ATTEMPTS} times. The
     * advertisement publisher thread is started only after the pipe exists,
     * and only once, so failed attempts really are retried and calling this
     * method again is harmless.
     *
     * @return {@code true} if the input pipe exists when the method returns
     */
    public boolean bind()
    {
        PipeMsgListener listener = (pml != null) ? pml : this;
        for(int attempt = 0; attempt < BIND_ATTEMPTS && ip == null; attempt++)
        {
            try
            {
                ip = ps.createInputPipe(inputPipeAdv,listener);
            }catch(Exception e)
            {
                System.out.println("Error creating input pipe.");
                e.printStackTrace();
            }
        }
        if(ip == null)
        {
            return false;
        }
        this.startPublisher();
        return true;
    }

    /**
     * Starts the publisher thread the first time it is called; later calls
     * do nothing.
     */
    private synchronized void startPublisher()
    {
        if(publisherStarted)
        {
            return;
        }
        publisherStarted = true;
        try
        {
            this.start();
        }catch(IllegalThreadStateException alreadyStarted)
        {
            // The caller started this thread directly; the publisher is
            // therefore already running (or finished) and nothing is left to do.
        }
    }

    /**
     * Periodically publishes the input pipe advertisement, remotely and
     * locally, so that other peers can keep resolving the pipe. (2)
     *
     * <p>The advertisement is re-published every {@value #PUBLISH_INTERVAL_MS}
     * ms, which equals its lifetime. Interrupting the thread ends the loop.
     * An {@code IOException} while publishing terminates the JVM, as before.
     */
    public void run()
    {
        try
        {
            while(!Thread.currentThread().isInterrupted())
            {
                disc.remotePublish(inputPipeAdv,DiscoveryService.ADV,PUBLISH_INTERVAL_MS);
                disc.publish(inputPipeAdv,DiscoveryService.ADV,PUBLISH_INTERVAL_MS,PUBLISH_INTERVAL_MS);
                Thread.sleep(PUBLISH_INTERVAL_MS);
            }
        }catch(InterruptedException ie)
        {
            System.out.println("出错!"+ie);
            // Keep the interrupt status visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        }catch(IOException ioe)
        {
            System.out.println("出错!"+ioe);
            System.exit(-1);
        }
    }

    /**
     * Binds this peer to its output pipe, establishing the connection. (3)
     *
     * <p>The output pipe is requested up to {@value #CONNECT_ATTEMPTS} times,
     * pausing {@value #RETRY_DELAY_MS} ms between polls. The listener that is
     * registered is also the one that is polled: the custom listener if one
     * was installed, otherwise this object.
     *
     * @return {@code true} once the output pipe is resolved; if it cannot be
     *         resolved the JVM is terminated, so {@code false} is never
     *         actually observed by the caller
     */
    public boolean connect()
    {
        OutputListener listener = (opl != null) ? opl : this;
        // Forget a pipe resolved by an earlier connect() so that a stale pipe
        // is not mistaken for the result of this attempt.
        this.op = null;
        for(int attempt = 0; attempt < CONNECT_ATTEMPTS; attempt++)
        {
            try
            {
                System.out.println("Create Outpipe");
                ps.createOutputPipe(outputPipeAdv,listener);
            }catch(Exception e)
            {
                System.out.println("Error creating ounput pipe.");
                e.printStackTrace();
            }
            if(listener.getOutputPipe() != null)
            {
                break;
            }
            try
            {
                Thread.sleep(RETRY_DELAY_MS);
            }catch(InterruptedException ie)
            {
                System.out.println("出错!"+ie);
                System.exit(-1);
            }
        }
        OutputPipe resolved = listener.getOutputPipe();
        if(resolved != null)
        {
            this.op = resolved;
            return true;
        }
        System.out.println("通信连接失败!");
        System.exit(-1);
        return false;
    }

    /**
     * Loads the advertisement of the named output pipe and connects to it.
     *
     * @param outputPipeName name of the output pipe (advertisement file base name)
     * @return the result of {@link #connect()}
     */
    public boolean connect(String outputPipeName)
    {
        this.setOutputPipeName(outputPipeName);
        return this.connect();
    }

    /**
     * Output pipe callback used when this object is its own output listener;
     * stores the pipe carried by the event.
     *
     * @param event event carrying the resolved output pipe
     */
    public void outputPipeEvent(OutputPipeEvent event)
    {
        op = event.getOutputPipe();
    }

    /**
     * Sends a message through the output pipe. (4)
     *
     * <p>Blocks, polling every {@value #RETRY_DELAY_MS} ms, until an output
     * pipe is available. The pipe is re-read on every poll, so a pipe that
     * resolves after this method was entered is picked up.
     *
     * @param mess message to send
     * @return {@code true} if the send call completed without an
     *         {@code IOException}, {@code false} otherwise
     */
    public boolean send(Message mess)
    {
        OutputPipe pipe = this.currentOutputPipe();
        while(pipe == null)
        {
            try
            {
                Thread.sleep(RETRY_DELAY_MS);
            }catch(InterruptedException ie)
            {
                System.out.println("出错!"+ie);
                System.exit(-1);
            }
            pipe = this.currentOutputPipe();
        }
        try
        {
            pipe.send(mess);
            return true;
        }catch(IOException ioe)
        {
            System.out.println("发送消息失败!");
            return false;
        }
    }

    /**
     * Returns the output pipe currently known. When a custom output listener
     * is installed, its pipe (possibly {@code null}) replaces the cached one
     * first.
     */
    private OutputPipe currentOutputPipe()
    {
        if(opl != null)
        {
            op = opl.getOutputPipe();
        }
        return op;
    }

    /**
     * Input pipe callback used when this object is its own input listener.
     * Incoming messages are ignored.
     *
     * @param event event carrying the received message
     */
    public void pipeMsgEvent(PipeMsgEvent event){}

    /**
     * Installs a custom output pipe listener. (5)
     *
     * @param opl listener to register on {@link #connect()}; {@code null}
     *            makes this object its own listener
     */
    public void setOutListener(OutputListener opl) { this.opl = opl; }

    /**
     * Installs a custom input pipe listener. (6)
     *
     * @param pml listener to register on {@link #bind()}; {@code null}
     *            makes this object its own listener
     */
    public void setInListener(PipeMsgListener pml) { this.pml = pml; }

    /**
     * Moves this socket to another peer group and switches to that group's
     * pipe and discovery services.
     *
     * @param pg peer group to use
     */
    public void setPeerGroup(PeerGroup pg)
    {
        this.pg = pg;
        this.ps = pg.getPipeService();
        this.disc = pg.getDiscoveryService();
    }

    /**
     * Sets the input pipe name and loads its advertisement from
     * <code>adv/&lt;inputPipeName&gt;.xml</code>.
     *
     * @param inputPipeName name of the input pipe
     */
    public void setInputPipeName(String inputPipeName)
    {
        this.inputPipeName = inputPipeName;
        inputPipeAdv = createPipeAdvFromFile("adv/"+inputPipeName+".xml");
    }

    /**
     * Sets the output pipe name and loads its advertisement from
     * <code>adv/&lt;outputPipeName&gt;.xml</code>.
     *
     * @param outputPipeName name of the output pipe
     */
    public void setOutputPipeName(String outputPipeName)
    {
        this.outputPipeName = outputPipeName;
        outputPipeAdv = createPipeAdvFromFile("adv/"+outputPipeName+".xml");
    }

    /** @return the input pipe, or {@code null} before a successful {@link #bind()} */
    public InputPipe getInputPipe() { return this.ip; }

    /** @return the input pipe name */
    public String getInputPipeName() { return this.inputPipeName; }

    /** @return the output pipe, or {@code null} while it is not resolved */
    public OutputPipe getOutputPipe() { return this.op; }

    /** @return the output pipe name */
    public String getOutputPipeName() { return this.outputPipeName; }

    /**
     * Creates the net peer group and stores it in {@code pg}; terminates the
     * JVM if the group cannot be created.
     */
    private void newPeerGroup()
    {
        try
        {
            pg = PeerGroupFactory.newNetPeerGroup();
        }catch(PeerGroupException e)
        {
            System.out.println("fatal error : group creation failure");
            e.printStackTrace();
            System.exit(-1);
        }
    }

    /**
     * Reads a pipe advertisement from an XML file. Terminates the JVM if the
     * file cannot be opened or parsed.
     *
     * @param filename path of the advertisement file
     * @return the parsed pipe advertisement
     */
    private PipeAdvertisement createPipeAdvFromFile(String filename)
    {
        PipeAdvertisement pipeAdv = null;
        FileInputStream is = null;
        try
        {
            is = new FileInputStream(filename);
            pipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(new MimeMediaType("text/xml"),is);
        }catch(Exception e)
        {
            System.out.println("Error to create PipeAdvertisement from file");
            e.printStackTrace();
            System.exit(1);
        }finally
        {
            if(is != null)
            {
                try
                {
                    is.close();
                }catch(IOException ignored)
                {
                    // The advertisement is already parsed; a failure to close
                    // the read-only stream does not affect the result.
                }
            }
        }
        return pipeAdv;
    }
}
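/*
 * Web code-viewer residue (not part of the program) - keyboard shortcuts:
 *   Copy code            Ctrl + C
 *   Search code          Ctrl + F
 *   Full-screen mode     F11
 *   Toggle theme         Ctrl + Shift + D
 *   Show shortcuts       ?
 *   Increase font size   Ctrl + =
 *   Decrease font size   Ctrl + -
 */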