functionalstreamingtestcomponent.java

来自「提供ESB 应用mule源代码 提供ESB 应用mule源代码」· Java 代码 · 共 188 行

JAVA
188
字号
/* * $Id: FunctionalStreamingTestComponent.java 12151 2008-06-24 23:36:43Z dfeist $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc.  All rights reserved.  http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.tck.functional;import org.mule.api.MuleEventContext;import org.mule.api.lifecycle.Callable;import org.mule.util.ClassUtils;import org.mule.util.StringMessageUtils;import java.io.IOException;import java.io.InputStream;import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;/** * A service that can be used by streaming functional tests. This service accepts an * EventCallback that can be used to assert the state of the current event.  To access the * service when embedded in an (XML) model, make sure that the descriptor sets the * singleton attribute true - see uses in TCP and FTP. * * Note that although this implements the full StreamingService interface, nothing is * written to the output stream - this is intended as a final sink. * * @see org.mule.tck.functional.EventCallback */public class FunctionalStreamingTestComponent implements Callable{    protected transient Log logger = LogFactory.getLog(getClass());    private static AtomicInteger count = new AtomicInteger(0);    private int number = count.incrementAndGet();    public static final int STREAM_SAMPLE_SIZE = 4;    public static final int STREAM_BUFFER_SIZE = 4096;    private EventCallback eventCallback;    private String summary = null;    private long targetSize = -1;    public FunctionalStreamingTestComponent()    {        logger.debug("creating " + toString());    }    public void setEventCallback(EventCallback eventCallback, long targetSize)    {        logger.debug("setting callback: " + eventCallback + " in " + toString());        this.eventCallback = eventCallback;        this.targetSize = targetSize;    }    public String getSummary()    {        return summary;    }     public int getNumber()    {        return number;    }    public Object onCall(MuleEventContext context) throws Exception    {        InputStream in = (InputStream) context.getMessage().getPayload(InputStream.class);        try        {            logger.debug("arrived at " + toString());            byte[] startData = new byte[STREAM_SAMPLE_SIZE];            long startDataSize = 0;            byte[] endData = new byte[STREAM_SAMPLE_SIZE]; // ring buffer            long endDataSize = 0;            long endRingPointer = 0;            long streamLength = 0;            byte[] buffer = new byte[STREAM_BUFFER_SIZE];            // throw data on the floor, but keep a record of size, start and end values            long bytesRead = 0;            while (bytesRead >= 0)            {                bytesRead = read(in, buffer);                if (bytesRead > 0)                {                    if (logger.isDebugEnabled())                    {                        logger.debug("read " + bytesRead + " bytes");                    }                                        streamLength += bytesRead;                    long startOfEndBytes = 0;                    for (long i = 0; startDataSize < STREAM_SAMPLE_SIZE && i < bytesRead; ++i)                    {                        startData[(int) startDataSize++] = buffer[(int) i];                        ++startOfEndBytes; // skip data included in startData                    }                    startOfEndBytes = Math.max(startOfEndBytes, bytesRead - STREAM_SAMPLE_SIZE);                    for (long i = startOfEndBytes; i < bytesRead; ++i)                    {                        ++endDataSize;                        endData[(int) (endRingPointer++ % STREAM_SAMPLE_SIZE)] = buffer[(int) i];                    }                    if (streamLength >= targetSize)                    {                        doCallback(startData, startDataSize,                                endData, endDataSize, endRingPointer,                                streamLength, context);                    }                }            }            in.close();        }        catch (Exception e)        {            in.close();                        e.printStackTrace();            if (logger.isDebugEnabled())            {                logger.debug(e);            }            throw e;        }                return null;    }    protected int read(InputStream in, byte[] buffer) throws IOException    {        return in.read(buffer);    }    private void doCallback(byte[] startData, long startDataSize,                            byte[] endData, long endDataSize, long endRingPointer,                            long streamLength, MuleEventContext context) throws Exception    {        // make a nice summary of the data        StringBuffer result = new StringBuffer("Received stream");        result.append("; length: ");        result.append(streamLength);        result.append("; '");        for (long i = 0; i < startDataSize; ++i)        {            result.append((char) startData[(int) i]);        }        long endSize = Math.min(endDataSize, STREAM_SAMPLE_SIZE);        if (endSize > 0)        {            result.append("...");            for (long i = 0; i < endSize; ++i)            {                result.append((char) endData[(int) ((endRingPointer + i) % STREAM_SAMPLE_SIZE)]);            }        }        result.append("'");        summary = result.toString();        String msg = StringMessageUtils.getBoilerPlate("Message Received in service: "                + context.getService().getName() + ". " + summary                + "\n callback: " + eventCallback,                '*', 80);        logger.info(msg);        if (eventCallback != null)        {            eventCallback.eventReceived(context, this);        }    }    public String toString()    {        return ClassUtils.getSimpleName(getClass()) + "/" + number;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?