⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtamulticastsocketservice.java

📁 可以实现P2P聊天通信
💻 JAVA
字号:
package jxtamessenger.service;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.SocketException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;

import jxtamessenger.ChatWindow;
import jxtamessenger.MainApplicationWindow;
import jxtamessenger.bean.OfflineMessage;
import jxtamessenger.bean.OnlineMessage;
import jxtamessenger.util.MiscUtil;
import jxtamessenger.util.PipeUtil;
import jxtamessenger.util.ThreadPoolUtil;
import jxtamessenger.xml.XmlCreator;
import jxtamessenger.xml.XmlParser;
import net.jxta.peergroup.PeerGroup;
import net.jxta.pipe.PipeService;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.socket.JxtaMulticastSocket;

import org.apache.commons.lang.ClassUtils;
import org.eclipse.jface.viewers.TableViewer;
import org.eclipse.swt.widgets.Display;

public class JxtaMulticastSocketService implements Service {
	private static final Logger LOG = Logger.getLogger(JxtaMulticastSocketService.class.getName());
	
	private TableViewer viewer;
	
	private JxtaMulticastSocket mcastSocket;
	private final ExecutorService pool;
	private final static String SOCKETIDSTR = "urn:jxta:uuid-59616261646162614E5047205032503386E7C7AE38954620A595F809548D680304";
	
	private static final int TIMEOUT = 0;
	private static final int BUFFERSIZE = 16384;
	
	public JxtaMulticastSocketService(PeerGroup pg, TableViewer viewer) {
		try {
			this.mcastSocket = new JxtaMulticastSocket(pg, getSocketAdvertisement(pg));
			this.viewer = viewer;
		} catch (IOException e) {
			LOG.severe("JxtaMulticastSocket initialize failed!");
			e.printStackTrace();
			System.exit(-1);
		}
		
        if (this.mcastSocket != null) {
            try {
                this.mcastSocket.setSoTimeout(TIMEOUT);
            } catch (SocketException se) {
                se.printStackTrace(System.out);
            }
        }
        
		pool = Executors.newCachedThreadPool();
	}
	
	public static PipeAdvertisement getSocketAdvertisement(PeerGroup pg) {
		return PipeUtil.getPipeAdvWithoutRemoteDiscovery(pg,
				"JxtaMulticastSocket", PipeService.PropagateType, SOCKETIDSTR, true);
	}
	
	public void shutdownAndAwaitTermination() {
		ThreadPoolUtil.shutdownAndAwaitTermination(pool);

		if (this.mcastSocket != null) {
			this.mcastSocket.close();
		}
	}
	
	public void run() {
		try {
			byte[] buffer = new byte[BUFFERSIZE];
			for (;;) {
				Arrays.fill(buffer,(byte)0);
				DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
				
				mcastSocket.receive(packet);
				pool.execute(new Handler(packet));
			}
		} catch(IOException e) {
//			e.printStackTrace();
			pool.shutdown();
		} catch (Exception e) {
			// ignore
			// RejectedExecutionException(线程池关闭shutdown/saturated, 抛出java.util.concurrent.RejectedExecutionException)
//			 e.printStackTrace();
		}
	}

	class Handler implements Runnable {
		private final DatagramPacket packet;
		private final ReentrantLock lock = new ReentrantLock();

		Handler(DatagramPacket packet) {
			this.packet = packet;
		}

		public void run() {
			String sw = new String(packet.getData(), 0, packet.getLength());
			LOG.info("sw=" + sw);
			Object obj = XmlParser.getObject(sw);

            if(ClassUtils.isAssignable(obj.getClass(), OnlineMessage.class)) {
            	final OnlineMessage msg = (OnlineMessage)obj;
            	// 如果列表中已经存在该消息发送者的数据,那么说明已经发送过,不需要将自己的信息发送回去
            	if(viewer.getData(msg.getHostName()) == null) {
                	// TODO: 加入配置项,在列表中是否显示自己
                    Display.getDefault().asyncExec(new Runnable() {
    					@SuppressWarnings("unchecked")
						public void run(){
                    		lock.lock();
                    		try {
        						if(MainApplicationWindow.chatwin.containsKey(msg.getHostName())) {
        							ChatWindow chatWindow = (ChatWindow)MainApplicationWindow.chatwin.get(msg.getHostName());
        							if(chatWindow != null)
        								chatWindow.enableInputAndSend();
            					}
        						
                        		if(viewer.getData(msg.getHostName()) == null) {
                            		viewer.add(msg);
                            		viewer.setData(msg.getHostName(), msg);
                            		((List)viewer.getInput()).add(msg);
                        		}
                    		} finally {
                    			lock.unlock();
                    		}
                    	}
                    });
                        
                    // 将自己的信息返回给源节点
                    if(!msg.getHostName().equals(MiscUtil.getHostName())) {
    					try {
    	                	String msgres = XmlCreator.createOnlineMessage();
    						DatagramPacket res = new DatagramPacket(msgres.getBytes(), msgres.length());
    						res.setAddress(res.getAddress());
    						mcastSocket.send(res);
    					} catch (IOException e) {
    						LOG.warning("Seng back OnlineMsg failed.");
    						e.printStackTrace();
    					}
                    }
            	}
            } else if(ClassUtils.isAssignable(obj.getClass(), OfflineMessage.class)) {
            	final OfflineMessage msg = (OfflineMessage)obj;

            	if(!msg.getHostName().equals(MiscUtil.getHostName())) {
                    Display.getDefault().asyncExec(new Runnable() {
                    	@SuppressWarnings("unchecked")
    					public void run(){
                    		lock.lock();
                    		try {
                    			LOG.info(msg.getHostName());
        						if(MainApplicationWindow.chatwin.containsKey(msg.getHostName())) {
    							ChatWindow chatWindow = (ChatWindow)MainApplicationWindow.chatwin.get(msg.getHostName());
    							if(chatWindow != null)
    								chatWindow.disableInputAndSend(msg.getUserName());
        						}
                    			
                    			Object o = viewer.getData(msg.getHostName());
                    			
                        		if(o != null) {
                        			viewer.setData(msg.getHostName(), null);
                            		viewer.remove(o);
                            		((List)viewer.getInput()).remove(o);
                        		}
                    		} finally {
                    			lock.unlock();
                    		}
                    	}
                    });
            	}
            } else {
            	// other message ignore
            }
		}
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -