⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 getcontentrequest.java

📁 jxta-cms-src-2.4.zip 是官网上的比较稳定的CMS源代码。目前是最稳定的高版本。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * Copyright (c) 2001 Sun Microsystems, Inc.  All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright *    notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright *    notice, this list of conditions and the following disclaimer in *    the documentation and/or other materials provided with the *    distribution. * * 3. The end-user documentation included with the redistribution, *    if any, must include the following acknowledgment: *       "This product includes software developed by the *       Sun Microsystems, Inc. for Project JXTA." *    Alternately, this acknowledgment may appear in the software itself, *    if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must *    not be used to endorse or promote products derived from this *    software without prior written permission. For written *    permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", *    nor may "JXTA" appear in their name, without prior written *    permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * *==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA.  For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. * * $Id: GetContentRequest.java,v 1.23 2006/02/07 20:43:51 bondolo Exp $ * */package net.jxta.share.client;import java.io.File;import java.io.InputStream;import java.io.IOException;import java.io.RandomAccessFile;import java.util.BitSet;import java.util.Hashtable;import java.util.Enumeration;import net.jxta.endpoint.ByteArrayMessageElement;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.EndpointListener;import net.jxta.endpoint.Messenger;import net.jxta.endpoint.Message;import net.jxta.peergroup.PeerGroup;import net.jxta.share.CMS;import net.jxta.share.ContentAdvertisement;import org.apache.log4j.Logger;import org.apache.log4j.Level;/** * This class implements Get Content Request. * It request the given content from the given CMS request pipe. */public class GetContentRequest implements EndpointListener, Runnable {	private final static transient Logger LOG = Logger.getLogger(GetContentRequest.class.getName());	private PeerGroup group = null;	private EndpointAddress address = null;	private ContentAdvertisement cAdv = null;	private Hashtable targetTable = new Hashtable();	private File destFile = null;	private Thread thread = null;	private volatile boolean isDone = false;	private boolean hasFailed = false;	private long contentLength = 0;	private int chunkSize = 0;	private int numberChunks = 0;	private int numberReceived = 0;	private BitSet bitSet = null;	private BitSet bitSetReq = null;	protected int maxParallelDownload = 10;	protected int maxRetry = 3;	protected long timeout = 20 * 1000; // 20sec * 3 = 1min	protected int chunksAtOneTime = 10;	private static int requestCounter = 0;	private static Object counterSynch = new Object();	public static int getUnusedRequestID() {		synchronized (counterSynch) {			return requestCounter++;		}	}	/**	 * Create a GetContentRequest to download remotely shared content to disk.	 * The download will start immediately after this constructor is called.	 *	 *@param group the peergroup in which the content is being shared	 *@param cAdv the advertisement for the content	 *@param inFile the file in which to save the content 	 */	public GetContentRequest(		PeerGroup group,		ContentAdvertisement cAdv,		File inFile) {		this(group, new ContentAdvertisement[] { cAdv }, inFile);	}	/**	 * Similar to	 * {@link #GetContentRequest(PeerGroup, ContentAdvertisement, File)}	 * , except the content is downloaded from multiple sources.	 *@param cAdvs advertisements from the various sources that the content is	 * to be downloaded from.	 */	public GetContentRequest(		PeerGroup group,		ContentAdvertisement[] cAdvs,		File inFile) {		if (cAdvs.length < 1) {			isDone = true;			hasFailed = true;			notifyFailure();			return;		}		this.group = group;		this.cAdv = cAdvs[0];		destFile = inFile;		// register the message listener		String addressStr = null;		addressStr =			"jxta://"				+ CMS.getPeerId(group)				+ "/CMS-GetRequest"				+ Integer.toString(getUnusedRequestID())				+ "/"				+ CMS.getGroupId(group);		if (LOG.isEnabledFor(Level.DEBUG))			LOG.debug(				"ContentName = '"					+ cAdv.getName()					+ "' reply address = "					+ addressStr);		if (LOG.isEnabledFor(Level.DEBUG))			LOG.debug("save content to '" + inFile.getPath() + "'");		address = new EndpointAddress(addressStr);		if (LOG.isEnabledFor(Level.DEBUG))			LOG.debug(				"register address = "					+ address.getServiceName()					+ address.getServiceParameter());		group.getEndpointService().addIncomingMessageListener(			this,			address.getServiceName(),			address.getServiceParameter());		for (int i = 0; i < cAdvs.length; i++) {			//use a unique request ID for each of the sources			String requestid = address.getServiceName() + ":" + i;			//store the status information of each of the parallel downloads			// in the targetTable			targetTable.put(requestid, new TargetItem(cAdvs[i]));		}		thread = new Thread(this);		thread.start();	}	public void run() {		while (!isDone) {			int counter = 0;			Enumeration en = targetTable.keys();			while (en.hasMoreElements()) {				String requestid = (String) en.nextElement();				TargetItem ti = (TargetItem) targetTable.get(requestid);				if (counter >= maxParallelDownload) {					break;				}				if (ti.errorcount >= maxRetry) {					continue;				}				if (ti.timestamp > 0					&& ti.timestamp + timeout < System.currentTimeMillis()) {					clearBitSetReq(ti.bsReq);					ti.errorcount++;				}				gcBitSetReq(ti.bsReq);				if (ti.errorcount == 0					&& countBitSet(ti.bsReq) >= chunksAtOneTime) {					counter++;					continue;				}				boolean ok = sendRequest(requestid, ti);				ti.timestamp = System.currentTimeMillis();				if (ok) {					counter++;				} else {					ti.errorcount++;				}			}			if (counter == 0) {				isDone = true;				group.getEndpointService().removeIncomingMessageListener(					address.getServiceName(),					address.getServiceParameter());				trimFile();				notifyFailure();			}			if (bitSetReq != null) {				gcBitSetReq(bitSetReq);			}			try {				Thread.sleep(500);			} catch (InterruptedException e) {			}		}	}	private boolean sendRequest(String requestid, TargetItem ti) {		ContentAdvertisement cAdv = ti.cAdv;		// get the endpoint messenger for the result Peer		if (LOG.isEnabledFor(Level.DEBUG))			LOG.debug("GetContentRequest " + cAdv.getAddress());		EndpointAddress destAddr = new EndpointAddress(cAdv.getAddress());		long rangeBegin = 0;		long rangeEnd = 64 * 1024 - 1;		if (bitSetReq != null) {			int chunkBegin = -1;			int chunkEnd = -1;			for (int i = 0; i < numberChunks; i++) {				if (chunkBegin >= 0) {					if (bitSet.get(i) || bitSetReq.get(i)) {						chunkEnd = i - 1;						break;					}				} else {					if (!bitSet.get(i) && !bitSetReq.get(i)) {						chunkBegin = i;					}				}			}			if (chunkBegin == -1) {				// can happen?				clearBitSetReq(bitSetReq);				return true;			}			if (chunkEnd == -1) {				chunkEnd = numberChunks - 1;			}			if (chunkEnd > chunkBegin + chunksAtOneTime - 1) {				chunkEnd = chunkBegin + chunksAtOneTime - 1;			}			for (int i = chunkBegin; i <= chunkEnd; i++) {				bitSetReq.set(i);				ti.bsReq.set(i);			}			rangeBegin = chunkBegin * chunkSize;			rangeEnd = (chunkEnd + 1) * chunkSize - 1;			if (rangeEnd >= contentLength) {				rangeEnd = contentLength - 1;			}		}		try {			Messenger messenger =				group.getEndpointService().getMessenger(destAddr);			// send the request			Message message = new Message();			message.addMessageElement(				new ByteArrayMessageElement(					CMS.MESSAGE_TYPE,					CMS.encodeAs,					CMS.GET_REQUEST.getBytes(),					null));			//message.setBytes(CMS.MESSAGE_TYPE, CMS.GET_REQUEST.getBytes());			message.addMessageElement(				new ByteArrayMessageElement(					CMS.REQUEST_ID,					CMS.encodeAs,					requestid.getBytes(),					null));			//message.setBytes(CMS.REQUEST_ID, requestid.getBytes());			message.addMessageElement(				new ByteArrayMessageElement(					CMS.CONTENT_ID,					CMS.encodeAs,					cAdv.getContentId().toString().getBytes(),					null));			//			message.setBytes(			//				CMS.CONTENT_ID,			//				cAdv.getContentId().toString().getBytes());			message.addMessageElement(				new ByteArrayMessageElement(					CMS.RETURN_ADDRESS,					CMS.encodeAs,					address.toString().getBytes(),					null));//			message.setBytes(CMS.RETURN_ADDRESS, address.toString().getBytes());			message.addMessageElement(				new ByteArrayMessageElement(					CMS.RANGE_BEGIN					,CMS.encodeAs					,Long.toString(rangeBegin).getBytes()					,null));//			message.setBytes(//				CMS.RANGE_BEGIN,//				Long.toString(rangeBegin).getBytes());			message.addMessageElement(				new ByteArrayMessageElement(					CMS.RANGE_END					,CMS.encodeAs					,Long.toString(rangeEnd).getBytes()					,null));

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -