functionalstreamingtestcomponent.java
来自「提供ESB 应用mule源代码 提供ESB 应用mule源代码」· Java 代码 · 共 188 行
JAVA
188 行
/* * $Id: FunctionalStreamingTestComponent.java 12151 2008-06-24 23:36:43Z dfeist $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.tck.functional;import org.mule.api.MuleEventContext;import org.mule.api.lifecycle.Callable;import org.mule.util.ClassUtils;import org.mule.util.StringMessageUtils;import java.io.IOException;import java.io.InputStream;import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;/** * A service that can be used by streaming functional tests. This service accepts an * EventCallback that can be used to assert the state of the current event. To access the * service when embedded in an (XML) model, make sure that the descriptor sets the * singleton attribute true - see uses in TCP and FTP. * * Note that although this implements the full StreamingService interface, nothing is * written to the output stream - this is intended as a final sink. * * @see org.mule.tck.functional.EventCallback */public class FunctionalStreamingTestComponent implements Callable{ protected transient Log logger = LogFactory.getLog(getClass()); private static AtomicInteger count = new AtomicInteger(0); private int number = count.incrementAndGet(); public static final int STREAM_SAMPLE_SIZE = 4; public static final int STREAM_BUFFER_SIZE = 4096; private EventCallback eventCallback; private String summary = null; private long targetSize = -1; public FunctionalStreamingTestComponent() { logger.debug("creating " + toString()); } public void setEventCallback(EventCallback eventCallback, long targetSize) { logger.debug("setting callback: " + eventCallback + " in " + toString()); this.eventCallback = eventCallback; this.targetSize = targetSize; } public String getSummary() { return summary; } public int getNumber() { return number; } public Object onCall(MuleEventContext context) throws Exception { InputStream in = (InputStream) context.getMessage().getPayload(InputStream.class); try { logger.debug("arrived at " + toString()); byte[] startData = new byte[STREAM_SAMPLE_SIZE]; long startDataSize = 0; byte[] endData = new byte[STREAM_SAMPLE_SIZE]; // ring buffer long endDataSize = 0; long endRingPointer = 0; long streamLength = 0; byte[] buffer = new byte[STREAM_BUFFER_SIZE]; // throw data on the floor, but keep a record of size, start and end values long bytesRead = 0; while (bytesRead >= 0) { bytesRead = read(in, buffer); if (bytesRead > 0) { if (logger.isDebugEnabled()) { logger.debug("read " + bytesRead + " bytes"); } streamLength += bytesRead; long startOfEndBytes = 0; for (long i = 0; startDataSize < STREAM_SAMPLE_SIZE && i < bytesRead; ++i) { startData[(int) startDataSize++] = buffer[(int) i]; ++startOfEndBytes; // skip data included in startData } startOfEndBytes = Math.max(startOfEndBytes, bytesRead - STREAM_SAMPLE_SIZE); for (long i = startOfEndBytes; i < bytesRead; ++i) { ++endDataSize; endData[(int) (endRingPointer++ % STREAM_SAMPLE_SIZE)] = buffer[(int) i]; } if (streamLength >= targetSize) { doCallback(startData, startDataSize, endData, endDataSize, endRingPointer, streamLength, context); } } } in.close(); } catch (Exception e) { in.close(); e.printStackTrace(); if (logger.isDebugEnabled()) { logger.debug(e); } throw e; } return null; } protected int read(InputStream in, byte[] buffer) throws IOException { return in.read(buffer); } private void doCallback(byte[] startData, long startDataSize, byte[] endData, long endDataSize, long endRingPointer, long streamLength, MuleEventContext context) throws Exception { // make a nice summary of the data StringBuffer result = new StringBuffer("Received stream"); result.append("; length: "); result.append(streamLength); result.append("; '"); for (long i = 0; i < startDataSize; ++i) { result.append((char) startData[(int) i]); } long endSize = Math.min(endDataSize, STREAM_SAMPLE_SIZE); if (endSize > 0) { result.append("..."); for (long i = 0; i < endSize; ++i) { result.append((char) endData[(int) ((endRingPointer + i) % STREAM_SAMPLE_SIZE)]); } } result.append("'"); summary = result.toString(); String msg = StringMessageUtils.getBoilerPlate("Message Received in service: " + context.getService().getName() + ". " + summary + "\n callback: " + eventCallback, '*', 80); logger.info(msg); if (eventCallback != null) { eventCallback.eventReceived(context, this); } } public String toString() { return ClassUtils.getSimpleName(getClass()) + "/" + number; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?