📄 filereceiver.java
字号:
/** Copyright (c) 2001 Sun Microsystems, Inc. All rights* reserved.** Redistribution and use in source and binary forms, with or without* modification, are permitted provided that the following conditions* are met:** 1. Redistributions of source code must retain the above copyright* notice, this list of conditions and the following disclaimer.** 2. Redistributions in binary form must reproduce the above copyright* notice, this list of conditions and the following disclaimer in* the documentation and/or other materials provided with the* distribution.** 3. The end-user documentation included with the redistribution,* if any, must include the following acknowledgment:* "This product includes software developed by the* Sun Microsystems, Inc. for Project JXTA."* Alternately, this acknowledgment may appear in the software itself,* if and wherever such third-party acknowledgments normally appear.** 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"* must not be used to endorse or promote products derived from this* software without prior written permission. For written* permission, please contact Project JXTA at http://www.jxta.org.** 5. Products derived from this software may not be called "JXTA",* nor may "JXTA" appear in their name, without prior written* permission of Sun.** THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF* SUCH DAMAGE.* ====================================================================** This software consists of voluntary contributions made by many* individuals on behalf of Project JXTA. For more* information on Project JXTA, please see* <http://www.jxta.org/>.***/package net.jxta.myjxta.plugins.filetransfer;import net.jxta.credential.Credential;import net.jxta.document.AdvertisementFactory;import net.jxta.exception.PeerGroupException;import net.jxta.id.IDFactory;import net.jxta.peergroup.PeerGroup;import net.jxta.pipe.PipeID;import net.jxta.pipe.PipeService;import net.jxta.platform.NetworkManager;import net.jxta.protocol.PipeAdvertisement;import net.jxta.socket.JxtaServerSocket;import net.jxta.socket.JxtaSocket;import java.io.*;import java.net.URI;import java.security.MessageDigest;import java.util.logging.Logger;/** * Receives a file which is send by the FileSender class. * The transfer is done via JxtaSocket connections * * @version $Id: FileReceiver.java,v 1.5 2007/05/27 16:02:03 nano Exp $ */public class FileReceiver extends Thread { private transient PeerGroup netPeerGroup = null; public final static String SOCKETIDSTR = "urn:jxta:uuid-59616261646162614E5047205032503393B5C2F6CA7A41FBB0F890173088E79404"; private final PipeID m_pipeID; private OutputStream m_outputStream; private boolean m_stopReceiver = false; private JxtaServerSocket m_serverSocket; private long m_thoughputKByte; public enum OVERRIDE_BEHAVIOR { OVERRIDE, //override the existing file, transfer starts at offset 0 RESUME, //resume transfer (transfer only the missing part of the file and append them) APPEND //append received data (transfer starts at offset 0, but the received data is append) } /** * @param pg The peergroup inside which the transfer is handled * @param pipeID - the pipe id which is beeing used for the JxtaServerSocket - * this must be the same pipe-id that is specified at the sender side * @throws IOException * @throws PeerGroupException */ public FileReceiver(PeerGroup pg, PipeID pipeID) { m_pipeID = pipeID; netPeerGroup = pg; m_outputStream = null; } //creates a Unicast Pipe Advertisment with the given pipeID public static PipeAdvertisement createSocketAdvertisement(PipeID pipeID) { PipeID socketID; socketID = pipeID; PipeAdvertisement advertisement = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType()); advertisement.setPipeID(socketID); advertisement.setType(PipeService.UnicastType); advertisement.setName("FileTransferSocket"); return advertisement; } /** * shutdown the file-receiver, ongoing transports will be canceled */ public void shutdownListener() { m_stopReceiver = true; if (m_serverSocket != null) try { m_serverSocket.close(); } catch (IOException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } /** * Start the file receiver - an incoming JxtaServer Socket will be created. * This method blocks until shutdownListener is beeing called. */ public void run() { System.out.println("Starting ServerSocket"); m_serverSocket = null; try { PipeAdvertisement pipeAdvertisement = createSocketAdvertisement(m_pipeID); netPeerGroup.getDiscoveryService().publish(pipeAdvertisement); m_serverSocket = new JxtaServerSocket(netPeerGroup, pipeAdvertisement, 10); m_serverSocket.setSoTimeout(0); } catch (IOException e) { System.out.println("failed to create a server socket"); e.printStackTrace(); System.exit(-1); } while (!m_stopReceiver) { try { System.out.println("Waiting for connections"); JxtaSocket socket = (JxtaSocket) m_serverSocket.accept(); if (socket != null) { System.out.println("New socket connection accepted"); Thread thread = new Thread(new ConnectionHandler(socket), "Connection Handler Thread"); thread.start(); } } catch (Exception e) { if (!m_stopReceiver) e.printStackTrace(); } } System.out.println("File Transfer Socket Closed"); } /** * handles one incoming file transfer. */ private class ConnectionHandler implements Runnable { JxtaSocket socket = null; ConnectionHandler(JxtaSocket socket) { this.socket = socket; } /** * Sends data over socket * * @param socket the socket */ private void sendAndReceiveData(JxtaSocket socket) { try { long start = System.currentTimeMillis(); // get the socket output stream DataOutputStream out = new DataOutputStream(socket.getOutputStream()); // get the socket input stream InputStream in = socket.getInputStream(); DataInput dis = new DataInputStream(in); //first read the fileName from the stream (sender will tell us whats the prefered filename) String fileName = dis.readUTF(); //next read the fileSize (this value is only beeing used to inform the user what size he has to expect) long fileSize = dis.readLong(); Credential credential = socket.getCredentialDoc(); boolean append = false; File file; long wantedOffset; String sender = (credential != null ? credential.getPeerID().toString() : "unknown"); do { file = getTargetFile(fileName, fileSize, sender); if (file == null) { out.writeLong(-1); //request offset -1 --> transfer aborted out.flush(); socket.close(); return; } if (file.exists()) { switch (fileExistsHandling(file)) { case RESUME: wantedOffset = file.length(); append = true; System.out.println("resuming file:" + file.getName()); break; case APPEND: wantedOffset = 0; append = true; System.out.println("appending to file:" + file.getName()); break; case OVERRIDE: wantedOffset = 0; append = false; System.out.println("overriding file:" + file.getName()); break; default: wantedOffset = -2; //unknown append type } } else { //new file wantedOffset = 0; } } while (wantedOffset == -2); out.writeLong(wantedOffset); //request a specific offset from the sender out.flush(); m_outputStream = new FileOutputStream(file, append); MessageDigest md5 = MessageDigest.getInstance("MD5"); long receivedOffset = dis.readLong(); assert (receivedOffset == 0); // we have requestes 0 int size = dis.readInt(); long total = 0; while (size != 0) { byte[] buf = new byte[size]; md5.reset(); System.out.println("waiting for offset:" + receivedOffset); dis.readFully(buf); md5.update(buf); byte[] checkSum = md5.digest(); System.out.println("sending md5:" + wantedOffset); out.write(checkSum); out.flush(); wantedOffset = receivedOffset + size; System.out.println("sending next offset request:" + wantedOffset); out.writeLong(wantedOffset); out.flush(); log("waiting for next offset:" + wantedOffset); long nextOffset = dis.readLong(); log("got offset"); int nextSize = dis.readInt(); log("got size"); if (nextOffset == wantedOffset) { // it seems the our outgoing checksum was correct (the sender sends what we have requested, not a retransmit) log("received and verified offset and size"); m_outputStream.write(buf); //write out the old buffer receivedOffset = nextOffset; size = nextSize; total = total + buf.length; } else { //offset wrong? log("checksum wrong, re-receiving offset " + nextOffset); } } out.close(); in.close(); m_outputStream.flush(); m_outputStream.close(); long finish = System.currentTimeMillis(); long elapsed = finish - start; m_thoughputKByte = (total / elapsed) * 1000 / 1024; info("transfer of" + file.getName() + "complete"); socket.close(); //log("Connection closed"); } catch (Exception ie) { log(ie.getMessage()); ie.printStackTrace(); } } public void run() { sendAndReceiveData(socket); } } protected void info(String message) { FileReceiver.log(message); } private static final Logger LOG = Logger.getLogger(FileReceiver.class.getName()); private static void log(String s) { LOG.fine(s); } public long getThoughputKByte() { return m_thoughputKByte; } protected OVERRIDE_BEHAVIOR fileExistsHandling(File file) { return OVERRIDE_BEHAVIOR.RESUME; } /** * Ask the user where to store the file * * @param fileName * @param size * @param sender * @return the target position or null if the user has rejected the transfer */ protected File getTargetFile(String fileName, long size, String sender) { //obviously this is only a sample implementatation... File incomingDir = new File("./incoming"); if (!incomingDir.exists()) incomingDir.mkdir(); return new File(incomingDir, fileName); } /** * main * * @param args command line args */ public static void main(String args[]) { System.setProperty("net.jxta.logging.Logging", "FINEST"); System.setProperty("net.jxta.level", "FINEST"); System.setProperty("java.util.logging.config.file", "logging.properties"); try { Thread.currentThread().setName(FileReceiver.class.getName() + ".main()"); NetworkManager manager = new NetworkManager(NetworkManager.ConfigMode.EDGE, "FileReceiver", new File(new File(".cache"), "FileReceiver").toURI()); manager.setUseDefaultSeeds(true); manager.startNetwork(); log("Waiting for Rendezvous..."); if (!manager.waitForRendezvousConnection(40000)) { log("failed to connect to Rendezvous"); } String SOCKETIDSTR = "urn:jxta:uuid-59616261646162614E5047205032503393B5C2F6CA7A41FBB0F890173088E79404"; PipeID socketID = (PipeID) IDFactory.fromURI(new URI(SOCKETIDSTR)); FileReceiver socEx = new FileReceiver(manager.getNetPeerGroup(), socketID); socEx.run(); } catch (Throwable e) { System.err.println("Failed : " + e); e.printStackTrace(System.err); System.exit(-1); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -