⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliabletest.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        } catch (IOException ex) {            fail(ex.getMessage());        }        if (DEBUG) {            System.out.println("published msg pipe: " + msgPipeAdv.getName());            System.out.println("opened msg pipe for input");        }        // We need to give to our input a reference to our output, but,        // obviously, we need to create our input before resolving the output        // pipe, otherwise we do not known when to start trying to resolve.        // In addition, our output must not loose messages while the pipe is        // being resolved: these are acks. All the first packets would remain        // un-acked, which would make for a very slow start.        // Therefore we need a form of output that can be created before        // the pipe. This used to be solved by interposing queues and threads.        // Now, we create the outgoing adaptor without a pipe.        // The pipe will be set whenever it is ready. In the meantime, the        // reliable input stream will block or return false if it is used.        outgoing = new OutgoingPipeAdaptorSync(null);        ris = new ReliableInputStream(outgoing, 0);        try {            if (ackPipeAdv == null) {                ackPipeAdv = createPipeAdv(ACK_PIPE_NAME);                discoverySvc.publish(ackPipeAdv);                discoverySvc.remotePublish(ackPipeAdv);            }            pipeSvc.createOutputPipe(ackPipeAdv, this);                        } catch (IOException ex) {            fail(ex.getMessage());        }        if (DEBUG) {            System.out.println("published ack pipe: " + ackPipeAdv.getName());            System.out.println("opened ack pipe for output");        }        System.out.print("Waiting for sender...");        System.out.flush();        while (!ris.hasNextMessage()) {            pause();        }        System.out.println("Receiving");        for (int i=0; outputPipe == null;) {            pause();            if ((i++ % 11) == 0) {                if (DEBUG) {                    System.out.println("re-resolving output pipe " +                                        ackPipeAdv.getName());                }                try {                    pipeSvc.createOutputPipe(ackPipeAdv, this);                } catch (IOException ex) {                    System.err.println(ex.getMessage());                }            }        }        long startedAt = 0;        long bytesTransferred = 0;        long nbMsgs = 0;        while (true) {            Message msg = null;            try {                msg = ris.nextMessage();            } catch(IOException ioe) {                System.err.println("Failed to obtain next message");                ioe.printStackTrace();                         }            ++nbMsgs;            printMessageForDebug("doReceiver", msg);            String msgId = (msg.getMessageElement(MESSAGE_TAG)).toString();            String sentAt = (msg.getMessageElement(SENT_AT_TAG)).toString();            long size = msg.getByteLength();                        long now = System.currentTimeMillis();            long sent = now;            try {                sent = Long.parseLong(sentAt);            } catch (NumberFormatException nex) {                System.err.println("could not parse msg send time: " +                                    sentAt);                continue;            }            // We start computing averages only after the first message has            // been received.            long throughput = 0;            if (nbMsgs > (2 * ITERATIONS / 3)) {                if (startedAt == 0) {                    startedAt = System.currentTimeMillis();                } else {                    long totalTime = now - startedAt;                    if (totalTime <= 0) {                        // that can't be. Some time has passed.                        totalTime = 1;                    }                    bytesTransferred += size;                    throughput = (((1000 * 8 * bytesTransferred) / totalTime) / 1024);                }            }            long delay = now - sent;            if (msgId.equals("mclose")) {                System.out.println("\nResults" +                                   "\n-------" +                                   "\ntransferred:       " +                                   bytesTransferred + " bytes" +                                   "\nthroughput:        " +                                   throughput + " kbps" +                                   "\ncongestion events: " +                                   lostToCongestion);                try {                    ris.close();                } catch(IOException ioe1) {                }                ris = null;                try {                    outgoing.close();                } catch(IOException ioe2) {                }                outgoing = null;                outputPipe = null;                longPause();                break;            }            if (! IS_QUIET) {                System.out.print(msgId + " " +                                  size + "b " +                                  delay + "ms " +                                 throughput + "kbps avg\r");                System.out.flush();            }        }    }    private void printMessageForDebug(String header, Message msg) {        if (DEBUG) {            ElementIterator iter = msg.getMessageElements();            System.out.print(header + " " + IS_SENDER + " (");            while (iter.hasNext()) {                MessageElement el = (MessageElement) iter.next();                System.out.print(el.getElementName());                if (iter.hasNext()) {                    System.out.print(", ");                }            }            System.out.println(")");        }    }    public void pipeMsgEvent(PipeMsgEvent inputPipeEvent) {        Message msg = inputPipeEvent.getMessage();        if (msg == null) {            return;        }        printMessageForDebug("pipeMsgEvent", msg);        if (dropMessage()) {            if (DEBUG) {                System.out.println("dropped incoming:" +                                   (IS_SENDER ? "ack" : "msg"));            }            return;        }        if (BW_LIMIT < Integer.MAX_VALUE) {            bwQueueMsg(msg);            return;        }        // We're redirecting either to the ros or to the ris; depending        // on whether we're on one side or the other of the reliable stream.        // The other stream obj is null.        if (ros != null) ros.recv(msg);        else if (ris != null) ris.recv(msg);    }    public void outputPipeEvent(OutputPipeEvent outputPipeEvent) {        String pid = outputPipeEvent.getPipeID();        // this will happen in the sender        if (outputPipe == null &&	    pid.equals(msgPipeAdv.getPipeID().toString())) {            outputPipe = outputPipeEvent.getOutputPipe();            outgoing.setPipe(outputPipe);            // the next line is not needed in this case,            // since here we register 'this' as an            // PipeMsgListener (Input Pipe Listener) and            // 'manually' redirect the ack messages to            // ros.recv() from pipeMsgEvent            // incoming = new IncomingPipeAdaptor(inputPipe, ros);            if (DEBUG) {                System.out.println("resolved msg output pipe " +                                    outputPipe.getName());            }        }         // this will happen in the receiver        if (outputPipe == null &&            pid.equals(ackPipeAdv.getPipeID().toString())) {            outputPipe = outputPipeEvent.getOutputPipe();            outgoing.setPipe(outputPipe);            // the next line is not needed in this case,            // since here we register 'this' as an            // PipeMsgListener (Input Pipe Listener) and            // 'manually' redirect the ack messages to            // ros.recv() from pipeMsgEvent            // incoming = new IncomingPipeAdaptor(inputPipe, ris);            if (DEBUG) {                System.out.println("resolved ack output pipe " +                                    outputPipe.getName());            }        }    }    synchronized boolean dropMessage() {        return ((++dropMsgCount) % DROP_MSG) == 0;    }    private static final String USAGE =        "\nUsage: ReliableTest <options>" +        "\n" +        "options:\n" +        "  -help       outputs some usefull advice" +        "  -quiet      only output a summary (" +                       IS_QUIET + ")\n" +        "  -sender     whether to run as sender of messages (" +                        IS_SENDER + ")\n" +        "  -receiver   whether to run as a receiver of messages (" +                        !IS_SENDER + ")\n" +        "  -server     whether to run as a permanent receiver of messages (" +                        IS_SERVER + ")\n" +        "  -waitrdv    wait for a rendezvous connection before starting (not)\n" +        "  -drop       drop every Nth messages (on arrival) (" +                        DROP_MSG + ")\n" +        "  -bw         simulated bw cap in Kbit/s (on arrival) (" +                        BW_LIMIT + ")\n" +        "  -pl         simulated pipe length in bytes (only with bw) (" +                        PIPE_LEN + ")\n" +        "  -lat        simulated latency in ms (only with bw) (" +                        LATENCY + ")\n" +        "  -minload    smallest of the random payload sizes in bytes (" +                       MIN_LOAD + ")\n" +        "  -maxload    largest of the random payload sizes in bytes (" +                       MAX_LOAD + ")\n" +        "  -name       base name for the pipes (ReliableTest)\n" +        "  -debug      whether to turn on debugging in the peer (" +                        DEBUG + ")\n" +        "  -adapt      Use adaptive flow control (do not)\n" +        "  -iterations number of times to send a message (" + 	               ITERATIONS + ")\n" +        "  -delay      Basic delay unit (" + 	               DELAY + ")\n" +        "  -principal  net.jxta.tls.principal property (" + 	               PRINCIPAL + ")\n" +        "  -password   net.jxta.tls.password property (" + 	               PASSWORD + ")\n";    private static final String HELP =        "Some options serve to simulate particular network conditions.\n"+        "These conditions are simulated on the destination side of a\n" +        "link. As a result, the options given to the receiver will\n" +        "control the behaviour of the data channel, while the options\n" +        "to the sender will control the behaviour of the ack channel.\n"+        "\n" +        "These options are the following:\n"+        "-bw, as in \"bandwidth\":\n" +        "\tcontrols the time it takes for the slowest segment of the path\n" +        "\tto go from begining the transmition of one bit to being ready\n" +        "\tto transmit the next one. It is expressed in Kbit per second.\n" +        "-lat, as in \"latency\":\n" +        "\tcontrols the time it takes for a bit to traverse the network in\n" +        "\taddition to the time caused by the bandwidth. In real life, the\n" +        "\tprincipal contributors to this time would be signal travel time\n" +        "\tper the laws of physics, and packet processing time at each\n" +        "\tnode along the path. The essential characteristics of latency,\n" +        "\tis that over one given packet's latency time, a number of other\n" +        "\tpackets can begin traveling as well.\n" +        "-pl, as in \"path length\":\n"+        "\tcontrols the actual amount of data that can be traveling along\n" +        "\tthe network path (in bytes). In real life, this is the\n" +        "\tproduct bandwidth * latency + some buffering, but the three\n" +        "\tvalues can be chosen differently in order to simulate extreme\n" +        "\tconditions. For example, most often pl would be larger than\n" +        "\tthe bandwidth*latency product to reflect buffering capacity at\n" +        "\tvarious nodes along the path. Under non-congested conditions\n" +        "\tthese buffers are used to store at most one pending packet\n" +        "\ton each node, which ensures back-to-back link utilization\n" +        "\tdespite statistical variations. In that case, buffering has\n" +        "\tno noticeable effect on latency. However, buffering space\n" +        "\tcan retard congestion when the capacity of a node is exceeded\n" +        "\t(and the apparent latency will increase).\n" +        "\tIn practice, under simulated conditions on a single host, it\n" +        "\tis impossible to keep the network path fully utilized if\n" +        "\tpath length is less than twice bandwidth*latency. Any\n" +        "\trealistic network has at least one extra buffer at each node.\n" +        "\tSetting a path length smaller than the bandwidth*latency\n" +        "\tproduct is somewhat artifical. It roughly corresponds to some\n" +        "\tnode along the path delaying packets with no buffer to store\n" +        "\tthem while they wait. In effect it results in the bandwidth\n" +        "\tlowering to match, but makes congestions harder to predict. So\n" +        "\tit is usefull for testing congestion recovery mechanisms.\n";}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -