📄 groupbasecommlistener.java
字号:
SecChatListener.CHAT_COMM_MSG, sendm.toString());
}
pipe.sendMessage(message);
sendm = que.remove();
}
}
*/
}catch(Exception ex){
ex.printStackTrace();
}
}
//尚未使用
/**
 * Returns the advertisement of the unicast input pipe used to receive chat
 * messages from {@code sender}, creating and registering the pipe on first use.
 *
 * <p>If an input pipe is already stored for {@code sender} in the group
 * resource, that pipe is reused and its own advertisement is returned.
 * Otherwise a new "ChatPipe" advertisement is created for the current peer
 * group, an input pipe with a fresh {@code SecChatListener} is opened on it,
 * and both the pipe and the listener are stored under {@code sender}.
 *
 * @param sender key under which the pipe and its listener are stored
 * @return the pipe advertisement, or {@code null} if the group resource is
 *         missing, the pipe could not be created, or no advertisement is available
 */
private PipeAdvertisement createChatPipe(String sender){
    IPGResource pgres = (IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
    if(pgres==null){
        // No resource registered for the current peer group: nothing to look up or store into.
        log.error("无法为 "+sender+" 创建联系通道");
        return null;
    }
    InputPipe input = null;
    PipeAdvertisement pa = null;
    try {
        Object obj = pgres.getInputPipes().get(sender);
        if(obj!=null){
            // A pipe already exists for this sender: reuse it and report its advertisement.
            // Previously pa stayed null on this path and the debug print below threw a
            // NullPointerException.
            input = (InputPipe)obj;
            pa = input.getAdvertisement();
        }else{
            pa = CFUtil.crearePipeAdvertisement(pg.getPeerGroupID(),
                    PipeService.UnicastType, "ChatPipe");
            PipeService pipsrv = pg.getPipeService();
            SecChatListener cl = new SecChatListener(pg,sender);
            input = pipsrv.createInputPipe(pa, cl);
            // Remember both the pipe and its listener so later calls reuse them.
            pgres.getInputPipes().put(sender, input);
            pgres.getInputListener().put(sender, cl);
        }
    } catch (IOException ex) {
        log.error(ex.getMessage());
        return null;
    }
    if(input==null || pa==null){
        log.error("无法为 "+sender+" 创建联系通道");
        return null;
    }
    System.out.println("CCCCCCCCCCCCCCCCC = "+pa.getDocument(MimeMediaType.XMLUTF8));
    return pa;
}
/**
 * Prepares a bidirectional chat pipe for {@code sender}.
 *
 * <p>If the group resource already holds a pipe for {@code sender}, nothing is
 * created and {@code null} is returned. Otherwise a new "ChatPipe"
 * advertisement is built with the world group's ID, logged, and handed to a
 * {@code ChatBidiJob} that is scheduled to accept the incoming connection in
 * the background.
 *
 * @param sender key under which an existing pipe is looked up
 * @return the newly created advertisement, or {@code null} when a pipe for
 *         {@code sender} already exists
 */
private PipeAdvertisement createChatBidiPipe(String sender){
    IPGResource groupResource = (IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
    // A bidirectional pipe both sends and receives, so it is kept only in the output-pipe map.
    Object cached = groupResource.getOutputPipes().get(sender);
    if(cached != null){
        // Already have a pipe for this sender. The cast is kept so that an entry of an
        // unexpected type still fails here, exactly as before.
        JxtaBiDiPipe existing = (JxtaBiDiPipe)cached;
        return null;
    }

    // The advertisement uses the world group's ID rather than pg's; it is not published
    // to the discovery service here.
    PipeAdvertisement advertisement = CFUtil.crearePipeAdvertisement(
            CFResource.getInstance().getWorldGroup().getPeerGroupID(),
            PipeService.UnicastType, "ChatPipe");
    log.info(advertisement.getDocument(MimeMediaType.XMLUTF8));
    System.out.println("the pipe adv = "+advertisement.getDocument(MimeMediaType.XMLUTF8));

    // Accepting the connection is delegated to a background job instead of being done inline.
    ChatBidiJob acceptJob = new ChatBidiJob(advertisement, sender);
    acceptJob.schedule();
    return advertisement;
}
/**
 * Packs an advertisement into a message and sends it through {@code output}.
 *
 * <p>The message carries four elements in the base communication namespace:
 * the sender, the requested recipient, the {@code CHAT_ADV_CMD} command code,
 * and the advertisement rendered as an XML (UTF-8 MIME type) document string.
 *
 * @param output    pipe the message is sent on
 * @param adv       advertisement whose XML document is attached
 * @param sender    value stored under {@code PUB_NAME_STR}
 * @param requester value stored under {@code REQ_COMM_STR}
 * @return {@code true} if the message was built and sent without an exception,
 *         {@code false} if any exception was raised (it is logged and printed)
 */
public static boolean sendAdvertisement(OutputPipe output,Advertisement adv,
        String sender,String requester){
    Message msg = new Message();
    try{
        // Who is sending.
        MessageUtil.addStringToMessage(msg,
                GroupBaseCommListener.BASE_COMM_NAMESPACE, PUB_NAME_STR, sender);
        // Who is asked to receive.
        MessageUtil.addStringToMessage(msg,
                GroupBaseCommListener.BASE_COMM_NAMESPACE, REQ_COMM_STR, requester);
        // Command code marking this message as a chat advertisement.
        MessageUtil.addIntegerToMessage(msg,
                GroupBaseCommListener.BASE_COMM_NAMESPACE, RES_CMD_STR, CHAT_ADV_CMD);
        // The advertisement itself, serialised to its XML text.
        String advXml = adv.getDocument(MimeMediaType.XMLUTF8).toString();
        MessageUtil.addObjectToMessage(msg,
                GroupBaseCommListener.BASE_COMM_NAMESPACE, RES_COMM_STR, advXml);
        output.send(msg);
        return true;
    }catch(Exception e){
        log.error(e.getMessage());
        e.printStackTrace();
        return false;
    }
}
/**
 * Returns the peer group currently held in the {@code pg} field.
 *
 * @return the current peer group; may be whatever was last passed to {@code setPeerGroup}
 */
public PeerGroup getPeerGroup() {
    return this.pg;
}
/**
 * Returns the message screen currently held in the {@code screen} field.
 *
 * @return the current message screen
 */
public IMsgScreen getScreen() {
    return this.screen;
}
/**
 * Replaces the peer group held in the {@code pg} field.
 *
 * @param pg the peer group to use from now on
 */
public void setPeerGroup(PeerGroup pg) {
    this.pg = pg;
}
/**
 * Replaces the message screen held in the {@code screen} field.
 *
 * @param scr the message screen to use from now on
 */
public void setScreen(IMsgScreen scr) {
    this.screen = scr;
}
class ChatBidiJob extends Job{
PipeAdvertisement pipea = null;
JxtaServerPipe biSrvp;
JxtaBiDiPipe bipipe = null;
String chater;
SecChatListener cl;
IPGResource pgres;
boolean bl = false;
int count = 0;
//TODO 如果超出指定等待时间 将中止线程
public ChatBidiJob(PipeAdvertisement pa,String sender){
super("bidi");
pipea = pa;
chater = sender;
try{
biSrvp = new JxtaServerPipe(pg, pipea);
biSrvp.setPipeTimeout(0);
cl = new SecChatListener(pg,chater);
pgres = (IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
}catch(Exception e){
log.error(e.getMessage());
e.printStackTrace();
}
}
@Override
protected IStatus run(IProgressMonitor monitor) {
System.out.println(" the wait connect job running....");
try{
bipipe = biSrvp.accept();
new CFChatListener(pg,chater,bipipe);
pgres.getOutputPipes().put(chater, bipipe);
pgres.getInputListener().put(chater, cl);
}catch(Exception e){
e.printStackTrace();
}finally{
this.schedule(1000);
}
/*
if(!bl){
try{
bipipe = biSrvp.accept();
bl = true;
}catch(Exception e ){
e.printStackTrace();
}
}
if(bipipe==null){
this.schedule(1000);
return null;
}
if( bipipe.isBound()){
try{
System.out.println("Already accept Client ......");
IPGResource pgres = (IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
bipipe.setMessageListener(cl);
System.out.println("the PIPD SDV:\n"+bipipe.getPipeAdvertisement().getDocument(MimeMediaType.XMLUTF8).toString());
//bipipe.setListener(cl);
//bipipe.setListener(new CFPipeEventListener());
pgres.getOutputPipes().put(chater, bipipe);
pgres.getInputListener().put(chater, cl);
}catch(Exception e){
e.printStackTrace();
}
}*/
/*
try{
bipipe = biSrvp.accept();
Message message = bipipe.getMessage(1000*10);
System.out.println(bipipe.isBound()+" the message :"+message);
String mes = null;
if(message!=null){
mes = (String)MessageUtil.getStringFromMessage(message,SecChatListener.CHAT_COMM_SPACE,
SecChatListener.CHAT_COMM_MSG );//CHAT_COMM_SPACE, CHAT_COMM_MSG);
}
System.out.println(" String "+mes);
if(mes !=null ){
IPGResource pgres = (IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
HashMap map = pgres.getMsgQueues();
Object mobj = map.get(chater);
Queue queue;
System.out.println(chater+" sayed: "+mes);
if(mobj==null){
queue = new ConcurrentLinkedQueue();
map.put(chater, queue);
}else
queue = (Queue)mobj;
queue.add(mes);
}
}catch(Exception e){
e.printStackTrace();
}finally{
this.schedule(1000);
}
*/
return null;
}
public String getChater() {
return chater;
}
public void setChater(String chater) {
this.chater = chater;
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -