📄 getcontentrequest.java
字号:
/* * Copyright (c) 2001 Sun Microsystems, Inc. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * *==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. * * $Id: GetContentRequest.java,v 1.23 2006/02/07 20:43:51 bondolo Exp $ * */package net.jxta.share.client;import java.io.File;import java.io.InputStream;import java.io.IOException;import java.io.RandomAccessFile;import java.util.BitSet;import java.util.Hashtable;import java.util.Enumeration;import net.jxta.endpoint.ByteArrayMessageElement;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.EndpointListener;import net.jxta.endpoint.Messenger;import net.jxta.endpoint.Message;import net.jxta.peergroup.PeerGroup;import net.jxta.share.CMS;import net.jxta.share.ContentAdvertisement;import org.apache.log4j.Logger;import org.apache.log4j.Level;/** * This class implements Get Content Request. * It request the given content from the given CMS request pipe. */public class GetContentRequest implements EndpointListener, Runnable { private final static transient Logger LOG = Logger.getLogger(GetContentRequest.class.getName()); private PeerGroup group = null; private EndpointAddress address = null; private ContentAdvertisement cAdv = null; private Hashtable targetTable = new Hashtable(); private File destFile = null; private Thread thread = null; private volatile boolean isDone = false; private boolean hasFailed = false; private long contentLength = 0; private int chunkSize = 0; private int numberChunks = 0; private int numberReceived = 0; private BitSet bitSet = null; private BitSet bitSetReq = null; protected int maxParallelDownload = 10; protected int maxRetry = 3; protected long timeout = 20 * 1000; // 20sec * 3 = 1min protected int chunksAtOneTime = 10; private static int requestCounter = 0; private static Object counterSynch = new Object(); public static int getUnusedRequestID() { synchronized (counterSynch) { return requestCounter++; } } /** * Create a GetContentRequest to download remotely shared content to disk. * The download will start immediately after this constructor is called. * *@param group the peergroup in which the content is being shared *@param cAdv the advertisement for the content *@param inFile the file in which to save the content */ public GetContentRequest( PeerGroup group, ContentAdvertisement cAdv, File inFile) { this(group, new ContentAdvertisement[] { cAdv }, inFile); } /** * Similar to * {@link #GetContentRequest(PeerGroup, ContentAdvertisement, File)} * , except the content is downloaded from multiple sources. *@param cAdvs advertisements from the various sources that the content is * to be downloaded from. */ public GetContentRequest( PeerGroup group, ContentAdvertisement[] cAdvs, File inFile) { if (cAdvs.length < 1) { isDone = true; hasFailed = true; notifyFailure(); return; } this.group = group; this.cAdv = cAdvs[0]; destFile = inFile; // register the message listener String addressStr = null; addressStr = "jxta://" + CMS.getPeerId(group) + "/CMS-GetRequest" + Integer.toString(getUnusedRequestID()) + "/" + CMS.getGroupId(group); if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug( "ContentName = '" + cAdv.getName() + "' reply address = " + addressStr); if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("save content to '" + inFile.getPath() + "'"); address = new EndpointAddress(addressStr); if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug( "register address = " + address.getServiceName() + address.getServiceParameter()); group.getEndpointService().addIncomingMessageListener( this, address.getServiceName(), address.getServiceParameter()); for (int i = 0; i < cAdvs.length; i++) { //use a unique request ID for each of the sources String requestid = address.getServiceName() + ":" + i; //store the status information of each of the parallel downloads // in the targetTable targetTable.put(requestid, new TargetItem(cAdvs[i])); } thread = new Thread(this); thread.start(); } public void run() { while (!isDone) { int counter = 0; Enumeration en = targetTable.keys(); while (en.hasMoreElements()) { String requestid = (String) en.nextElement(); TargetItem ti = (TargetItem) targetTable.get(requestid); if (counter >= maxParallelDownload) { break; } if (ti.errorcount >= maxRetry) { continue; } if (ti.timestamp > 0 && ti.timestamp + timeout < System.currentTimeMillis()) { clearBitSetReq(ti.bsReq); ti.errorcount++; } gcBitSetReq(ti.bsReq); if (ti.errorcount == 0 && countBitSet(ti.bsReq) >= chunksAtOneTime) { counter++; continue; } boolean ok = sendRequest(requestid, ti); ti.timestamp = System.currentTimeMillis(); if (ok) { counter++; } else { ti.errorcount++; } } if (counter == 0) { isDone = true; group.getEndpointService().removeIncomingMessageListener( address.getServiceName(), address.getServiceParameter()); trimFile(); notifyFailure(); } if (bitSetReq != null) { gcBitSetReq(bitSetReq); } try { Thread.sleep(500); } catch (InterruptedException e) { } } } private boolean sendRequest(String requestid, TargetItem ti) { ContentAdvertisement cAdv = ti.cAdv; // get the endpoint messenger for the result Peer if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("GetContentRequest " + cAdv.getAddress()); EndpointAddress destAddr = new EndpointAddress(cAdv.getAddress()); long rangeBegin = 0; long rangeEnd = 64 * 1024 - 1; if (bitSetReq != null) { int chunkBegin = -1; int chunkEnd = -1; for (int i = 0; i < numberChunks; i++) { if (chunkBegin >= 0) { if (bitSet.get(i) || bitSetReq.get(i)) { chunkEnd = i - 1; break; } } else { if (!bitSet.get(i) && !bitSetReq.get(i)) { chunkBegin = i; } } } if (chunkBegin == -1) { // can happen? clearBitSetReq(bitSetReq); return true; } if (chunkEnd == -1) { chunkEnd = numberChunks - 1; } if (chunkEnd > chunkBegin + chunksAtOneTime - 1) { chunkEnd = chunkBegin + chunksAtOneTime - 1; } for (int i = chunkBegin; i <= chunkEnd; i++) { bitSetReq.set(i); ti.bsReq.set(i); } rangeBegin = chunkBegin * chunkSize; rangeEnd = (chunkEnd + 1) * chunkSize - 1; if (rangeEnd >= contentLength) { rangeEnd = contentLength - 1; } } try { Messenger messenger = group.getEndpointService().getMessenger(destAddr); // send the request Message message = new Message(); message.addMessageElement( new ByteArrayMessageElement( CMS.MESSAGE_TYPE, CMS.encodeAs, CMS.GET_REQUEST.getBytes(), null)); //message.setBytes(CMS.MESSAGE_TYPE, CMS.GET_REQUEST.getBytes()); message.addMessageElement( new ByteArrayMessageElement( CMS.REQUEST_ID, CMS.encodeAs, requestid.getBytes(), null)); //message.setBytes(CMS.REQUEST_ID, requestid.getBytes()); message.addMessageElement( new ByteArrayMessageElement( CMS.CONTENT_ID, CMS.encodeAs, cAdv.getContentId().toString().getBytes(), null)); // message.setBytes( // CMS.CONTENT_ID, // cAdv.getContentId().toString().getBytes()); message.addMessageElement( new ByteArrayMessageElement( CMS.RETURN_ADDRESS, CMS.encodeAs, address.toString().getBytes(), null));// message.setBytes(CMS.RETURN_ADDRESS, address.toString().getBytes()); message.addMessageElement( new ByteArrayMessageElement( CMS.RANGE_BEGIN ,CMS.encodeAs ,Long.toString(rangeBegin).getBytes() ,null));// message.setBytes(// CMS.RANGE_BEGIN,// Long.toString(rangeBegin).getBytes()); message.addMessageElement( new ByteArrayMessageElement( CMS.RANGE_END ,CMS.encodeAs ,Long.toString(rangeEnd).getBytes() ,null));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -