⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 groupbasecommlistener.java

📁 CoolFace是基于jxta的P2P系统
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                				SecChatListener.CHAT_COMM_MSG, sendm.toString());
                	}
                	pipe.sendMessage(message);
                	sendm = que.remove();
            	}
            }
            */
		}catch(Exception ex){
			ex.printStackTrace();
		}
		
	}

	//尚未使用
	/**
	 * Returns the advertisement of the unicast input pipe used to chat with
	 * {@code sender}, creating and registering that pipe on first use.
	 * <p>
	 * The resource object registered for the current peer group is consulted
	 * first: if its input-pipe map already holds an entry for {@code sender},
	 * that pipe is reused. Otherwise a new "ChatPipe" advertisement is created
	 * in the current peer group, an input pipe with a fresh
	 * {@code SecChatListener} is opened on it, and both the pipe and the
	 * listener are stored under {@code sender}.
	 * <p>
	 * Earlier, the cached-pipe path left the advertisement {@code null} and the
	 * debug print below dereferenced it, raising a {@code NullPointerException}.
	 * The advertisement is now recovered from the cached pipe itself.
	 *
	 * @param sender key under which the pipe and its listener are registered
	 * @return the pipe advertisement, or {@code null} if the pipe could not be
	 *         created or no advertisement is available for it
	 */
	private PipeAdvertisement createChatPipe(String sender){
		IPGResource pgres = (IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
		InputPipe input = null;
		PipeAdvertisement pa = null;
		try {
			Object obj = pgres.getInputPipes().get(sender);
			if(obj!=null){
				// A pipe was created for this sender before: reuse it and take
				// the advertisement from the pipe so that pa is not left null.
				input = (InputPipe)obj;
				pa = (PipeAdvertisement)input.getAdvertisement();
			}else{
				// First contact: create the advertisement, open the pipe and
				// remember both the pipe and its listener for later reuse.
				pa = CFUtil.crearePipeAdvertisement(pg.getPeerGroupID(),
						PipeService.UnicastType, "ChatPipe");
				PipeService pipsrv = pg.getPipeService();
				SecChatListener cl = new SecChatListener(pg,sender);
				input = pipsrv.createInputPipe(pa, cl);
				pgres.getInputPipes().put(sender, input);
				pgres.getInputListener().put(sender, cl);
			}
		} catch (IOException ex) {
			log.error(ex.getMessage());
			return null;
		}

		// Either the pipe or its advertisement is missing: report and give up
		// instead of failing on the dereference below.
		if(input==null || pa==null){
			log.error("无法为 "+sender+" 创建联系通道");
			return null;
		}

		System.out.println("CCCCCCCCCCCCCCCCC = "+pa.getDocument(MimeMediaType.XMLUTF8));
		return pa;
	}
	/**
	 * Prepares a bidirectional chat pipe for {@code sender}.
	 * <p>
	 * Bidirectional pipes are used for both sending and receiving, so they are
	 * kept in the output-pipe map of the peer group's resource object. If that
	 * map already holds an entry for {@code sender}, nothing is created and
	 * {@code null} is returned. Otherwise a "ChatPipe" advertisement is created
	 * with the world group's peer group ID, logged, printed, and handed to a
	 * newly scheduled {@link ChatBidiJob}.
	 *
	 * @param sender key identifying the chat partner in the output-pipe map
	 * @return the newly created pipe advertisement, or {@code null} when an
	 *         entry for {@code sender} already exists
	 */
	private PipeAdvertisement createChatBidiPipe(String sender){
		final IPGResource groupResource =
				(IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
		final Object cached = groupResource.getOutputPipes().get(sender);

		if(cached!=null){
			// Already registered earlier. The cast is kept so that a map entry
			// of the wrong type still fails here exactly as before.
			JxtaBiDiPipe existing = (JxtaBiDiPipe)cached;
			return null;
		}

		final PipeAdvertisement chatAdv = CFUtil.crearePipeAdvertisement(
				CFResource.getInstance().getWorldGroup().getPeerGroupID(),
				PipeService.UnicastType,
				"ChatPipe");

		log.info(chatAdv.getDocument(MimeMediaType.XMLUTF8));
		System.out.println("the pipe adv = "+chatAdv.getDocument(MimeMediaType.XMLUTF8));

		// Accepting the incoming connection is done by the job, not inline here.
		final ChatBidiJob acceptJob = new ChatBidiJob(chatAdv,sender);
		acceptJob.schedule();

		return chatAdv;
	}
	
	/**
	 * Wraps an advertisement in a message and sends it through {@code output}.
	 * <p>
	 * The message carries, in the base communication namespace: the sender
	 * name, the requester name, the {@code CHAT_ADV_CMD} command code and the
	 * advertisement rendered as an XML (UTF-8 MIME type) document string.
	 *
	 * @param output    pipe the message is sent on
	 * @param adv       advertisement to transmit
	 * @param sender    name stored as the publisher of the message
	 * @param requester name stored as the intended receiver
	 * @return {@code true} if the message was built and sent; {@code false} if
	 *         any exception occurred (it is logged and its stack trace printed)
	 */
	public static boolean sendAdvertisement(OutputPipe output,Advertisement adv,
			String sender,String requester){
		final Message advMessage = new Message();

		try{
			// Publisher of the advertisement.
			MessageUtil.addStringToMessage(advMessage,
					GroupBaseCommListener.BASE_COMM_NAMESPACE, PUB_NAME_STR, sender);
			// Peer that asked for it.
			MessageUtil.addStringToMessage(advMessage,
					GroupBaseCommListener.BASE_COMM_NAMESPACE, REQ_COMM_STR, requester);
			// Command code telling the receiver what the payload is.
			MessageUtil.addIntegerToMessage(advMessage,
					GroupBaseCommListener.BASE_COMM_NAMESPACE, RES_CMD_STR, CHAT_ADV_CMD);

			// The advertisement itself, serialised to its document text.
			final String advXml = adv.getDocument(MimeMediaType.XMLUTF8).toString();
			MessageUtil.addObjectToMessage(advMessage,
					GroupBaseCommListener.BASE_COMM_NAMESPACE, RES_COMM_STR, advXml);

			output.send(advMessage);
			return true;
		}catch(Exception e){
			log.error(e.getMessage());
			e.printStackTrace();
			return false;
		}
	}

	
	/**
	 * @return the peer group currently held by this object
	 */
	public PeerGroup getPeerGroup() {
		return this.pg;
	}

	/**
	 * @return the message screen currently held by this object
	 */
	public IMsgScreen getScreen() {
		return this.screen;
	}

	/**
	 * Replaces the peer group held by this object.
	 *
	 * @param pg the peer group to use from now on
	 */
	public void setPeerGroup(PeerGroup pg) {
		this.pg = pg;
	}

	/**
	 * Replaces the message screen held by this object.
	 *
	 * @param scr the message screen to use from now on
	 */
	public void setScreen(IMsgScreen scr) {
		this.screen = scr;
	}
	
	class ChatBidiJob extends Job{

		PipeAdvertisement pipea = null;
		JxtaServerPipe biSrvp;
		JxtaBiDiPipe bipipe = null;
		String chater;
		SecChatListener cl;
		IPGResource pgres;
		boolean bl = false;

		int count = 0;
		//TODO 如果超出指定等待时间 将中止线程
		public ChatBidiJob(PipeAdvertisement pa,String sender){
			super("bidi");
			pipea = pa;
			chater = sender;
			try{
				
				biSrvp = new JxtaServerPipe(pg, pipea);
			
				biSrvp.setPipeTimeout(0);
				cl = new SecChatListener(pg,chater);
				pgres = (IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
				
			}catch(Exception e){
				log.error(e.getMessage());
				e.printStackTrace();
			}
		}
		@Override
		protected IStatus run(IProgressMonitor monitor) {
			System.out.println(" the wait connect job running....");
			try{
				bipipe = biSrvp.accept();
				new CFChatListener(pg,chater,bipipe);
				
				pgres.getOutputPipes().put(chater, bipipe);
	            pgres.getInputListener().put(chater, cl);
			}catch(Exception e){
				e.printStackTrace();
			}finally{
				this.schedule(1000);
			}
			/*
			if(!bl){
				try{
					bipipe = biSrvp.accept();
					bl = true;
				}catch(Exception e ){
					e.printStackTrace();
				}
			}
				
			
			if(bipipe==null){
				this.schedule(1000);
				return null;
			}
			if( bipipe.isBound()){
				try{
					System.out.println("Already  accept Client ......");
					IPGResource pgres = (IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
					
					bipipe.setMessageListener(cl);
					
					System.out.println("the PIPD SDV:\n"+bipipe.getPipeAdvertisement().getDocument(MimeMediaType.XMLUTF8).toString());

					
					//bipipe.setListener(cl);
					//bipipe.setListener(new CFPipeEventListener());
					pgres.getOutputPipes().put(chater, bipipe);
		            pgres.getInputListener().put(chater, cl);
				}catch(Exception e){
					e.printStackTrace();
				}
			}*/
			/*
			try{

				bipipe = biSrvp.accept();
				Message message = bipipe.getMessage(1000*10);
				System.out.println(bipipe.isBound()+" the message :"+message);
				
				String mes = null;
				if(message!=null){
					mes = (String)MessageUtil.getStringFromMessage(message,SecChatListener.CHAT_COMM_SPACE, 
						SecChatListener.CHAT_COMM_MSG );//CHAT_COMM_SPACE, CHAT_COMM_MSG);
				}
				
				System.out.println(" String "+mes);
				if(mes !=null ){
		        	IPGResource pgres = (IPGResource)CFResource.getInstance().getJoinPGMap().get(pg);
		        	HashMap map = pgres.getMsgQueues();
		        	Object mobj = map.get(chater);
		        	Queue queue;
		        	System.out.println(chater+" sayed: "+mes);
		        	if(mobj==null){
		        		queue = new ConcurrentLinkedQueue();
		        		map.put(chater, queue);
		        	}else
		        		queue = (Queue)mobj;
		        	
		        	queue.add(mes);
		        }
			}catch(Exception e){
				e.printStackTrace();
				
			}finally{
				this.schedule(1000);
			}
			*/
			return null;
		}
		public String getChater() {
			return chater;
		}
		public void setChater(String chater) {
			this.chater = chater;
		}

		
	}

}




⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -