⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliabletest.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* * Copyright (c) 2003-2007 Sun Microsystems, Inc.  All rights reserved. *   *  The Sun Project JXTA(TM) Software License *   *  Redistribution and use in source and binary forms, with or without  *  modification, are permitted provided that the following conditions are met: *   *  1. Redistributions of source code must retain the above copyright notice, *     this list of conditions and the following disclaimer. *   *  2. Redistributions in binary form must reproduce the above copyright notice,  *     this list of conditions and the following disclaimer in the documentation  *     and/or other materials provided with the distribution. *   *  3. The end-user documentation included with the redistribution, if any, must  *     include the following acknowledgment: "This product includes software  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology."  *     Alternately, this acknowledgment may appear in the software itself, if  *     and wherever such third-party acknowledgments normally appear. *   *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must  *     not be used to endorse or promote products derived from this software  *     without prior written permission. For written permission, please contact  *     Project JXTA at http://www.jxta.org. *   *  5. Products derived from this software may not be called "JXTA", nor may  *     "JXTA" appear in their name, without prior written permission of Sun. *   *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *   *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United  *  States and other countries. *   *  Please see the license information page at : *  <http://www.jxta.org/project/www/license.html> for instructions on use of  *  the license in source files. *   *  ==================================================================== *   *  This software consists of voluntary contributions made by many individuals  *  on behalf of Project JXTA. For more information on Project JXTA, please see  *  http://www.jxta.org. *   *  This license is based on the BSD license adopted by the Apache Foundation.  */package net.jxta.impl.util.pipe.reliable;import net.jxta.id.IDFactory;import net.jxta.id.ID;import net.jxta.pipe.PipeID;import net.jxta.pipe.PipeService;import net.jxta.pipe.InputPipe;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.OutputPipeEvent;import net.jxta.pipe.OutputPipeListener;import net.jxta.pipe.PipeMsgListener;import net.jxta.pipe.PipeMsgEvent;import net.jxta.protocol.PipeAdvertisement;import net.jxta.peergroup.PeerGroup;import net.jxta.peergroup.PeerGroupID;import net.jxta.peergroup.PeerGroupFactory;import net.jxta.rendezvous.RendezVousService;import net.jxta.rendezvous.RendezvousEvent;import net.jxta.rendezvous.RendezvousListener;import net.jxta.discovery.DiscoveryService;import net.jxta.discovery.DiscoveryListener;import net.jxta.discovery.DiscoveryEvent;import net.jxta.document.MimeMediaType;import net.jxta.document.Advertisement;import net.jxta.document.AdvertisementFactory;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.ByteArrayMessageElement;import net.jxta.endpoint.StringMessageElement;import net.jxta.endpoint.Message.ElementIterator;import java.util.Vector;import java.util.Timer;import java.util.TimerTask;import java.util.Enumeration;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.Collections;import java.io.IOException;import java.io.StringReader;import junit.framework.TestSuite;import junit.framework.TestCase;import junit.framework.Test;import junit.textui.TestRunner;import net.jxta.impl.util.UnbiasedQueue;public class ReliableTest extends TestCase implements         RendezvousListener, DiscoveryListener, PipeMsgListener, OutputPipeListener {    private static int MIN_LOAD = 1024;    private static int MAX_LOAD = 65536;    private static final String MESSAGE_TAG = "reliable.message";    private static final String SENT_AT_TAG = "reliable.sent.at";    private static final String PAYLOAD_TAG = "reliable.payload";    private static final MimeMediaType MIME_BINARY = MimeMediaType.AOS;    private static String MSG_PIPE_NAME = "ReliableTestMsgPipe";    private static String ACK_PIPE_NAME = "ReliableTestAckPipe";    private static boolean DEBUG = false;    private static boolean ADAPTIVE = false;    private static boolean IS_QUIET = false;    private static boolean IS_SENDER = false;    private static boolean IS_SERVER = false;    private static boolean waitRdv = false;    private static String PRINCIPAL = "password";    private static String PASSWORD = "password";    private static int DROP_MSG = Integer.MAX_VALUE;    private static int BW_LIMIT = Integer.MAX_VALUE;    private static int PIPE_LEN = 327680; // 20 packets of 16K    private static int LATENCY = 0;    private static int DELAY = 200;    private static int ITERATIONS = 1000;        private Object rdvConnectLock = new Object();    private Random random = new Random(System.currentTimeMillis());    private int nextMessageId = 0;    private ArrayList loadElements = null;    private int dropMsgCount = 0;    private PeerGroup netPeerGroup = null;    private RendezVousService rendezvousService = null;    private DiscoveryService discoverySvc = null;    private PipeService pipeSvc = null;    PipeAdvertisement msgPipeAdv = null;    PipeAdvertisement ackPipeAdv = null;    OutputPipe outputPipe = null;    InputPipe inputPipe = null;    OutgoingPipeAdaptorSync outgoing = null;    IncomingPipeAdaptor incoming = null;    ReliableOutputStream ros = null;    ReliableInputStream ris = null;    Timer bwTimer = new Timer();    UnbiasedQueue bwQueue = new UnbiasedQueue(Integer.MAX_VALUE, false);    long bwQueueSz = 0;    long nextInjectTime = 0;    long delayAdj = 0;    long roundingLoss = 0;    long lostToCongestion = 0;    class TimedMsg extends TimerTask {        long delivDate;        public TimedMsg(long date) {            delivDate = date;        }        @Override        public void run() {            Message msg;            synchronized (bwQueue) {                msg = (Message) bwQueue.pop();                long msgLen = msg.getByteLength();                bwQueueSz -= msgLen;                delayAdj = delivDate - System.currentTimeMillis();                if (ros != null) {                    ros.recv(msg);                } else if (ris != null) {                    ris.recv(msg);                }            }        }    }    private void bwQueueMsg(Message msg) {        synchronized (bwQueue) {            long len = msg.getByteLength();            if (bwQueueSz + len > PIPE_LEN) {                lostToCongestion++;                if (!IS_QUIET) {                    System.out.println("\nSimulating congestion");                }                return;            }            bwQueue.push(msg);            bwQueueSz += len;        }        // Schedule delivery of the message based on bw, layency, and        // current messages in the pipe.        long now = System.currentTimeMillis();        // The injection or extraction time depends on length and bandwidth        long bitsToClock = msg.getByteLength() * 8000 + roundingLoss;        long delay = bitsToClock / (BW_LIMIT * 1024);        long roundingLoss = bitsToClock % (BW_LIMIT * 1024);        // We can inject a message if/after the last byte of the previous one        // is done injecting.         nextInjectTime = Math.max(nextInjectTime, now) + delay;                // At the new nextInjectTime, we have injected the last byte of the        // new message. The message is delivered when this last byte arrives.        long delivDate = nextInjectTime + LATENCY;        long delivDelay = delivDate - now;        if (delayAdj >= 10) {            delivDelay += 10;            delayAdj -= 10;        }        if (delayAdj <= -10) {            delivDelay -= 10;            delayAdj += 10;        }        // A carefully chosen combination of unrealistic parameters can        // lead to an attempt at delivering messages in the past.        if (delivDelay <= 0) {            delivDelay = 1;        }        // Because we strictly serialize messages.        // Their delivery order is the same than their queuing order. So the        // timer task only needs to pickup the next message and deliver it.        bwTimer.schedule(new TimedMsg(delivDate), delivDelay);    }    public ReliableTest(String testName) {        super(testName);    }    public static Test suite() {        TestSuite suite = new TestSuite(ReliableTest.class);        return suite;    }     @Override    protected void setUp() {        loadElements = new ArrayList();        for (int size = MIN_LOAD; size <= MAX_LOAD; size = size << 1) {            byte[] le = new byte[size];            random.nextBytes(le);            loadElements.add(le);        }        System.setProperty("net.jxta.tls.password", PASSWORD);        System.setProperty("net.jxta.tls.principal", PRINCIPAL);        try {            netPeerGroup = PeerGroupFactory.newNetPeerGroup();            discoverySvc = netPeerGroup.getDiscoveryService();            pipeSvc = netPeerGroup.getPipeService();            rendezvousService = netPeerGroup.getRendezVousService();            rendezvousService.addListener(this);        } catch (Throwable t) {            t.printStackTrace();            fail("failed to start jxta");        }        if (waitRdv) {            System.out.print("connecting to rendezvous...");            System.out.flush();            synchronized (rdvConnectLock) {                while (!rendezvousService.isConnectedToRendezVous()) {                    System.out.print(".");                    System.out.flush();                    try {                        rdvConnectLock.wait(10 * DELAY);                    } catch (InterruptedException ignore) {}                }            }            System.out.println(" connected");        }    }    @Override    public void tearDown() {        System.exit(0);    }        public static void main(String[] args) throws Exception {        parse(args);        TestRunner.run(suite());        System.err.flush();        System.out.flush();    }    public static void parse(String[] args) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -