📄 groupbasecommlistener.java
字号:
package com.cn.darkblue.listener;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.jobs.Job;
import com.cn.darkblue.entity.CFPipeAdvertisement;
import com.cn.darkblue.helper.GroupHelper;
import com.cn.darkblue.helper.IMsgScreen;
import com.cn.darkblue.helper.IPGResource;
import com.cn.darkblue.util.CFDefData;
import com.cn.darkblue.util.CFResource;
import com.cn.darkblue.util.CFUtil;
import com.cn.darkblue.util.MessageUtil;
import com.cn.darkblue.util.SysUtil;
import net.jxta.document.Advertisement;
import net.jxta.document.AdvertisementFactory;
import net.jxta.document.MimeMediaType;
import net.jxta.document.StructuredDocumentFactory;
import net.jxta.document.StructuredTextDocument;
import net.jxta.document.TextElement;
import net.jxta.endpoint.Message;
import net.jxta.id.IDFactory;
import net.jxta.impl.id.UUID.PipeID;
import net.jxta.peer.PeerID;
import net.jxta.peergroup.PeerGroup;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.OutputPipe;
import net.jxta.pipe.PipeMsgEvent;
import net.jxta.pipe.PipeService;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.util.JxtaBiDiPipe;
import net.jxta.util.JxtaServerPipe;
public class GroupBaseCommListener implements ICFChatListener {
/** Logger for this listener. */
private static final Log log = LogFactory.getLog(GroupBaseCommListener.class);
/** Namespace under which every base-communication message element is read. */
public static final String BASE_COMM_NAMESPACE = "Base_Comm";
/** Element name holding the sender's combined name (read as "sender" in pipeMsgEvent). */
public static final String PUB_NAME_STR = "Peer_publish_name";
/** Element name holding the combined name of the peer being addressed (read as "requester" in pipeMsgEvent). */
public static final String REQ_COMM_STR = "request_Comm";
/** Element name holding the response payload; pipeMsgEvent reads it as a stream containing a pipe advertisement. */
public static final String RES_COMM_STR = "Response_Comm";
/** Element name holding the integer command code (see the *_CMD constants). */
public static final String RES_CMD_STR = "Cmd_Comm";
/** Element name for an MD5 authentication element; only referenced from commented-out code here — presumably reserved, TODO confirm. */
public static final String MD5_COMM_STR = "MD5_Authentication";
/** Element name holding the chat text. */
public static final String CHAT_STR = "Chat_Message";
/** Not referenced in the visible part of this class — NOTE(review): confirm where this key is used. */
public static final String BASE_COMM_KEY = "BaseCommKey";
public static final int CHAT_CMD = 0;// command code: request a chat
public static final int CHAT_ADV_CMD = 99;// command code: advertisement delivery
public static final int CHAT_PGALL_CMD = -1;// command code: peer group information
int chat_state = 0;// 0 = receive and notify, 1 = receive without notifying, 2 = do not receive
String name = "";// "GroupId--peerId": group ID + CFDefData.COMBO_SPSTR + peer ID, built in the constructor
/** Peer group this listener serves; also the key used to look up the group's resources. */
PeerGroup pg = null;
/** Message screen passed to the constructor; only referenced from commented-out code in pipeMsgEvent. */
IMsgScreen screen = null;
//String ctName = "";
public GroupBaseCommListener(PeerGroup pg,IMsgScreen srn){
this.pg = pg;
screen = srn;
name = pg.getPeerGroupID().toString()+CFDefData.COMBO_SPSTR+pg.getPeerID().toString();
}
/**
 * Handles a message arriving on the group's base communication pipe.
 *
 * Two things can happen for one message:
 * <ul>
 * <li>If the message was not sent by this peer and is addressed to this peer,
 *     its command code is processed: {@link #CHAT_CMD} answers a chat request by
 *     creating the server side of a bidirectional pipe and sending its
 *     advertisement back; {@link #CHAT_ADV_CMD} reads the advertisement from the
 *     message and connects to it.</li>
 * <li>If the message carries non-empty chat text, the text is appended to the
 *     message queue of this peer group, regardless of sender or addressee.</li>
 * </ul>
 *
 * @param event the pipe event carrying the received message
 */
public void pipeMsgEvent(PipeMsgEvent event) {
    log.info("已经接收到消息,来自于:"+event.getSource().toString());
    log.info("管道ID:"+event.getPipeID());
    Message message = event.getMessage();
    if (message == null) {
        return;
    }

    String sender = MessageUtil.getStringFromMessage(message, BASE_COMM_NAMESPACE, PUB_NAME_STR);
    String requester = MessageUtil.getStringFromMessage(message, BASE_COMM_NAMESPACE, REQ_COMM_STR);
    int cmd = MessageUtil.getIntegerFromMessage(message, BASE_COMM_NAMESPACE, RES_CMD_STR);
    String chat = MessageUtil.getStringFromMessage(message, BASE_COMM_NAMESPACE, CHAT_STR);

    if (log.isDebugEnabled()) {
        log.debug("message = "+message);
        log.debug("name="+name);
        log.debug("sender="+sender);
        log.debug("requester="+requester);
        log.debug("cmd="+cmd);
    }

    // Commands are only processed when they come from another peer and are
    // addressed to this one. A null requester simply fails the equals() check.
    if (!name.equals(sender) && name.equals(requester)) {
        switch (cmd) {
        case CHAT_CMD:
            answerChatRequest(sender);
            break;
        case CHAT_ADV_CMD:
            PipeAdvertisement advr = readPipeAdvertisement(message);
            if (advr != null) {
                conBidiPipe(advr, sender);
            }
            break;
        default:
            break;
        }
    }

    if (chat != null && !"".equals(chat)) {
        enqueueChat(chat);
    }
}

/**
 * Answers a chat request: creates the server side of the bidirectional chat
 * pipe and sends its advertisement to the requesting peer over the group's
 * output pipe.
 *
 * @param sender combined name of the peer that asked to chat
 */
private void answerChatRequest(String sender) {
    PipeAdvertisement adv = createChatBidiPipe(sender);
    if (adv == null) {
        return;
    }
    IPGResource pgres = (IPGResource) CFResource.getInstance().getJoinPGMap().get(pg);
    if (pgres == null) {
        // Previously this surfaced as a NullPointerException that aborted the handler.
        log.error("No group resource registered for " + name
                + "; cannot answer chat request from " + sender);
        return;
    }
    OutputPipe output = (OutputPipe) pgres.getOutputPipes().get(GroupHelper.PIPE_OUT_KEY);
    //TODO if(output==null) create a new output pipe
    if (log.isDebugEnabled()) {
        log.debug("oooooooooooo isnull = "+(output==null)+" "+output);
    }
    sendAdvertisement(output, adv, name, sender);
}

/**
 * Reads the pipe advertisement carried in the response element of a message.
 *
 * @param message the received message
 * @return the parsed advertisement, or {@code null} when the element is
 *         missing or cannot be parsed as a pipe advertisement
 */
private PipeAdvertisement readPipeAdvertisement(Message message) {
    PipeAdvertisement advr = null;
    InputStream in = null;
    try {
        in = MessageUtil.getInputStreamFromMessage(message, BASE_COMM_NAMESPACE, RES_COMM_STR);
        if (in == null) {
            log.warn("Message carries no " + RES_COMM_STR + " element; no pipe advertisement to read");
        } else {
            advr = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, in);
        }
        // Dump the advertisement only when one was actually parsed; the
        // unconditional dump used to throw a NullPointerException here.
        if (advr != null && log.isDebugEnabled()) {
            log.debug("RRRRRRRRRRRRRR = "+advr.getDocument(MimeMediaType.XMLUTF8));
        }
    } catch (Exception e) {
        // The payload comes from the network, so any parsing failure is
        // reported (with its cause) and treated as "no advertisement".
        log.error("Failed to read the pipe advertisement from the message", e);
    } finally {
        if (in != null) {
            try {
                in.close();
            } catch (IOException ignored) {
                // Nothing useful can be done if closing the stream fails.
            }
        }
    }
    return advr;
}

/**
 * Appends chat text to this peer group's message queue, creating the queue
 * on first use.
 *
 * @param chat non-empty chat text
 */
private void enqueueChat(String chat) {
    IPGResource pgres = (IPGResource) CFResource.getInstance().getJoinPGMap().get(pg);
    if (pgres == null) {
        log.warn("No group resource registered for " + name + "; chat message dropped");
        return;
    }
    HashMap map = pgres.getMsgQueues();
    Queue queue = (Queue) map.get(pg);
    if (queue == null) {
        queue = new ConcurrentLinkedQueue();
        map.put(pg, queue);
    }
    queue.add(chat);
}
/**
 * Not used yet.
 *
 * Creates an input pipe for the given advertisement, listened to by a new
 * {@code SecChatListener}, and registers both the pipe and the listener in
 * the peer group's resources under the sender's name.
 *
 * @param advr   advertisement of the pipe to listen on
 * @param sender combined name of the remote peer; used as the registry key
 */
private void createIn(PipeAdvertisement advr,String sender){
    PipeService pipsrv = pg.getPipeService();
    SecChatListener cl = new SecChatListener(pg,sender);
    try{
        // Resolve the registry first so that a missing entry cannot leave an
        // open input pipe that nothing keeps track of.
        IPGResource pgres = (IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
        if(pgres == null){
            log.error("No group resource registered for " + name
                    + "; input pipe for " + sender + " not created");
            return;
        }
        InputPipe input = pipsrv.createInputPipe(advr, cl);
        pgres.getInputPipes().put(sender, input);
        pgres.getInputListener().put(sender, cl);
    }catch(Exception e){
        // Pass the exception itself so the stack trace is not lost.
        log.error("Failed to create input pipe for " + sender, e);
    }
}
/**
 * Not used yet.
 *
 * Creates an output pipe for the given advertisement (waiting up to 1000 ms
 * for it to resolve) and registers it in the peer group's resources under
 * the sender's name.
 *
 * @param advr   advertisement of the pipe to send on
 * @param sender combined name of the remote peer; used as the registry key
 */
private void createOut(PipeAdvertisement advr,String sender){
    PipeService pipsrv = pg.getPipeService();
    try{
        // Resolve the registry first so that a missing entry cannot leave an
        // open output pipe that nothing keeps track of.
        IPGResource pgres = (IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
        if(pgres == null){
            log.error("No group resource registered for " + name
                    + "; output pipe for " + sender + " not created");
            return;
        }
        OutputPipe output = pipsrv.createOutputPipe(advr, 1000);
        pgres.getOutputPipes().put(sender, output);
    }catch(Exception e){
        // Pass the exception itself so the stack trace is not lost.
        log.error("Failed to create output pipe for " + sender, e);
    }
}
private void conBidiPipe(PipeAdvertisement advr,String chater){
try{
//pipe.setReliable(true);
SecChatListener cl = new SecChatListener(pg,chater);
JxtaBiDiPipe pipe;
pipe = new JxtaBiDiPipe();
// pipe.setReliable(true);
// pipe.setMessageListener(cl);
// pipe.connect(pg, advr);
pipe.connect(pg, (PeerID) IDFactory.fromURI(new URI(SysUtil.getComboDescription(chater))), advr, 1000*60, cl, false);
System.out.println("the PIPD SDV:\n"+pipe.getPipeAdvertisement().getDocument(MimeMediaType.XMLUTF8).toString());
System.out.println("Please wait ...........");
pipe.setPipeEventListener(new CFPipeEventListener());
IPGResource pgres = (IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
pgres.getOutputPipes().put(chater, pipe);
pgres.getInputListener().put(chater, cl);
/*
Object obj = pgres.getSendQueues().get(chater);
if(obj!=null){
Queue que = (Queue)obj;
Object sendm = que.remove();
while(sendm!=null){
Message message;
if(sendm instanceof Message)
message = (Message)sendm;
else{
message = new Message();
MessageUtil.addStringToMessage(message, SecChatListener.CHAT_COMM_SPACE,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -