
📄 abstracttrafficcontroltest.java

📁 A socket framework from Apache
💻 JAVA
/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.transport;

import java.net.SocketAddress;

import junit.framework.TestCase;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportType;
import org.apache.mina.util.AvailablePortFinder;

/**
 * Abstract base class for testing suspending and resuming reads and
 * writes.
 *
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public abstract class AbstractTrafficControlTest extends TestCase {
    protected int port = 0;

    protected IoAcceptor acceptor;

    protected TransportType transportType;

    public AbstractTrafficControlTest(IoAcceptor acceptor) {
        this.acceptor = acceptor;
    }

    protected void setUp() throws Exception {
        super.setUp();

        port = AvailablePortFinder.getNextAvailable();
        acceptor.bind(createServerSocketAddress(port), new ServerIoHandler());
    }

    protected void tearDown() throws Exception {
        super.tearDown();

        acceptor.unbind(createServerSocketAddress(port));
    }

    protected abstract ConnectFuture connect(int port, IoHandler handler)
            throws Exception;

    protected abstract SocketAddress createServerSocketAddress(int port);

    public void testSuspendResumeReadWrite() throws Exception {
        ConnectFuture future = connect(port, new ClientIoHandler());
        future.join();
        IoSession session = future.getSession();

        // Wait until the sessionCreated() event has fired, because we cannot
        // guarantee that it has already been invoked.
        while (session.getAttribute("lock") == null) {
            Thread.yield();
        }

        Object lock = session.getAttribute("lock");
        synchronized (lock) {
            write(session, "1");
            assertEquals('1', read(session));
            assertEquals("1", getReceived(session));
            assertEquals("1", getSent(session));

            session.suspendRead();

            write(session, "2");
            assertFalse(canRead(session));
            assertEquals("1", getReceived(session));
            assertEquals("12", getSent(session));

            session.suspendWrite();

            write(session, "3");
            assertFalse(canRead(session));
            assertEquals("1", getReceived(session));
            assertEquals("12", getSent(session));

            session.resumeRead();

            write(session, "4");
            assertEquals('2', read(session));
            assertEquals("12", getReceived(session));
            assertEquals("12", getSent(session));

            session.resumeWrite();
            assertEquals('3', read(session));
            assertEquals('4', read(session));

            write(session, "5");
            assertEquals('5', read(session));
            assertEquals("12345", getReceived(session));
            assertEquals("12345", getSent(session));

            session.suspendWrite();

            write(session, "6");
            assertFalse(canRead(session));
            assertEquals("12345", getReceived(session));
            assertEquals("12345", getSent(session));

            session.suspendRead();
            session.resumeWrite();

            write(session, "7");
            assertFalse(canRead(session));
            assertEquals("12345", getReceived(session));
            assertEquals("1234567", getSent(session));

            session.resumeRead();
            assertEquals('6', read(session));
            assertEquals('7', read(session));

            assertEquals("1234567", getReceived(session));
            assertEquals("1234567", getSent(session));
        }

        session.close().join();
    }

    private void write(IoSession session, String s) throws Exception {
        session.write(ByteBuffer.wrap(s.getBytes("ASCII")));
    }

    // Must be called while holding the session's "lock" monitor.
    private int read(IoSession session) throws Exception {
        int pos = ((Integer) session.getAttribute("pos")).intValue();
        for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
            Object lock = session.getAttribute("lock");
            lock.wait(200);
        }
        session.setAttribute("pos", new Integer(pos + 1));
        return getReceived(session).charAt(pos);
    }

    // Must be called while holding the session's "lock" monitor.
    private boolean canRead(IoSession session) throws Exception {
        int pos = ((Integer) session.getAttribute("pos")).intValue();
        Object lock = session.getAttribute("lock");
        lock.wait(250);
        String received = getReceived(session);
        return pos < received.length();
    }

    private String getReceived(IoSession session) throws Exception {
        return session.getAttribute("received").toString();
    }

    private String getSent(IoSession session) throws Exception {
        return session.getAttribute("sent").toString();
    }

    public static class ClientIoHandler extends IoHandlerAdapter {
        public void sessionCreated(IoSession session) throws Exception {
            super.sessionCreated(session);
            session.setAttribute("pos", new Integer(0));
            session.setAttribute("received", new StringBuffer());
            session.setAttribute("sent", new StringBuffer());
            session.setAttribute("lock", new Object());
        }

        public void messageReceived(IoSession session, Object message)
                throws Exception {
            ByteBuffer buffer = (ByteBuffer) message;
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            Object lock = session.getAttribute("lock");
            synchronized (lock) {
                StringBuffer sb = (StringBuffer) session
                        .getAttribute("received");
                sb.append(new String(data, "ASCII"));
                lock.notifyAll();
            }
        }

        public void messageSent(IoSession session, Object message)
                throws Exception {
            ByteBuffer buffer = (ByteBuffer) message;
            buffer.rewind();
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            StringBuffer sb = (StringBuffer) session.getAttribute("sent");
            sb.append(new String(data, "ASCII"));
        }
    }

    private static class ServerIoHandler extends IoHandlerAdapter {
        public void messageReceived(IoSession session, Object message)
                throws Exception {
            // Just echo the received bytes.
            ByteBuffer rb = (ByteBuffer) message;
            ByteBuffer wb = ByteBuffer.allocate(rb.remaining());
            wb.put(rb);
            wb.flip();
            session.write(wb);
        }
    }
}
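
The read() and canRead() helpers in the test above rely on a timed wait/notify handshake: the test thread holds a lock and waits with a timeout, while the handler thread appends received bytes and calls notifyAll(). The following is a minimal, MINA-free sketch of that pattern using only the JDK. The names TimedWaitSketch, Inbox, and deliver are hypothetical and are not part of the original test. It also differs from the original by using a deadline loop instead of a fixed number of wait attempts and by returning -1 on timeout.

```java
import java.nio.charset.StandardCharsets;

public class TimedWaitSketch {

    /** Hypothetical stand-in for the "received", "pos" and "lock" session attributes. */
    static final class Inbox {
        private final Object lock = new Object();
        private final StringBuilder received = new StringBuilder();
        private int pos = 0;

        /** Called by the producer (the I/O handler thread in the test). */
        void deliver(byte[] data) {
            synchronized (lock) {
                received.append(new String(data, StandardCharsets.US_ASCII));
                lock.notifyAll(); // wake up any thread blocked in read()/canRead()
            }
        }

        /** Blocks until unread data exists or the timeout elapses. Caller must hold lock. */
        private void awaitData(long timeoutMillis) throws InterruptedException {
            long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
            while (pos == received.length()) {
                long remaining = (deadline - System.nanoTime()) / 1_000_000L;
                if (remaining <= 0) {
                    return; // timed out; never call wait(0), which waits forever
                }
                lock.wait(remaining); // releases the lock while waiting
            }
        }

        /** Returns the next unread character, or -1 if none arrives in time. */
        int read(long timeoutMillis) throws InterruptedException {
            synchronized (lock) {
                awaitData(timeoutMillis);
                if (pos == received.length()) {
                    return -1;
                }
                return received.charAt(pos++);
            }
        }

        /** Returns true if unread data is available within the timeout. */
        boolean canRead(long timeoutMillis) throws InterruptedException {
            synchronized (lock) {
                awaitData(timeoutMillis);
                return pos < received.length();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Inbox inbox = new Inbox();
        Thread producer = new Thread(
                () -> inbox.deliver("1".getBytes(StandardCharsets.US_ASCII)));
        producer.start();
        System.out.println((char) inbox.read(2000)); // waits for the producer
        producer.join();
        System.out.println(inbox.canRead(50));       // nothing else was delivered
    }
}
```

Looping on the condition guards against spurious wakeups, which a single wait() call does not. The original test instead caps the number of 200 ms waits at ten.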
