📄 filesender.java
字号:
/*
 * Copyright (c) 2001 Sun Microsystems, Inc.  All rights
 * reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *
 * 3. The end-user documentation included with the redistribution,
 *    if any, must include the following acknowledgment:
 *       "This product includes software developed by the
 *       Sun Microsystems, Inc. for Project JXTA."
 *    Alternately, this acknowledgment may appear in the software itself,
 *    if and wherever such third-party acknowledgments normally appear.
 *
 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"
 *    must not be used to endorse or promote products derived from this
 *    software without prior written permission. For written
 *    permission, please contact Project JXTA at http://www.jxta.org.
 *
 * 5. Products derived from this software may not be called "JXTA",
 *    nor may "JXTA" appear in their name, without prior written
 *    permission of Sun.
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of Project JXTA.  For more
 * information on Project JXTA, please see
 * <http://www.jxta.org/>.
 *
 */
package net.jxta.myjxta.plugins.filetransfer;

import net.jxta.id.IDFactory;
import net.jxta.peergroup.PeerGroup;
import net.jxta.pipe.PipeID;
import net.jxta.platform.NetworkManager;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.socket.JxtaSocket;

import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.io.*;
import java.net.URI;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Sends a given file towards a given receiver inside a specific jxta group.
 * The transfer is done via JxtaSocket connections.
 * <p>
 * Wire protocol as implemented by {@link #run()}: the sender writes the file
 * name (UTF) and the file size (long) and then reads the offset (long) the
 * receiver wants to start from; {@code -1} means the transfer was rejected.
 * Every chunk is sent as {@code offset (long), size (int), payload}. The
 * receiver answers each chunk with 16 bytes that are compared against the MD5
 * of the payload, followed by the next requested offset (long). A chunk whose
 * checksum does not match is sent again. The transfer is terminated with a
 * final {@code offset (long), 0 (int)} record.
 * <p>
 * Status, progress and throughput are reported to registered
 * {@link PropertyChangeListener}s.
 *
 * @version $Id: FileSender.java,v 1.5 2007/05/27 16:02:02 nano Exp $
 */
public final class FileSender extends Thread {

    /** Possible values of the {@link #TRANSFER_STATUS_PROPERTY} property. */
    public enum TRANSFER_STATUS_ENUM {
        CONNECTING, WAITING_FOR_RECEIVER, TRANSFERING, FINISHED
    }

    public static final String TRANSFER_PROGRESS_PROPERTY = "transferProgress";
    public static final String TRANSFER_STATUS_PROPERTY = "transferStatus";
    public static final String TRANSFER_THROUGHPUT_KBYTE_PROPERTY = "transferThroughputKByte";

    /**
     * payload size (maximum number of file bytes sent per chunk)
     */
    private final static int PAYLOADSIZE = 64 * 1024;

    private static final Logger LOG = Logger.getLogger(FileSender.class.getName());

    /** Connection timeout in milliseconds handed to the JxtaSocket constructor. */
    private int m_timeout = 5000;

    private final String m_fileName;
    /** File bytes acknowledged by the receiver; written by run(), read by other threads. */
    private volatile long m_bytesTransfered = 0;
    private final File m_file;
    private final long m_estimatedFileSize;
    private int m_progress = 0;
    private final PeerGroup m_peergroup;
    private final PipeAdvertisement m_pipeAdv;
    private final InputStream m_inputStream;

    /** Listener registry; set to null by {@link #finish()} once the transfer completed. */
    volatile PropertyChangeSupport m_changeSupport = new PropertyChangeSupport(this);

    /**
     * Creates a sender for one file.
     *
     * @param peerGroup    the peer group the socket connection is opened in
     * @param targetPipeID id of the pipe the receiver listens on
     * @param dataSource   the file to send
     * @throws FileNotFoundException if the file does not exist or cannot be read
     */
    public FileSender(PeerGroup peerGroup, PipeID targetPipeID, File dataSource) throws FileNotFoundException {
        m_peergroup = peerGroup;
        m_pipeAdv = FileReceiver.createSocketAdvertisement(targetPipeID);
        m_file = dataSource;
        m_estimatedFileSize = m_file.length();
        m_fileName = m_file.getName();
        // Fail here instead of keeping a null stream that would only surface as a
        // NullPointerException in the middle of the transfer.
        if (!m_file.exists() || !m_file.canRead()) {
            throw new FileNotFoundException("cannot read file: " + m_file.getAbsolutePath());
        }
        m_inputStream = new FileInputStream(dataSource);
    }

    /**
     * Registers a listener for the status, progress and throughput properties.
     * Listeners registered after the transfer has finished are ignored.
     *
     * @param listener the listener to add
     */
    public void addPropertyChangeListener(PropertyChangeListener listener) {
        PropertyChangeSupport support = m_changeSupport;
        if (support != null) {
            support.addPropertyChangeListener(listener);
        }
    }

    /**
     * @return the number of file bytes whose checksum has been confirmed by the receiver so far
     */
    public long getTransferedBytes() {
        return m_bytesTransfered;
    }

    /**
     * @param m_timeout connection timeout in milliseconds used when the socket is opened
     */
    public void setTimeout(int m_timeout) {
        this.m_timeout = m_timeout;
    }

    /**
     * Connects to the receiver and performs the complete transfer. I/O failures are
     * logged and end the transfer; the socket and the file stream are released in
     * every case. {@link TRANSFER_STATUS_ENUM#FINISHED} is only reported when the
     * transfer ran to completion (or was rejected by the receiver).
     */
    @Override
    public void run() {
        JxtaSocket socket = null;
        try {
            long start = System.currentTimeMillis();
            log("Connecting to the server");
            m_changeSupport.firePropertyChange(TRANSFER_STATUS_PROPERTY, null, TRANSFER_STATUS_ENUM.CONNECTING);

            socket = new JxtaSocket(m_peergroup,
                    //no specific peerid
                    null,
                    m_pipeAdv,
                    //connection timeout in milliseconds (see setTimeout)
                    m_timeout,
                    // reliable connection
                    true);

            // get the socket output stream
            OutputStream out = socket.getOutputStream();
            DataOutput dos = new DataOutputStream(out);

            // get the socket input stream
            InputStream in = socket.getInputStream();
            DataInput dis = new DataInputStream(in);

            m_changeSupport.firePropertyChange(TRANSFER_STATUS_PROPERTY, null, TRANSFER_STATUS_ENUM.WAITING_FOR_RECEIVER);
            dos.writeUTF(m_fileName);
            dos.writeLong(m_estimatedFileSize);
            out.flush();

            long requestedOffset = readInitialOffset(dis);
            long bytesSend = 0;
            if (requestedOffset != -1) {
                m_changeSupport.firePropertyChange(TRANSFER_STATUS_PROPERTY, null, TRANSFER_STATUS_ENUM.TRANSFERING);
                bytesSend = sendChunks(dos, dis, out, requestedOffset);
            }
            // else: the other side rejected the transfer

            out.close();
            in.close();

            long elapsed = System.currentTimeMillis() - start;
            // Scale before dividing to keep precision; a transfer that took less than
            // one millisecond has no meaningful throughput (and must not divide by zero).
            int throughputKBytes = elapsed > 0 ? (int) (bytesSend * 1000L / elapsed / 1024L) : 0;
            m_changeSupport.firePropertyChange(FileSender.TRANSFER_THROUGHPUT_KBYTE_PROPERTY, 0, throughputKBytes);

            socket.close();
            socket = null; // closed cleanly, nothing left for the finally block
            log("Socket connection closed");
            m_changeSupport.firePropertyChange(TRANSFER_STATUS_PROPERTY, null, TRANSFER_STATUS_ENUM.FINISHED);
            finish();
        } catch (IOException io) {
            LOG.log(Level.WARNING, "transfer of " + m_fileName + " failed", io);
        } catch (NoSuchAlgorithmException e) {
            LOG.log(Level.WARNING, "MD5 digest not available, transfer of " + m_fileName + " aborted", e);
        } finally {
            closeQuietly(m_inputStream);
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // best effort cleanup after a failure that has already been logged
                    LOG.log(Level.FINE, "closing the socket failed", ignored);
                }
            }
        }
    }

    /**
     * Reads the offset the receiver wants the transfer to start at.
     *
     * @return the requested offset, or -1 if the receiver rejected the transfer or
     *         the answer could not be read
     */
    private static long readInitialOffset(DataInput dis) {
        try {
            return dis.readLong();
        } catch (IOException ignored) {
            // treated like a rejection, exactly as an explicit -1 answer
            LOG.log(Level.FINE, "no start offset received, treating transfer as rejected", ignored);
            return -1;
        }
    }

    /**
     * Sends the file chunk by chunk, starting at the given offset, and writes the
     * end-of-transfer record.
     *
     * @param dos             data view of the socket output stream
     * @param dis             data view of the socket input stream
     * @param out             the socket output stream (used for payload and flushing)
     * @param requestedOffset the first file offset requested by the receiver
     * @return number of bytes counted for the throughput calculation (payload sent,
     *         including resends, plus the checksum bytes received)
     */
    private long sendChunks(DataOutput dos, DataInput dis, OutputStream out, long requestedOffset)
            throws IOException, NoSuchAlgorithmException {
        byte[] out_buf = new byte[PAYLOADSIZE];
        byte[] in_buf = new byte[16];
        MessageDigest md5 = MessageDigest.getInstance("MD5");

        long bytesSend = 0;
        long offset = requestedOffset;
        int bytesRead = readFile(0, offset, out_buf);
        // The checksum of the current chunk is computed once and kept, so that a
        // resend after a mismatch is still compared against the right value
        // (MessageDigest.digest() resets the digest).
        byte[] expectedDigest = bytesRead > 0 ? digestOf(md5, out_buf, bytesRead) : null;

        while (bytesRead > 0) {
            log("sending offset: " + offset);
            dos.writeLong(offset);
            log("sending size:" + bytesRead);
            dos.writeInt(bytesRead);
            out.flush();
            log("sending data");
            out.write(out_buf, 0, bytesRead);
            bytesSend += bytesRead;
            out.flush();
            log("waiting for md5");
            dis.readFully(in_buf);
            log("waiting for next offset request");
            requestedOffset = dis.readLong();
            bytesSend += in_buf.length;

            if (Arrays.equals(in_buf, expectedDigest)) {
                log("md5 checksum correct:" + offset);
                m_bytesTransfered += bytesRead;
                offset = offset + bytesRead;
                updateProgress(offset);
                if (requestedOffset != -1) {
                    bytesRead = readFile(offset, requestedOffset, out_buf);
                    if (bytesRead > 0) {
                        offset = requestedOffset;
                        expectedDigest = digestOf(md5, out_buf, bytesRead);
                    }
                } else {
                    // NOTE(review): FileReceiver is not visible here; -1 is treated as
                    // "no further data wanted", mirroring its meaning for the initial
                    // offset. Previously the stale buffer was sent again.
                    bytesRead = -1;
                }
            } else {
                log("md5 checksum wrong, resending offset " + offset);
            }
        }
        dos.writeLong(offset); //should be the overall filesize
        dos.writeInt(0); //end of transfer
        return bytesSend;
    }

    /** Returns the MD5 digest of the first {@code length} bytes of {@code data}. */
    private static byte[] digestOf(MessageDigest md5, byte[] data, int length) {
        md5.reset();
        md5.update(data, 0, length);
        return md5.digest();
    }

    /** Closes the given resource, logging (not propagating) any failure. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            LOG.log(Level.FINE, "closing a stream failed", ignored);
        }
    }

    private static void log(String s) {
        LOG.fine(s);
    }

    /**
     * Reads the next chunk from the file.
     *
     * @param offset          current position of the file stream
     * @param requestedOffset position the chunk should start at
     * @param out_buf         buffer receiving the chunk
     * @return the number of bytes read, or -1 at the end of the file or when the
     *         requested offset lies before the current position
     */
    private int readFile(long offset, long requestedOffset, byte[] out_buf) throws IOException {
        if (requestedOffset < offset) {
            return -1; //not possible with input streams
        }
        // skip() is allowed to skip fewer bytes than asked for, so repeat until the
        // requested position is reached.
        long remaining = requestedOffset - offset;
        while (remaining > 0) {
            long skipped = m_inputStream.skip(remaining);
            if (skipped <= 0) {
                // no progress: consume a single byte to tell "end of file" from "try again"
                if (m_inputStream.read() == -1) {
                    return -1;
                }
                skipped = 1;
            }
            remaining -= skipped;
        }
        return m_inputStream.read(out_buf, 0, PAYLOADSIZE);
    }

    /** Releases the listener registry after a completed transfer. */
    private void finish() {
        m_changeSupport = null;
    }

    /**
     * Recomputes the progress percentage from the given file offset and notifies
     * the listeners if the rounded value changed.
     */
    private void updateProgress(long offset) {
        int newProgress = Math.round(100 * ((float) offset / (float) m_estimatedFileSize));
        if (newProgress != m_progress) {
            int oldProgress = m_progress;
            m_progress = newProgress;
            m_changeSupport.firePropertyChange(TRANSFER_PROGRESS_PROPERTY, oldProgress, m_progress);
        }
    }

    /**
     * @param args first parameter is the filename which should be send to the other side.
     */
    public static void main(String args[]) {
        // Check the arguments before the (slow) network start-up.
        if (args == null || args.length < 1) {
            System.err.println("Usage: FileSender <file>");
            System.exit(-1);
        }
        System.setProperty("net.jxta.logging.Logging", Level.INFO.getName());
        NetworkManager manager = null;
        try {
            manager = new NetworkManager(NetworkManager.ConfigMode.EDGE, "FileSender",
                    new File(new File(".cache"), "FileSender").toURI());
            manager.setUseDefaultSeeds(true);
            manager.startNetwork();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
        try {
            Thread.currentThread().setName(FileSender.class.getName() + ".main()");
            log("Waiting for Rendezvous...");
            if (!manager.waitForRendezvousConnection(30 * 1000)) {
                log("unable to connect to Rendezvous");
            } else {
                log("Rendezvous connection established");
            }
            String SOCKETIDSTR = "urn:jxta:uuid-59616261646162614E5047205032503393B5C2F6CA7A41FBB0F890173088E79404";
            PipeID socketID = (PipeID) IDFactory.fromURI(new URI(SOCKETIDSTR));
            File sourceFile = new File(args[0]);
            FileSender socEx = new FileSender(manager.getNetPeerGroup(), socketID, sourceFile);
            //the bean communicates with us via a property change listener
            socEx.addPropertyChangeListener(new PropertyChangeListener() {
                public void propertyChange(PropertyChangeEvent propertyChangeEvent) {
                    System.out.println(propertyChangeEvent.getPropertyName() + ":" + propertyChangeEvent.getNewValue());
                }
            });
            socEx.setTimeout(30 * 1000);
            //start the file-transfer (blocking call: run() is invoked directly, no new thread)
            socEx.run();
            manager.stopNetwork();
        } catch (Throwable e) {
            System.out.flush();
            System.err.println("Failed : " + e);
            e.printStackTrace(System.err);
            System.exit(-1);
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -