📄 reliabletest.java
字号:
/* * Copyright (c) 2003-2007 Sun Microsystems, Inc. All rights reserved. * * The Sun Project JXTA(TM) Software License * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. The end-user documentation included with the redistribution, if any, must * include the following acknowledgment: "This product includes software * developed by Sun Microsystems, Inc. for JXTA(TM) technology." * Alternately, this acknowledgment may appear in the software itself, if * and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this software * without prior written permission. For written permission, please contact * Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", nor may * "JXTA" appear in their name, without prior written permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL SUN * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * JXTA is a registered trademark of Sun Microsystems, Inc. in the United * States and other countries. * * Please see the license information page at : * <http://www.jxta.org/project/www/license.html> for instructions on use of * the license in source files. * * ==================================================================== * * This software consists of voluntary contributions made by many individuals * on behalf of Project JXTA. For more information on Project JXTA, please see * http://www.jxta.org. * * This license is based on the BSD license adopted by the Apache Foundation. 
*/package net.jxta.impl.util.pipe.reliable;import net.jxta.id.IDFactory;import net.jxta.id.ID;import net.jxta.pipe.PipeID;import net.jxta.pipe.PipeService;import net.jxta.pipe.InputPipe;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.OutputPipeEvent;import net.jxta.pipe.OutputPipeListener;import net.jxta.pipe.PipeMsgListener;import net.jxta.pipe.PipeMsgEvent;import net.jxta.protocol.PipeAdvertisement;import net.jxta.peergroup.PeerGroup;import net.jxta.peergroup.PeerGroupID;import net.jxta.peergroup.PeerGroupFactory;import net.jxta.rendezvous.RendezVousService;import net.jxta.rendezvous.RendezvousEvent;import net.jxta.rendezvous.RendezvousListener;import net.jxta.discovery.DiscoveryService;import net.jxta.discovery.DiscoveryListener;import net.jxta.discovery.DiscoveryEvent;import net.jxta.document.MimeMediaType;import net.jxta.document.Advertisement;import net.jxta.document.AdvertisementFactory;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.ByteArrayMessageElement;import net.jxta.endpoint.StringMessageElement;import net.jxta.endpoint.Message.ElementIterator;import java.util.Vector;import java.util.Timer;import java.util.TimerTask;import java.util.Enumeration;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.Collections;import java.io.IOException;import java.io.StringReader;import junit.framework.TestSuite;import junit.framework.TestCase;import junit.framework.Test;import junit.textui.TestRunner;import net.jxta.impl.util.UnbiasedQueue;public class ReliableTest extends TestCase implements RendezvousListener, DiscoveryListener, PipeMsgListener, OutputPipeListener { private static int MIN_LOAD = 1024; private static int MAX_LOAD = 65536; private static final String MESSAGE_TAG = "reliable.message"; private static final String SENT_AT_TAG = "reliable.sent.at"; private static final String PAYLOAD_TAG = "reliable.payload"; private static final MimeMediaType MIME_BINARY = 
MimeMediaType.AOS;

// Names given to the message pipe and the acknowledgement pipe.
private static String MSG_PIPE_NAME = "ReliableTestMsgPipe";
private static String ACK_PIPE_NAME = "ReliableTestAckPipe";

// Run-time switches; all default to "off".
private static boolean DEBUG = false;
private static boolean ADAPTIVE = false;
private static boolean IS_QUIET = false;
private static boolean IS_SENDER = false;
private static boolean IS_SERVER = false;
private static boolean waitRdv = false;

// Credentials published as the net.jxta.tls.* system properties in setUp().
private static String PRINCIPAL = "password";
private static String PASSWORD = "password";

// Parameters of the simulated link.
private static int DROP_MSG = Integer.MAX_VALUE;
private static int BW_LIMIT = Integer.MAX_VALUE;
private static int PIPE_LEN = 327680; // 20 packets of 16K
private static int LATENCY = 0;
private static int DELAY = 200;
private static int ITERATIONS = 1000;

private Object rdvConnectLock = new Object();
private Random random = new Random(System.currentTimeMillis());
private int nextMessageId = 0;
private ArrayList loadElements = null;
private int dropMsgCount = 0;

private PeerGroup netPeerGroup = null;
private RendezVousService rendezvousService = null;
private DiscoveryService discoverySvc = null;
private PipeService pipeSvc = null;

PipeAdvertisement msgPipeAdv = null;
PipeAdvertisement ackPipeAdv = null;
OutputPipe outputPipe = null;
InputPipe inputPipe = null;
OutgoingPipeAdaptorSync outgoing = null;
IncomingPipeAdaptor incoming = null;
ReliableOutputStream ros = null;
ReliableInputStream ris = null;

// State of the simulated pipe: the delivery timer, the queue of in-flight
// messages, and the running total of their byte lengths.
Timer bwTimer = new Timer();
UnbiasedQueue bwQueue = new UnbiasedQueue(Integer.MAX_VALUE, false);
long bwQueueSz = 0;
long nextInjectTime = 0;
long delayAdj = 0;
long roundingLoss = 0;
long lostToCongestion = 0;

/**
 * Timer task that delivers the oldest message waiting in {@code bwQueue}.
 * The task carries no message of its own: it pops whatever is at the head
 * of the queue when it fires.
 */
class TimedMsg extends TimerTask {

    /** Time, in milliseconds, at which this delivery was meant to happen. */
    long delivDate;

    /**
     * @param date intended delivery time, in milliseconds
     */
    public TimedMsg(long date) {
        delivDate = date;
    }

    /**
     * Pops the next queued message, updates the queue accounting and the
     * delivery-time drift ({@code delayAdj}), then hands the message to
     * {@code ros} if it is set, otherwise to {@code ris} if that is set.
     */
    @Override
    public void run() {
        synchronized (bwQueue) {
            final Message next = (Message) bwQueue.pop();

            bwQueueSz -= next.getByteLength();

            // Record how far this firing is from the intended delivery date.
            delayAdj = delivDate - System.currentTimeMillis();

            if (ros != null) {
                ros.recv(next);
                return;
            }
            if (ris != null) {
                ris.recv(next);
            }
        }
    }
}
/**
 * Queues a message on the simulated link and schedules its delivery.
 *
 * <p>The message is dropped (and counted in {@code lostToCongestion}) when
 * adding it would push the queued byte total above {@code PIPE_LEN}.
 * Otherwise a {@link TimedMsg} is scheduled after a delay computed from
 * the message length, {@code BW_LIMIT} and {@code LATENCY}.
 *
 * @param msg the message to queue
 */
private void bwQueueMsg(Message msg) {
    final long len = msg.getByteLength();

    synchronized (bwQueue) {
        if (bwQueueSz + len > PIPE_LEN) {
            lostToCongestion++;
            if (!IS_QUIET) {
                System.out.println("\nSimulating congestion");
            }
            return;
        }
        bwQueue.push(msg);
        bwQueueSz += len;
    }

    // Schedule delivery of the message based on bw, latency, and
    // current messages in the pipe.
    long now = System.currentTimeMillis();

    // Widen to long before multiplying: BW_LIMIT defaults to
    // Integer.MAX_VALUE, and an int multiplication by 1024 would overflow
    // to a negative divisor.
    final long bwDivisor = BW_LIMIT * 1024L;

    // The injection or extraction time depends on length and bandwidth.
    long bitsToClock = len * 8000 + roundingLoss;
    long delay = bitsToClock / bwDivisor;

    // Carry the remainder over to the next message. This must update the
    // field; declaring a local of the same name would discard it.
    roundingLoss = bitsToClock % bwDivisor;

    // We can inject a message if/after the last byte of the previous one
    // is done injecting.
    nextInjectTime = Math.max(nextInjectTime, now) + delay;

    // At the new nextInjectTime, we have injected the last byte of the
    // new message. The message is delivered when this last byte arrives.
    long delivDate = nextInjectTime + LATENCY;
    long delivDelay = delivDate - now;

    // Work off the drift recorded by the timer task, at most 10 ms at a time.
    if (delayAdj >= 10) {
        delivDelay += 10;
        delayAdj -= 10;
    }
    if (delayAdj <= -10) {
        delivDelay -= 10;
        delayAdj += 10;
    }

    // A carefully chosen combination of unrealistic parameters can
    // lead to an attempt at delivering messages in the past.
    if (delivDelay <= 0) {
        delivDelay = 1;
    }

    // Because we strictly serialize messages, their delivery order is the
    // same as their queuing order. So the timer task only needs to pick up
    // the next message and deliver it.
    bwTimer.schedule(new TimedMsg(delivDate), delivDelay);
}

/**
 * Creates a test case with the given name.
 *
 * @param testName name passed to the {@code TestCase} constructor
 */
public ReliableTest(String testName) {
    super(testName);
}

/**
 * Builds the suite containing the tests of this class.
 *
 * @return a {@code TestSuite} built from {@code ReliableTest.class}
 */
public static Test suite() {
    TestSuite suite = new TestSuite(ReliableTest.class);
    return suite;
}

/**
 * Prepares random payloads, publishes the TLS credentials as system
 * properties, starts the net peer group and looks up its services.
 * When {@code waitRdv} is set, blocks until the rendezvous service
 * reports a connection.
 */
@Override
protected void setUp() {
    // One random payload per power-of-two size from MIN_LOAD to MAX_LOAD.
    loadElements = new ArrayList();
    for (int size = MIN_LOAD; size <= MAX_LOAD; size = size << 1) {
        byte[] le = new byte[size];
        random.nextBytes(le);
        loadElements.add(le);
    }

    System.setProperty("net.jxta.tls.password", PASSWORD);
    System.setProperty("net.jxta.tls.principal", PRINCIPAL);

    try {
        netPeerGroup = PeerGroupFactory.newNetPeerGroup();
        discoverySvc = netPeerGroup.getDiscoveryService();
        pipeSvc = netPeerGroup.getPipeService();
        rendezvousService = netPeerGroup.getRendezVousService();
        rendezvousService.addListener(this);
    } catch (Throwable t) {
        t.printStackTrace();
        fail("failed to start jxta");
    }

    if (waitRdv) {
        System.out.print("connecting to rendezvous...");
        System.out.flush();
        synchronized (rdvConnectLock) {
            // Re-check the connection state after every wait, printing a
            // progress dot each time round.
            while (!rendezvousService.isConnectedToRendezVous()) {
                System.out.print(".");
                System.out.flush();
                try {
                    rdvConnectLock.wait(10 * DELAY);
                } catch (InterruptedException ignore) {}
            }
        }
        System.out.println(" connected");
    }
}

/**
 * Terminates the JVM with exit status 0.
 */
@Override
public void tearDown() {
    System.exit(0);
}

/**
 * Command-line entry point: parses the arguments, runs the suite with the
 * text-mode runner and flushes the standard streams.
 *
 * @param args command-line arguments, handed to {@code parse}
 * @throws Exception declared by the original signature
 */
public static void main(String[] args) throws Exception {
    parse(args);
    TestRunner.run(suite());
    System.err.flush();
    System.out.flush();
}

public static void parse(String[] args) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -