📄 reliabletest.java
字号:
/* * Copyright (c) 2003 Sun Microsystems, Inc. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" * and "Project JXTA" must not be used to endorse or * promote products derived from this software without * prior written permission. For written permission, * please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. 
IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. * * $Id: ReliableTest.java,v 1.22 2005/12/10 02:38:49 bondolo Exp $ */package net.jxta.impl.util.pipe.reliable;import net.jxta.id.IDFactory;import net.jxta.id.ID;import net.jxta.pipe.PipeID;import net.jxta.pipe.PipeService;import net.jxta.pipe.InputPipe;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.OutputPipeEvent;import net.jxta.pipe.OutputPipeListener;import net.jxta.pipe.PipeMsgListener;import net.jxta.pipe.PipeMsgEvent;import net.jxta.protocol.PipeAdvertisement;import net.jxta.peergroup.PeerGroup;import net.jxta.peergroup.PeerGroupID;import net.jxta.peergroup.PeerGroupFactory;import net.jxta.rendezvous.RendezVousService;import net.jxta.rendezvous.RendezvousEvent;import net.jxta.rendezvous.RendezvousListener;import net.jxta.discovery.DiscoveryService;import net.jxta.discovery.DiscoveryListener;import net.jxta.discovery.DiscoveryEvent;import net.jxta.document.MimeMediaType;import net.jxta.document.Advertisement;import net.jxta.document.AdvertisementFactory;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.ByteArrayMessageElement;import 
net.jxta.endpoint.StringMessageElement;import net.jxta.endpoint.Message.ElementIterator;import java.util.Vector;import java.util.Timer;import java.util.TimerTask;import java.util.Enumeration;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.Collections;import java.io.IOException;import java.io.StringReader;import junit.framework.TestSuite;import junit.framework.TestCase;import junit.framework.Test;import junit.textui.TestRunner;import net.jxta.impl.util.UnbiasedQueue;public class ReliableTest extends TestCase implements RendezvousListener, DiscoveryListener, PipeMsgListener, OutputPipeListener { private static int MIN_LOAD = 1024; private static int MAX_LOAD = 65536; private static final String MESSAGE_TAG = "reliable.message"; private static final String SENT_AT_TAG = "reliable.sent.at"; private static final String PAYLOAD_TAG = "reliable.payload"; private static final MimeMediaType MIME_BINARY = new MimeMediaType("application/octet-stream"); private static String MSG_PIPE_NAME = "ReliableTestMsgPipe"; private static String ACK_PIPE_NAME = "ReliableTestAckPipe"; private static boolean DEBUG = false; private static boolean ADAPTIVE = false; private static boolean IS_QUIET = false; private static boolean IS_SENDER = false; private static boolean IS_SERVER = false; private static boolean waitRdv = false; private static String PRINCIPAL = "password"; private static String PASSWORD = "password"; private static int DROP_MSG = Integer.MAX_VALUE; private static int BW_LIMIT = Integer.MAX_VALUE; private static int PIPE_LEN = 327680; // 20 packets of 16K private static int LATENCY = 0; private static int DELAY = 200; private static int ITERATIONS = 1000; private Object rdvConnectLock = new Object(); private Random random = new Random(System.currentTimeMillis()); private int nextMessageId = 0; private ArrayList loadElements = null; private int dropMsgCount = 0; private PeerGroup netPeerGroup = null; private RendezVousService 
rendezvousService = null;
    private DiscoveryService discoverySvc = null;
    private PipeService pipeSvc = null;

    PipeAdvertisement msgPipeAdv = null;
    PipeAdvertisement ackPipeAdv = null;
    OutputPipe outputPipe = null;
    InputPipe inputPipe = null;
    OutgoingPipeAdaptorSync outgoing = null;
    IncomingPipeAdaptor incoming = null;
    ReliableOutputStream ros = null;
    ReliableInputStream ris = null;

    // ---- State of the simulated bandwidth/latency limited link ----

    // Timer that runs one TimedMsg task per queued message.
    Timer bwTimer = new Timer();
    // Messages waiting for their simulated delivery; also used as the lock
    // guarding bwQueueSz.
    UnbiasedQueue bwQueue = new UnbiasedQueue(Integer.MAX_VALUE, false);
    // Sum of getByteLength() of the messages currently in bwQueue.
    long bwQueueSz = 0;
    // Time (ms since the epoch) at which the last queued message finishes
    // being injected into the simulated link.
    long nextInjectTime = 0;
    // Scheduled delivery date minus actual delivery time of the most recently
    // delivered message; consumed in 10 ms steps by bwQueueMsg.
    long delayAdj = 0;
    // Remainder of the last delay division, carried over to the next message.
    long roundingLoss = 0;
    // Number of messages dropped because the simulated pipe was full.
    long lostToCongestion = 0;

    /**
     * Timer task that delivers the oldest message of {@code bwQueue}.
     *
     * One task is scheduled per queued message by {@code bwQueueMsg}. The
     * message is handed to {@code ros} if it is set, otherwise to {@code ris}
     * if that one is set; if neither is set the message is discarded.
     */
    class TimedMsg extends TimerTask {

        // Date (ms since the epoch) at which delivery was meant to happen.
        long delivDate;

        public TimedMsg(long date) {
            delivDate = date;
        }

        public void run() {
            Message msg;
            synchronized (bwQueue) {
                msg = (Message) bwQueue.pop();
                long msgLen = msg.getByteLength();
                bwQueueSz -= msgLen;

                // Record how far off the timer was so that bwQueueMsg can
                // compensate on the following messages.
                delayAdj = delivDate - System.currentTimeMillis();

                if (ros != null) {
                    ros.recv(msg);
                } else if (ris != null) {
                    ris.recv(msg);
                }
            }
        }
    }

    /**
     * Pushes a message through the simulated link.
     *
     * The message is dropped (and counted in {@code lostToCongestion}) if
     * adding it would make the queued byte count exceed {@code PIPE_LEN}.
     * Otherwise it is queued and a {@code TimedMsg} is scheduled to deliver
     * it after the computed transmission delay plus {@code LATENCY}.
     *
     * @param msg the message to queue for delayed delivery.
     */
    private void bwQueueMsg(Message msg) {
        synchronized (bwQueue) {
            long len = msg.getByteLength();
            if (bwQueueSz + len > PIPE_LEN) {
                lostToCongestion++;
                if (!IS_QUIET) {
                    System.out.println("\nSimulating congestion");
                }
                return;
            }
            bwQueue.push(msg);
            bwQueueSz += len;
        }

        // Schedule delivery of the message based on bw, latency, and
        // current messages in the pipe.
        long now = System.currentTimeMillis();

        // The injection or extraction time depends on length and bandwidth.
        // The multiplication must be done in long arithmetic: with the
        // default BW_LIMIT of Integer.MAX_VALUE an int product wraps around
        // to a negative value and yields negative delays.
        // NOTE(review): the 8000 and 1024 factors suggest BW_LIMIT is in
        // Kbit/s and the delay in milliseconds - confirm.
        long linkRate = (long) BW_LIMIT * 1024;
        long bitsToClock = msg.getByteLength() * 8000 + roundingLoss;
        long delay = bitsToClock / linkRate;

        // Carry the remainder over to the next message. This must update the
        // field: declaring a local of the same name would shadow it and the
        // remainder would be lost.
        roundingLoss = bitsToClock % linkRate;

        // We can inject a message if/after the last byte of the previous one
        // is done injecting.
        nextInjectTime = Math.max(nextInjectTime, now) + delay;

        // At the new nextInjectTime, we have injected the last byte of the
        // new message. The message is delivered when this last byte arrives.
        long delivDate = nextInjectTime + LATENCY;
        long delivDelay = delivDate - now;

        // Compensate for the timer error measured at the last delivery, at
        // most 10 ms at a time.
        if (delayAdj >= 10) {
            delivDelay += 10;
            delayAdj -= 10;
        }
        if (delayAdj <= -10) {
            delivDelay -= 10;
            delayAdj += 10;
        }

        // A carefully chosen combination of unrealistic parameters can
        // lead to an attempt at delivering messages in the past.
        if (delivDelay <= 0) {
            delivDelay = 1;
        }

        // Because we strictly serialize messages.
        // Their delivery order is the same than their queuing order. So the
        // timer task only needs to pickup the next message and deliver it.
        bwTimer.schedule(new TimedMsg(delivDate), delivDelay);
    }

    /**
     * Creates the test case with the given name.
     *
     * @param testName name passed to the JUnit {@code TestCase} constructor.
     */
    public ReliableTest(String testName) {
        super(testName);
    }

    /**
     * @return a suite built from this class.
     */
    public static Test suite() {
        TestSuite suite = new TestSuite(ReliableTest.class);
        return suite;
    }

    /**
     * Prepares the random payloads, starts the net peer group and, when
     * {@code waitRdv} is set, blocks until connected to a rendezvous.
     */
    protected void setUp() {
        // One random payload per power-of-two size from MIN_LOAD to MAX_LOAD.
        loadElements = new ArrayList();
        for (int size = MIN_LOAD; size <= MAX_LOAD; size = size << 1) {
            byte[] le = new byte[size];
            random.nextBytes(le);
            loadElements.add(le);
        }

        System.setProperty("net.jxta.tls.password", PASSWORD);
        System.setProperty("net.jxta.tls.principal", PRINCIPAL);

        try {
            netPeerGroup = PeerGroupFactory.newNetPeerGroup();
            discoverySvc = netPeerGroup.getDiscoveryService();
            pipeSvc = netPeerGroup.getPipeService();
            rendezvousService = netPeerGroup.getRendezVousService();
            rendezvousService.addListener(this);
        } catch (Throwable t) {
            t.printStackTrace();
            fail("failed to start jxta");
        }

        if (waitRdv) {
            System.out.print("connecting to rendezvous...");
            System.out.flush();
            synchronized (rdvConnectLock) {
                // Re-check the connection state at least every 10*DELAY ms.
                // NOTE(review): presumably the rendezvous listener notifies
                // rdvConnectLock on connection - confirm.
                while (!rendezvousService.isConnectedToRendezVous()) {
                    System.out.print(".");
                    System.out.flush();
                    try {
                        rdvConnectLock.wait(10*DELAY);
                    } catch (InterruptedException ignore) {
                        // Deliberately ignored: the loop condition is
                        // re-evaluated and the wait resumes.
                    }
                }
            }
            System.out.println(" connected");
        }
    }

    /**
     * Terminates the JVM with exit status 0.
     *
     * NOTE(review): this ends the whole run after the first test completes,
     * before control returns to main() - presumably intentional; confirm.
     */
    public void tearDown() {
        System.exit(0);
    }

    /**
     * Command-line entry point: parses the arguments, runs the suite with the
     * text runner and flushes the standard streams.
     *
     * @param args command-line options, handed to {@code parse}.
     */
    public static void main(String[] args) throws Exception {
        parse(args);
        TestRunner.run(suite());
        System.err.flush();
        System.out.flush();
    }

    public static void parse(String[] args) {
        for (int i = 0; i < args.length; i++) {
            if
(args[i].equals("-quiet")) { IS_QUIET = true; } else if (args[i].equals("-sender")) { IS_SENDER = true; } else if (args[i].equals("-receiver")) { IS_SENDER = false; } else if (args[i].equals("-server")) { IS_SENDER = false; IS_SERVER = true; } else if (args[i].equals("-waitrdv")) { waitRdv = true; } else if (args[i].equals("-delay") && i+1 < args.length) { String delayStr = args[++i]; try { DELAY = Integer.parseInt(delayStr); } catch (NumberFormatException ex) { System.err.println("Invalid delay: " + delayStr + USAGE); return; } } else if (args[i].equals("-iterations") && i+1 < args.length) { String iterStr = args[++i]; try { ITERATIONS = Integer.parseInt(iterStr); } catch (NumberFormatException ex) { System.err.println("Invalid iterations: " + iterStr + USAGE);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -