⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliabletest.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* *  Copyright (c) 2003 Sun Microsystems, Inc.  All rights *  reserved. * *  Redistribution and use in source and binary forms, with or without *  modification, are permitted provided that the following conditions *  are met: * *  1. Redistributions of source code must retain the above copyright *  notice, this list of conditions and the following disclaimer. * *  2. Redistributions in binary form must reproduce the above copyright *  notice, this list of conditions and the following disclaimer in *  the documentation and/or other materials provided with the *  distribution. * *  3. The end-user documentation included with the redistribution, *  if any, must include the following acknowledgment: *  "This product includes software developed by the *  Sun Microsystems, Inc. for Project JXTA." *  Alternately, this acknowledgment may appear in the software itself, *  if and wherever such third-party acknowledgments normally appear. * *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" *  and "Project JXTA" must not be used to endorse or *  promote products derived from this software without *  prior written permission. For written permission, *  please contact Project JXTA at http://www.jxta.org. * *  5. Products derived from this software may not be called "JXTA", *  nor may "JXTA" appear in their name, without prior written *  permission of Sun. * *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES *  OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE *  DISCLAIMED.  IN NO EVENT SHALL SUN MICROSYSTEMS OR *  ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF *  USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND *  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT *  OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF *  SUCH DAMAGE. *  ==================================================================== * *  This software consists of voluntary contributions made by many *  individuals on behalf of Project JXTA.  For more *  information on Project JXTA, please see *  <http://www.jxta.org/>. * *  This license is based on the BSD license adopted by the Apache Foundation. * *  $Id: ReliableTest.java,v 1.22 2005/12/10 02:38:49 bondolo Exp $ */package net.jxta.impl.util.pipe.reliable;import net.jxta.id.IDFactory;import net.jxta.id.ID;import net.jxta.pipe.PipeID;import net.jxta.pipe.PipeService;import net.jxta.pipe.InputPipe;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.OutputPipeEvent;import net.jxta.pipe.OutputPipeListener;import net.jxta.pipe.PipeMsgListener;import net.jxta.pipe.PipeMsgEvent;import net.jxta.protocol.PipeAdvertisement;import net.jxta.peergroup.PeerGroup;import net.jxta.peergroup.PeerGroupID;import net.jxta.peergroup.PeerGroupFactory;import net.jxta.rendezvous.RendezVousService;import net.jxta.rendezvous.RendezvousEvent;import net.jxta.rendezvous.RendezvousListener;import net.jxta.discovery.DiscoveryService;import net.jxta.discovery.DiscoveryListener;import net.jxta.discovery.DiscoveryEvent;import net.jxta.document.MimeMediaType;import net.jxta.document.Advertisement;import net.jxta.document.AdvertisementFactory;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.ByteArrayMessageElement;import net.jxta.endpoint.StringMessageElement;import net.jxta.endpoint.Message.ElementIterator;import java.util.Vector;import java.util.Timer;import java.util.TimerTask;import java.util.Enumeration;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.Collections;import java.io.IOException;import java.io.StringReader;import junit.framework.TestSuite;import junit.framework.TestCase;import junit.framework.Test;import junit.textui.TestRunner;import net.jxta.impl.util.UnbiasedQueue;public class ReliableTest extends TestCase implements     RendezvousListener, DiscoveryListener,     PipeMsgListener, OutputPipeListener {    private static int MIN_LOAD = 1024;    private static int MAX_LOAD = 65536;    private static final String MESSAGE_TAG = "reliable.message";    private static final String SENT_AT_TAG = "reliable.sent.at";    private static final String PAYLOAD_TAG = "reliable.payload";    private static final MimeMediaType MIME_BINARY = 	new MimeMediaType("application/octet-stream");    private static String MSG_PIPE_NAME = "ReliableTestMsgPipe";    private static String ACK_PIPE_NAME = "ReliableTestAckPipe";    private static boolean DEBUG = false;    private static boolean ADAPTIVE = false;    private static boolean IS_QUIET = false;    private static boolean IS_SENDER = false;    private static boolean IS_SERVER = false;    private static boolean waitRdv = false;    private static String PRINCIPAL = "password";    private static String PASSWORD = "password";    private static int DROP_MSG = Integer.MAX_VALUE;    private static int BW_LIMIT = Integer.MAX_VALUE;    private static int PIPE_LEN = 327680; // 20 packets of 16K    private static int LATENCY = 0;    private static int DELAY = 200;    private static int ITERATIONS = 1000;        private Object rdvConnectLock = new Object();    private Random random = new Random(System.currentTimeMillis());    private int nextMessageId = 0;    private ArrayList loadElements = null;    private int dropMsgCount = 0;    private PeerGroup netPeerGroup = null;    private RendezVousService rendezvousService = null;    private DiscoveryService discoverySvc = null;    private PipeService pipeSvc = null;    PipeAdvertisement msgPipeAdv = null;    PipeAdvertisement ackPipeAdv = null;    OutputPipe outputPipe = null;    InputPipe inputPipe = null;    OutgoingPipeAdaptorSync outgoing = null;    IncomingPipeAdaptor incoming = null;    ReliableOutputStream ros = null;    ReliableInputStream ris = null;    Timer bwTimer = new Timer();    UnbiasedQueue bwQueue = new UnbiasedQueue(Integer.MAX_VALUE, false);    long bwQueueSz = 0;    long nextInjectTime = 0;    long delayAdj = 0;    long roundingLoss = 0;    long lostToCongestion = 0;    class TimedMsg extends TimerTask {        long delivDate;        public TimedMsg(long date) {            delivDate = date;        }        public void run() {            Message msg;            synchronized(bwQueue) {                msg = (Message) bwQueue.pop();                long msgLen = msg.getByteLength();                bwQueueSz -= msgLen;                delayAdj = delivDate - System.currentTimeMillis();                if (ros != null) ros.recv(msg);                else if (ris != null) ris.recv(msg);            }        }    }    private void bwQueueMsg(Message msg) {        synchronized(bwQueue) {            long len =  msg.getByteLength();            if (bwQueueSz + len > PIPE_LEN) {                lostToCongestion++;                if (!IS_QUIET) System.out.println("\nSimulating congestion");                return;            }            bwQueue.push(msg);            bwQueueSz += len;        }        // Schedule delivery of the message based on bw, layency, and        // current messages in the pipe.        long now = System.currentTimeMillis();        // The injection or extraction time depends on length and bandwidth        long bitsToClock = msg.getByteLength() * 8000 + roundingLoss;        long delay = bitsToClock / (BW_LIMIT * 1024);        long roundingLoss = bitsToClock % (BW_LIMIT * 1024);        // We can inject a message if/after the last byte of the previous one        // is done injecting.         nextInjectTime = Math.max(nextInjectTime, now) + delay;                // At the new nextInjectTime, we have injected the last byte of the        // new message. The message is delivered when this last byte arrives.        long delivDate = nextInjectTime + LATENCY;        long delivDelay = delivDate - now;        if (delayAdj >= 10) {            delivDelay += 10;            delayAdj -= 10;        }        if (delayAdj <= -10) {            delivDelay -= 10;            delayAdj += 10;        }        // A carefully chosen combination of unrealistic parameters can        // lead to an attempt at delivering messages in the past.        if (delivDelay <= 0) {            delivDelay = 1;        }        // Because we strictly serialize messages.        // Their delivery order is the same than their queuing order. So the        // timer task only needs to pickup the next message and deliver it.        bwTimer.schedule(new TimedMsg(delivDate), delivDelay);    }    public ReliableTest(String testName) {        super(testName);    }    public static Test suite() {        TestSuite suite = new TestSuite(ReliableTest.class);        return suite;    }     protected void setUp() {	loadElements = new ArrayList();	for (int size = MIN_LOAD; size <= MAX_LOAD; size = size << 1) {	    byte[] le = new byte[size];	    random.nextBytes(le);	    loadElements.add(le);	}	System.setProperty("net.jxta.tls.password", PASSWORD);	System.setProperty("net.jxta.tls.principal", PRINCIPAL);	try {	    netPeerGroup = PeerGroupFactory.newNetPeerGroup();	    discoverySvc = netPeerGroup.getDiscoveryService();	    pipeSvc = netPeerGroup.getPipeService();	    rendezvousService = netPeerGroup.getRendezVousService();	    rendezvousService.addListener(this);	} catch (Throwable t) {	    t.printStackTrace();	    fail("failed to start jxta");	}        if (waitRdv) {            System.out.print("connecting to rendezvous...");            System.out.flush();            synchronized (rdvConnectLock) {                while (!rendezvousService.isConnectedToRendezVous()) {                    System.out.print(".");                    System.out.flush();                    try { rdvConnectLock.wait(10*DELAY); }                     catch (InterruptedException ignore) {}                }            }            System.out.println(" connected");        }    }    public void tearDown() {        System.exit(0);    }        public static void main(String[] args) throws Exception {        parse(args);        TestRunner.run(suite());        System.err.flush();        System.out.flush();    }    public static void parse(String[] args) {        for (int i = 0; i < args.length; i++) {            if (args[i].equals("-quiet")) {                IS_QUIET = true;            } else if (args[i].equals("-sender")) {                IS_SENDER = true;            } else if (args[i].equals("-receiver")) {                IS_SENDER = false;            } else if (args[i].equals("-server")) {                IS_SENDER = false;                IS_SERVER = true;            } else if (args[i].equals("-waitrdv")) {                waitRdv = true;            } else if (args[i].equals("-delay") && i+1 < args.length) {                String delayStr = args[++i];                try {                    DELAY = Integer.parseInt(delayStr);                } catch (NumberFormatException ex) {                    System.err.println("Invalid delay: " + delayStr + USAGE);		    return;                }            } else if (args[i].equals("-iterations") && i+1 < args.length) {                String iterStr = args[++i];                try {                    ITERATIONS = Integer.parseInt(iterStr);                } catch (NumberFormatException ex) {                    System.err.println("Invalid iterations: " +                                        iterStr + USAGE);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -