⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliabletest.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                        System.out.println("discovered ack pipe: " + pipeName);                    }                    try {                        inputPipe = pipeSvc.createInputPipe(ackPipeAdv, this);                        if (DEBUG) {                            System.out.println("opened ack pipe for input");                        }                    } catch (IOException ex) {                        System.err.println(ex.getMessage());                    }                }            }        }    }    private void doReceiver() {        try {            if (msgPipeAdv == null) {                msgPipeAdv = createPipeAdv(MSG_PIPE_NAME);                discoverySvc.publish(msgPipeAdv);                discoverySvc.remotePublish(msgPipeAdv);                inputPipe = pipeSvc.createInputPipe(msgPipeAdv, this);            }        } catch (IOException ex) {            fail(ex.getMessage());        }        if (DEBUG) {            System.out.println("published msg pipe: " + msgPipeAdv.getName());            System.out.println("opened msg pipe for input");        }        // We need to give to our input a reference to our output, but,        // obviously, we need to create our input before resolving the output        // pipe, otherwise we do not known when to start trying to resolve.        // In addition, our output must not loose messages while the pipe is        // being resolved: these are acks. All the first packets would remain        // un-acked, which would make for a very slow start.        // Therefore we need a form of output that can be created before        // the pipe. This used to be solved by interposing queues and threads.        // Now, we create the outgoing adaptor without a pipe.        // The pipe will be set whenever it is ready. In the meantime, the        // reliable input stream will block or return false if it is used.        outgoing = new OutgoingPipeAdaptorSync(null);        ris = new ReliableInputStream(outgoing, 0);        try {            if (ackPipeAdv == null) {                ackPipeAdv = createPipeAdv(ACK_PIPE_NAME);                discoverySvc.publish(ackPipeAdv);                discoverySvc.remotePublish(ackPipeAdv);            }            pipeSvc.createOutputPipe(ackPipeAdv, this);                        } catch (IOException ex) {            fail(ex.getMessage());        }        if (DEBUG) {            System.out.println("published ack pipe: " + ackPipeAdv.getName());            System.out.println("opened ack pipe for output");        }        System.out.print("Waiting for sender...");        System.out.flush();        while (!ris.hasNextMessage()) {            pause();        }        System.out.println("Receiving");        for (int i = 0; outputPipe == null;) {            pause();            if ((i++ % 11) == 0) {                if (DEBUG) {                    System.out.println("re-resolving output pipe " + ackPipeAdv.getName());                }                try {                    pipeSvc.createOutputPipe(ackPipeAdv, this);                } catch (IOException ex) {                    System.err.println(ex.getMessage());                }            }        }        long startedAt = 0;        long bytesTransferred = 0;        long nbMsgs = 0;        while (true) {            Message msg = null;            try {                msg = ris.nextMessage(true);            } catch (IOException ioe) {                System.err.println("Failed to obtain next message");                ioe.printStackTrace();                         }            ++nbMsgs;            printMessageForDebug("doReceiver", msg);            String msgId = (msg.getMessageElement(MESSAGE_TAG)).toString();            String sentAt = (msg.getMessageElement(SENT_AT_TAG)).toString();            long size = msg.getByteLength();                        long now = System.currentTimeMillis();            long sent = now;            try {                sent = Long.parseLong(sentAt);            } catch (NumberFormatException nex) {                System.err.println("could not parse msg send time: " + sentAt);                continue;            }            // We start computing averages only after the first message has            // been received.            long throughput = 0;            if (nbMsgs > (2 * ITERATIONS / 3)) {                if (startedAt == 0) {                    startedAt = System.currentTimeMillis();                } else {                    long totalTime = now - startedAt;                    if (totalTime <= 0) {                        // that can't be. Some time has passed.                        totalTime = 1;                    }                    bytesTransferred += size;                    throughput = (((1000 * 8 * bytesTransferred) / totalTime) / 1024);                }            }            long delay = now - sent;            if (msgId.equals("mclose")) {                System.out.println(                        "\nResults" + "\n-------" + "\ntransferred:       " + bytesTransferred + " bytes"                        + "\nthroughput:        " + throughput + " kbps" + "\ncongestion events: " + lostToCongestion);                try {                    ris.close();                } catch (IOException ioe1) {}                ris = null;                outgoing.close();                outgoing = null;                outputPipe = null;                longPause();                break;            }            if (!IS_QUIET) {                System.out.print(msgId + " " + size + "b " + delay + "ms " + throughput + "kbps avg\r");                System.out.flush();            }        }    }    private void printMessageForDebug(String header, Message msg) {        if (DEBUG) {            ElementIterator iter = msg.getMessageElements();            System.out.print(header + " " + IS_SENDER + " (");            while (iter.hasNext()) {                MessageElement el = iter.next();                System.out.print(el.getElementName());                if (iter.hasNext()) {                    System.out.print(", ");                }            }            System.out.println(")");        }    }    public void pipeMsgEvent(PipeMsgEvent inputPipeEvent) {        Message msg = inputPipeEvent.getMessage();        if (msg == null) {            return;        }        printMessageForDebug("pipeMsgEvent", msg);        if (dropMessage()) {            if (DEBUG) {                System.out.println("dropped incoming:" + (IS_SENDER ? "ack" : "msg"));            }            return;        }        if (BW_LIMIT < Integer.MAX_VALUE) {            bwQueueMsg(msg);            return;        }        // We're redirecting either to the ros or to the ris; depending        // on whether we're on one side or the other of the reliable stream.        // The other stream obj is null.        if (ros != null) {            ros.recv(msg);        } else if (ris != null) {            ris.recv(msg);        }    }    public void outputPipeEvent(OutputPipeEvent outputPipeEvent) {        String pid = outputPipeEvent.getPipeID();        // this will happen in the sender        if (outputPipe == null && pid.equals(msgPipeAdv.getPipeID().toString())) {            outputPipe = outputPipeEvent.getOutputPipe();            outgoing.setPipe(outputPipe);            // the next line is not needed in this case,            // since here we register 'this' as an            // PipeMsgListener (Input Pipe Listener) and            // 'manually' redirect the ack messages to            // ros.recv() from pipeMsgEvent            // incoming = new IncomingPipeAdaptor(inputPipe, ros);            if (DEBUG) {                System.out.println("resolved msg output pipe " + outputPipe.getName());            }        }         // this will happen in the receiver        if (outputPipe == null && pid.equals(ackPipeAdv.getPipeID().toString())) {            outputPipe = outputPipeEvent.getOutputPipe();            outgoing.setPipe(outputPipe);            // the next line is not needed in this case,            // since here we register 'this' as an            // PipeMsgListener (Input Pipe Listener) and            // 'manually' redirect the ack messages to            // ros.recv() from pipeMsgEvent            // incoming = new IncomingPipeAdaptor(inputPipe, ris);            if (DEBUG) {                System.out.println("resolved ack output pipe " + outputPipe.getName());            }        }    }    synchronized boolean dropMessage() {        return ((++dropMsgCount) % DROP_MSG) == 0;    }    private static final String USAGE = "\nUsage: ReliableTest <options>" + "\n" + "options:\n"            + "  -help       outputs some usefull advice" + "  -quiet      only output a summary (" + IS_QUIET + ")\n"            + "  -sender     whether to run as sender of messages (" + IS_SENDER + ")\n"            + "  -receiver   whether to run as a receiver of messages (" + !IS_SENDER + ")\n"            + "  -server     whether to run as a permanent receiver of messages (" + IS_SERVER + ")\n"            + "  -waitrdv    wait for a rendezvous connection before starting (not)\n"            + "  -drop       drop every Nth messages (on arrival) (" + DROP_MSG + ")\n"            + "  -bw         simulated bw cap in Kbit/s (on arrival) (" + BW_LIMIT + ")\n"            + "  -pl         simulated pipe length in bytes (only with bw) (" + PIPE_LEN + ")\n"            + "  -lat        simulated latency in ms (only with bw) (" + LATENCY + ")\n"            + "  -minload    smallest of the random payload sizes in bytes (" + MIN_LOAD + ")\n"            + "  -maxload    largest of the random payload sizes in bytes (" + MAX_LOAD + ")\n"            + "  -name       base name for the pipes (ReliableTest)\n"            + "  -debug      whether to turn on debugging in the peer (" + DEBUG + ")\n"            + "  -adapt      Use adaptive flow control (do not)\n" + "  -iterations number of times to send a message ("            + ITERATIONS + ")\n" + "  -delay      Basic delay unit (" + DELAY + ")\n"            + "  -principal  net.jxta.tls.principal property (" + PRINCIPAL + ")\n"            + "  -password   net.jxta.tls.password property (" + PASSWORD + ")\n";    private static final String HELP = "Some options serve to simulate particular network conditions.\n"            + "These conditions are simulated on the destination side of a\n"            + "link. As a result, the options given to the receiver will\n"            + "control the behaviour of the data channel, while the options\n"            + "to the sender will control the behaviour of the ack channel.\n" + "\n" + "These options are the following:\n"            + "-bw, as in \"bandwidth\":\n" + "\tcontrols the time it takes for the slowest segment of the path\n"            + "\tto go from begining the transmition of one bit to being ready\n"            + "\tto transmit the next one. It is expressed in Kbit per second.\n" + "-lat, as in \"latency\":\n"            + "\tcontrols the time it takes for a bit to traverse the network in\n"            + "\taddition to the time caused by the bandwidth. In real life, the\n"            + "\tprincipal contributors to this time would be signal travel time\n"            + "\tper the laws of physics, and packet processing time at each\n"            + "\tnode along the path. The essential characteristics of latency,\n"            + "\tis that over one given packet's latency time, a number of other\n" + "\tpackets can begin traveling as well.\n"            + "-pl, as in \"path length\":\n" + "\tcontrols the actual amount of data that can be traveling along\n"            + "\tthe network path (in bytes). In real life, this is the\n"            + "\tproduct bandwidth * latency + some buffering, but the three\n"            + "\tvalues can be chosen differently in order to simulate extreme\n"            + "\tconditions. For example, most often pl would be larger than\n"            + "\tthe bandwidth*latency product to reflect buffering capacity at\n"            + "\tvarious nodes along the path. Under non-congested conditions\n"            + "\tthese buffers are used to store at most one pending packet\n"            + "\ton each node, which ensures back-to-back link utilization\n"            + "\tdespite statistical variations. In that case, buffering has\n"            + "\tno noticeable effect on latency. However, buffering space\n"            + "\tcan retard congestion when the capacity of a node is exceeded\n"            + "\t(and the apparent latency will increase).\n" + "\tIn practice, under simulated conditions on a single host, it\n"            + "\tis impossible to keep the network path fully utilized if\n"            + "\tpath length is less than twice bandwidth*latency. Any\n"            + "\trealistic network has at least one extra buffer at each node.\n"            + "\tSetting a path length smaller than the bandwidth*latency\n"            + "\tproduct is somewhat artifical. It roughly corresponds to some\n"            + "\tnode along the path delaying packets with no buffer to store\n"            + "\tthem while they wait. In effect it results in the bandwidth\n"            + "\tlowering to match, but makes congestions harder to predict. So\n"            + "\tit is usefull for testing congestion recovery mechanisms.\n";}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -