⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliabletest.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        for (int i = 0; i < args.length; i++) {            if (args[i].equals("-quiet")) {                IS_QUIET = true;            } else if (args[i].equals("-sender")) {                IS_SENDER = true;            } else if (args[i].equals("-receiver")) {                IS_SENDER = false;            } else if (args[i].equals("-server")) {                IS_SENDER = false;                IS_SERVER = true;            } else if (args[i].equals("-waitrdv")) {                waitRdv = true;            } else if (args[i].equals("-delay") && i + 1 < args.length) {                String delayStr = args[++i];                try {                    DELAY = Integer.parseInt(delayStr);                } catch (NumberFormatException ex) {                    System.err.println("Invalid delay: " + delayStr + USAGE);                    return;                }            } else if (args[i].equals("-iterations") && i + 1 < args.length) {                String iterStr = args[++i];                try {                    ITERATIONS = Integer.parseInt(iterStr);                } catch (NumberFormatException ex) {                    System.err.println("Invalid iterations: " + iterStr + USAGE);                    return;                }            } else if (args[i].equals("-drop") && i + 1 < args.length) {                String dropStr = args[++i];                try {                    DROP_MSG = Integer.parseInt(dropStr);                } catch (NumberFormatException ex) {                    System.err.println("Invalid drop message: " + dropStr + USAGE);                    return;                }            } else if (args[i].equals("-bw") && i + 1 < args.length) {                String bwStr = args[++i];                try {                    BW_LIMIT = Integer.parseInt(bwStr);                } catch (NumberFormatException ex) {                    System.err.println("Invalid bw: " + bwStr + USAGE);                    return;                }            } else if (args[i].equals("-pl") && i + 1 < args.length) {                String plStr = args[++i];                try {                    PIPE_LEN = Integer.parseInt(plStr);                } catch (NumberFormatException ex) {                    System.err.println("Invalid pl: " + plStr + USAGE);                    return;                }            } else if (args[i].equals("-lat") && i + 1 < args.length) {                String latStr = args[++i];                try {                    LATENCY = Integer.parseInt(latStr);                } catch (NumberFormatException ex) {                    System.err.println("Invalid lat: " + latStr + USAGE);                    return;                }            } else if (args[i].equals("-minload") && i + 1 < args.length) {                String minlStr = args[++i];                try {                    MIN_LOAD = Integer.parseInt(minlStr);                } catch (NumberFormatException ex) {                    System.err.println("Invalid minload: " + minlStr + USAGE);                    return;                }            } else if (args[i].equals("-maxload") && i + 1 < args.length) {                String maxlStr = args[++i];                try {                    MAX_LOAD = Integer.parseInt(maxlStr);                } catch (NumberFormatException ex) {                    System.err.println("Invalid maxload: " + maxlStr + USAGE);                    return;                }            } else if (args[i].equals("-password") && i + 1 < args.length) {                PASSWORD = args[++i];            } else if (args[i].equals("-principal") && i + 1 < args.length) {                PRINCIPAL = args[++i];            } else if (args[i].equals("-name") && i + 1 < args.length) {                MSG_PIPE_NAME = args[++i] + "MsgPipe";                ACK_PIPE_NAME = args[i] + "AckPipe";            } else if (args[i].equals("-debug")) {                DEBUG = true;            } else if (args[i].equals("-adapt")) {                ADAPTIVE = true;            } else if (args[i].equals("-help")) {                System.err.println(USAGE);                System.err.println(HELP);                return;            }        }        System.out.println(                (IS_SENDER ? "Sender" : "Receiver") + "\n--------" + "\n quiet:      " + IS_QUIET + "\n delay:      " + DELAY                + "\n iterations: " + ITERATIONS + "\n drop:       " + DROP_MSG + "\n bw:         " + BW_LIMIT + "\n pl:         "                + PIPE_LEN + "\n latency:    " + LATENCY + "\n min load:   " + MIN_LOAD + "\n max load:   " + MAX_LOAD                + "\n adaptive:   " + ADAPTIVE + "\n debug:      " + DEBUG);    }    public void rendezvousEvent(RendezvousEvent event) {        synchronized (rdvConnectLock) {            rdvConnectLock.notifyAll();        }    }    public void test() {        if (IS_SENDER) {            doSender();            longPause();            longPause();            doSender();        } else {            do {                doReceiver();            } while (IS_SERVER);        }    }    private PipeAdvertisement createPipeAdv(String pipeName) {        PipeAdvertisement padv = (PipeAdvertisement)                AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());        String pipeType = PipeService.UnicastType;        String composite = pipeName + pipeType;        byte[] seed = Integer.toHexString(composite.hashCode()).getBytes();        PeerGroupID pgID = netPeerGroup.getPeerGroupID();        PipeID pipeId = IDFactory.newPipeID(pgID, seed);        padv.setName(pipeName);        padv.setPipeID(pipeId);        padv.setType(pipeType);        return padv;    }    private void pause() {        synchronized (this) {            try {                wait(DELAY);            } catch (InterruptedException e) {}        }    }    private void longPause() {        try {            synchronized (this) {                wait(10 * DELAY);            }        } catch (InterruptedException e) {}    }    private void doSender() {        try {            int sequence = 0;            // Create the ros and the outgoing adaptor to go with it before            // the outputpipe exists. The pipe will be set when ready. In the            // meantime, send() would just fail or block. However, in the sender            // we do not start before the pipes are created; we do not need to.            // The same trick is more usefull to the receiver.            outgoing = new OutgoingPipeAdaptorSync(null);            ros = ADAPTIVE                    ? new ReliableOutputStream(outgoing, new AdaptiveFlowControl())                    : new ReliableOutputStream(outgoing, new FixedFlowControl(40));            for (int i = 0; i < ITERATIONS; i++) {                                // if we do not already have it resolved, retry                // to open the output pipe every so often                if (outputPipe == null) {                    pause();                    if ((i % 11) == 0) {                        PipeAdvertisement padv = IS_SENDER ? msgPipeAdv : ackPipeAdv;                        if (padv == null) {                            discoverySvc.getRemoteAdvertisements(null, DiscoveryService.ADV, "Name", MSG_PIPE_NAME, 10, this);                            discoverySvc.getRemoteAdvertisements(null, DiscoveryService.ADV, "Name", ACK_PIPE_NAME, 10, this);                            if (DEBUG) {                                System.out.println("launched remote discovery for " + MSG_PIPE_NAME + " and " + ACK_PIPE_NAME);                            }                            // wait for discovery response to come in                            continue;                        }                        if (DEBUG) {                            System.out.println("re-resolving output pipe " + padv.getName());                        }                        try {                            pipeSvc.createOutputPipe(padv, this);                        } catch (IOException ex) {                            System.err.println(ex.getMessage());                            return;                        }                    }                }                // No need to start before we could supply the pipes; messages                // would go nowhere and/or the other side would not be able to                // resolve the ack pipe.                if (outputPipe == null || inputPipe == null) {                    continue;                }            }            System.out.print("Sending...");            System.out.flush();            for (int i = 0; i <= ITERATIONS; i++) {                Message msg = new Message();                if (i == ITERATIONS) {                    msg.addMessageElement(new StringMessageElement(MESSAGE_TAG, "mclose", null));                } else {                    String messageId = "m" + Integer.toString(nextMessageId++);                    msg.addMessageElement(new StringMessageElement(MESSAGE_TAG, messageId, null));                    // add a random load element                    int index = random.nextInt(loadElements.size());                    byte[] le = (byte[]) loadElements.get(index);                    MessageElement elm = new ByteArrayMessageElement(PAYLOAD_TAG, MIME_BINARY, le, null);                    msg.addMessageElement(elm);                }                msg.addMessageElement(new StringMessageElement(SENT_AT_TAG, Long.toString(System.currentTimeMillis()), null));                try {                    sequence = ros.send(msg);                    // System.out.println(messageId + " " +                     // msg.getByteLength() + "b " +                    // sequence + "seq " +                    // ros.getMaxAck() + "ack");                } catch (Throwable e) {                    e.printStackTrace();                    return;                }            }            System.out.print("closing...");            System.out.flush();            while (ros.getMaxAck() != sequence) {                pause();            }            ros.close();            inputPipe.close();            outputPipe.close();            msgPipeAdv = null;            ackPipeAdv = null;            outputPipe = null;            inputPipe = null;            outgoing = null;            incoming = null;            ros = null;            System.out.println("Done");        } catch (Throwable t) {            t.printStackTrace();        }    }    public void discoveryEvent(DiscoveryEvent event) {        Enumeration ae = event.getResponse().getResponses();        while (ae.hasMoreElements()) {            String str = (String) ae.nextElement();            // create Advertisement from response            Advertisement adv = null;            try {                adv = AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, new StringReader(str));            } catch (IOException ex) {                System.err.println("error parsing discovery response");                System.err.println(ex.getMessage());                continue;            }            if (adv instanceof PipeAdvertisement) {                PipeAdvertisement pipeAdv = (PipeAdvertisement) adv;                String pipeName = pipeAdv.getName();                if (MSG_PIPE_NAME.equals(pipeName)) {                    msgPipeAdv = pipeAdv;                    if (DEBUG) {                        System.out.println("discovered msg pipe: " + pipeName);                    }                    try {                        pipeSvc.createOutputPipe(msgPipeAdv, this);                        if (DEBUG) {                            System.out.println("opened msg pipe for output");                        }                    } catch (IOException ex) {                        System.err.println(ex.getMessage());                    }                } else if (ACK_PIPE_NAME.equals(pipeName)) {                    ackPipeAdv = pipeAdv;                    if (DEBUG) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -