⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mysocket.java

📁 基于jxta的局域网P2P文件共享,可以实现局域网中的文件p2p共享,实现文件快速传输及交流
💻 JAVA
字号:
package connex.core.net;

import net.jxta.socket.JxtaSocket;
import net.jxta.endpoint.Messenger;
import net.jxta.peergroup.PeerGroup;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.protocol.PeerAdvertisement;
import net.jxta.document.StructuredDocument;
import java.io.IOException;
import net.jxta.peer.PeerID;
import net.jxta.pipe.PipeMsgEvent;

import net.jxta.socket.JxtaServerSocket;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.document.AdvertisementFactory;
import java.io.InputStream;
import net.jxta.impl.util.pipe.reliable.Defs;
import org.apache.log4j.Level;
import net.jxta.document.StructuredDocumentFactory;
import java.util.Iterator;
import org.apache.log4j.Logger;
import net.jxta.impl.util.pipe.reliable.OutgoingMsgrAdaptor;
import net.jxta.impl.util.pipe.reliable.ReliableInputStream;
import net.jxta.endpoint.StringMessageElement;
import net.jxta.document.MimeMediaType;
import net.jxta.pipe.PipeService;
import net.jxta.document.XMLDocument;
import net.jxta.endpoint.TextDocumentMessageElement;


/**
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2005</p>
 * <p>Company: </p>
 * @author unbekannt
 * @version 1.0
 */

public class MySocket extends JxtaSocket {
    private final static Logger LOG = Logger.getLogger(MySocket.class.getName());
    protected PeerAdvertisement peerAdvt;

    protected MySocket(PeerGroup group,
                       Messenger msgr,
                       PipeAdvertisement newpipe,
                       StructuredDocument credDoc,
                       boolean isStream, PeerAdvertisement peerAdv) throws
            IOException {

        super(group, msgr, newpipe, credDoc, isStream);
        setPeerAdv(peerAdv);

    }

    public MySocket(PeerGroup group,
                    PeerID peerid,
                    PipeAdvertisement pipeAd,
                    int timeout,
                    boolean stream) throws IOException {

        super(group, peerid, pipeAd, timeout, stream);
    }

    /**
    *  Create a connection request message
    *
    *@param  group   group context
    *@param  pipeAd  pipe advertisement
    *@return         the Message  object
    */
   protected Message createOpenMessage(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {

       Message msg = new Message();
       PeerAdvertisement peerAdv = group.getPeerAdvertisement();
       if (myCredentialDoc == null) {
           myCredentialDoc = getCredDoc(group);
       }
       if (myCredentialDoc == null && pipeAd.getType().equals(PipeService.UnicastSecureType)) {
           throw new IOException("No credentials established to initiate a secure connection");
       }
       if (LOG.isEnabledFor(Level.DEBUG)) {
           LOG.debug("Requesting connection [isStream] :"+isStream);
       }
       try {
           if (myCredentialDoc != null) {
               msg.addMessageElement(JxtaServerSocket.nameSpace,
                                     new TextDocumentMessageElement(JxtaServerSocket.credTag,
                                                                    (XMLDocument) myCredentialDoc, null));
           }

           msg.addMessageElement(JxtaServerSocket.nameSpace,
                                 new TextDocumentMessageElement(JxtaServerSocket.reqPipeTag,
                                                                (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null));

           msg.addMessageElement(JxtaServerSocket.nameSpace,
                                 new StringMessageElement(JxtaServerSocket.streamTag,
                                                          Boolean.toString(isStream),
                                                          null));
           msg.addMessageElement(JxtaServerSocket.nameSpace,
                                 new TextDocumentMessageElement(JxtaServerSocket.remPeerTag,
                                                                (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null));
           return msg;
       } catch (Throwable t) {
           if (LOG.isEnabledFor(Level.DEBUG)) {
               LOG.debug("error getting element stream", t);
           }
           return null;
       }
   }

    protected static Messenger lightweightOutputPipe1(PeerGroup group,
            PipeAdvertisement outputPipeAdv,
            PeerAdvertisement peerAdv) {

        return lightweightOutputPipe(group, outputPipeAdv, peerAdv);
    }


    /**
      * {@inheritDoc}
      */
     public void pipeMsgEvent(PipeMsgEvent event) {

         Message message = event.getMessage();
         if (message == null) {
             return;
         }

         MessageElement element = null;
         if (!bound) {
             // look for a remote pipe answer
             element = (MessageElement)
                       message.getMessageElement(JxtaServerSocket.nameSpace,
                                                 JxtaServerSocket.remPipeTag);

             if (element != null) {
                 // connect response
                 try {
                     PeerAdvertisement peerAdv = null;
                     InputStream in = element.getStream();
                     PipeAdvertisement pa = (PipeAdvertisement)
                                            AdvertisementFactory.newAdvertisement(element.getMimeType(), in);
                     element = message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.remPeerTag);
                     if (element != null) {
                         in = element.getStream();
                         peerAdv = (PeerAdvertisement)
                                   AdvertisementFactory.newAdvertisement(element.getMimeType(), in);
                     } else {
                         return;
                     }

                     element = message.getMessageElement (JxtaServerSocket.nameSpace, JxtaServerSocket.credTag);
                     if (element != null) {
                         in = element.getStream();
                         credentialDoc = (StructuredDocument)
                                         StructuredDocumentFactory.newStructuredDocument(element.getMimeType(), in);
                     }

                     element = message.getMessageElement (JxtaServerSocket.nameSpace, JxtaServerSocket.streamTag);
                     if (element != null) {
                         isStream = (element.toString().equals("true"));
                     }
                     msgr = lightweightOutputPipe(group, pa, peerAdv);
                     if (msgr == null) {
                         // let the connection attempt timeout
                         if (LOG.isEnabledFor(Level.ERROR)) {
                             LOG.error("Unable to obtain a back messenger");
                         }
                         return;
                     }
                     if (isStream) {
                         // Create the input stream right away, otherwise
                         // the first few messages from remote will be lost, unless
                         // we use an intermediate queue.
                         // FIXME: it would be even better if we could create the
                         // input stream BEFORE having the output pipe resolved, but
                         // that would force us to have the MsrgAdaptor block
                         // until we can give it the real pipe or msgr... later.
                         createRis();
                     }
                     synchronized (finalLock) {
                         waiting = false;
                         finalLock.notifyAll();
                     }
                 } catch (IOException e) {
                     if (LOG.isEnabledFor(Level.ERROR)) {
                         LOG.error("failed to process response message", e);
                     }
                 }
             }
         }
         //net.jxta.impl.util.MessageUtil.printMessageStats(message, true);
         // look for close request
         element = (MessageElement)
                   message.getMessageElement(JxtaServerSocket.nameSpace,
                                             JxtaServerSocket.closeTag);
         if (element != null) {
             if (element.toString().equals("close")) {
                 try {
                     if (LOG.isEnabledFor(Level.DEBUG)) {
                         LOG.debug("Received a close request");
                     }
                     closeFromRemote();
                 } catch (IOException ie) {
                     if (LOG.isEnabledFor(Level.ERROR)) {
                         LOG.error("failed during closeFromRemote", ie);
                     }
                 }
             } else if (element.toString().equals("closeACK")) {
                 if (LOG.isEnabledFor(Level.DEBUG)) {
                     LOG.debug("Received a close acknowledgement");
                 }
                 synchronized(closeLock) {
                     closeLock.notify();
                 }
             }
         }

         if (!isStream) {
             // isthere data ?
             element = (MessageElement)
                       message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.dataTag);
             if (element == null) {
                 return;
             }

             try {
                 queue.push(element, -1);
             } catch (InterruptedException e) {
                 if (LOG.isEnabledFor(Level.DEBUG)) {
                     LOG.debug("Interrupted", e);
                 }
             }
             return;
         }

         Iterator i =
             message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK);

         if (i != null && i.hasNext()) {
             if (ros != null) {
                 ros.recv(message);
             }
             return;
         }

         i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK);
         if (i != null && i.hasNext()) {

             // It can happen that we receive messages for the input stream
             // while we have not finished creating it.
             try {
                 synchronized (finalLock) {
                     while (waiting) {
                         finalLock.wait(timeout);
                     }
                 }
             } catch (InterruptedException ie) {}

             if (ris != null) {
                 ris.recv(message);
             }
         }
     }

     private void createRis() {

         if (outgoing == null) {
             outgoing = new OutgoingMsgrAdaptor(msgr, retryTimeout);
         }
         if (ris == null) {
             ris = new ReliableInputStream(outgoing, retryTimeout);
         }
     }





    private void setPeerAdv(PeerAdvertisement peerAdv) {
        this.peerAdvt = peerAdv;
    }


    public PeerAdvertisement getRemotePeerAdv() {
        return this.peerAdvt;
    }

    public String getRemotePeerID() {
        return this.peerAdvt.getPeerID().toString();
    }

    public String getRemotePeerName() {
        return this.peerAdvt.getName();
    }


}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -