📄 abstractstreamwritefiltertest.java
字号:
EasyMock.replay(nextFilter); filter.messageSent(nextFilter, session, new DefaultWriteRequest( new Object())); assertEquals(0, queue.size()); /* * Verify. */ EasyMock.verify(nextFilter); } /** * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the * specified size. */ public void testSetWriteBufferSize() { AbstractStreamWriteFilter<M> filter = createFilter(); try { filter.setWriteBufferSize(0); fail("0 writeBuferSize specified. IllegalArgumentException expected."); } catch (IllegalArgumentException iae) { // Pass, exception was thrown } try { filter.setWriteBufferSize(-100); fail("Negative writeBuferSize specified. IllegalArgumentException expected."); } catch (IllegalArgumentException iae) { // Pass, exception was thrown } filter.setWriteBufferSize(1); assertEquals(1, filter.getWriteBufferSize()); filter.setWriteBufferSize(1024); assertEquals(1024, filter.getWriteBufferSize()); } public void testWriteUsingSocketTransport() throws Exception { NioSocketAcceptor acceptor = new NioSocketAcceptor(); acceptor.setReuseAddress(true); SocketAddress address = new InetSocketAddress("localhost", AvailablePortFinder.getNextAvailable()); NioSocketConnector connector = new NioSocketConnector(); // Generate 4MB of random data byte[] data = new byte[4 * 1024 * 1024]; new Random().nextBytes(data); byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data); M message = createMessage(data); SenderHandler sender = new SenderHandler(message); ReceiverHandler receiver = new ReceiverHandler(data.length); acceptor.setHandler(sender); connector.setHandler(receiver); acceptor.bind(address); connector.connect(address); sender.latch.await(); receiver.latch.await(); acceptor.dispose(); assertEquals(data.length, receiver.bytesRead); byte[] actualMd5 = receiver.digest.digest(); assertEquals(expectedMd5.length, actualMd5.length); for (int i = 0; i < expectedMd5.length; i++) { assertEquals(expectedMd5[i], actualMd5[i]); } } private class SenderHandler extends IoHandlerAdapter { final CountDownLatch latch = new CountDownLatch(1); private M message; StreamWriteFilter streamWriteFilter = new StreamWriteFilter(); private SenderHandler(M message) { this.message = message; } @Override public void sessionCreated(IoSession session) throws Exception { super.sessionCreated(session); session.getFilterChain().addLast("codec", streamWriteFilter); } @Override public void sessionOpened(IoSession session) throws Exception { session.write(message); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { latch.countDown(); } @Override public void sessionClosed(IoSession session) throws Exception { latch.countDown(); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { latch.countDown(); } @Override public void messageSent(IoSession session, Object message) throws Exception { if (message == this.message) { latch.countDown(); } } } private static class ReceiverHandler extends IoHandlerAdapter { final CountDownLatch latch = new CountDownLatch(1); long bytesRead = 0; long size = 0; MessageDigest digest; private ReceiverHandler(long size) throws Exception { this.size = size; digest = MessageDigest.getInstance("MD5"); } @Override public void sessionCreated(IoSession session) throws Exception { super.sessionCreated(session); session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 5); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { session.close(true); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { latch.countDown(); } @Override public void sessionClosed(IoSession session) throws Exception { latch.countDown(); } @Override public void messageReceived(IoSession session, Object message) throws Exception { IoBuffer buf = (IoBuffer) message; while (buf.hasRemaining()) { digest.update(buf.get()); bytesRead++; } if (bytesRead >= size) { session.close(true); } } } public static WriteRequest eqWriteRequest(WriteRequest expected) { EasyMock.reportMatcher(new WriteRequestMatcher(expected)); return null; } public static class WriteRequestMatcher implements IArgumentMatcher { private final WriteRequest expected; public WriteRequestMatcher(WriteRequest expected) { this.expected = expected; } public boolean matches(Object actual) { if (actual instanceof WriteRequest) { WriteRequest w2 = (WriteRequest) actual; return expected.getMessage().equals(w2.getMessage()) && expected.getFuture().isWritten() == w2.getFuture() .isWritten(); } return false; } public void appendTo(StringBuffer buffer) { buffer.append("Expected a WriteRequest with the message '").append(expected.getMessage()).append("'"); } } private static class DummyWriteFuture implements WriteFuture { private boolean written; public boolean isWritten() { return written; } public void setWritten() { this.written = true; } public IoSession getSession() { return null; } public Object getLock() { return this; } public void join() { } public boolean join(long timeoutInMillis) { return true; } public boolean isDone() { return true; } public WriteFuture addListener(IoFutureListener<?> listener) { return this; } public WriteFuture removeListener(IoFutureListener<?> listener) { return this; } public WriteFuture await() throws InterruptedException { return this; } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return true; } public boolean await(long timeoutMillis) throws InterruptedException { return true; } public WriteFuture awaitUninterruptibly() { return this; } public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { return true; } public boolean awaitUninterruptibly(long timeoutMillis) { return true; } public Throwable getException() { return null; } public void setException(Throwable cause) { throw new IllegalStateException(); } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -