📄 streamingcallback.java
字号:
/*------------------------------------------------------------------------------Name: StreamingCallback.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.client;import java.io.ByteArrayInputStream;import java.io.IOException;import java.io.OutputStream;import java.io.PipedInputStream;import java.io.PipedOutputStream;import java.util.ArrayList;import java.util.logging.Level;import java.util.logging.Logger;import org.xmlBlaster.client.key.UpdateKey;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.client.qos.UpdateQos;import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;import org.xmlBlaster.jms.XBConnectionMetaData;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.Timeout;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.qos.storage.ClientQueueProperty;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.StorageId;import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;import EDU.oswego.cs.dl.util.concurrent.Mutex;/** * StreamingCallback * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class StreamingCallback implements I_Callback, I_Timeout, I_ConnectionStateListener { /** * * Writer needed since the out stream must be written from a thread which does not * die before the thread which reads the in counterpart. For some "strange" reason * the implementation of the Pipe streams makes a check if the thread which has * made the last write operation on the out stream still is valid. If not, a Dead * End IO Exception is thrown when reading. * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */ class Writer extends Thread { class WriterData extends Mutex { private OutputStream outStrm; private byte[] data; private Throwable exception; public WriterData(OutputStream out, byte[] data) { super(); this.outStrm = out; this.data = data; } } private LinkedQueue channel; public Writer(String name) { super(name); this.channel = new LinkedQueue(); setDaemon(true); start(); } public Writer() { super(); this.channel = new LinkedQueue(); setDaemon(true); start(); } public synchronized void write(OutputStream outStream, byte[] buf) throws InterruptedException, XmlBlasterException { WriterData data = new WriterData(outStream, buf); try { data.acquire(); this.channel.put(data); data.acquire(); // waits until the other thread is finished if (data.exception != null) throw new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "write: a throwable occured", "", data.exception); } finally { data.release(); } } public synchronized void close(OutputStream outStream) throws InterruptedException, XmlBlasterException { WriterData data = new WriterData(outStream, null); try { data.acquire(); this.channel.put(data); data.acquire(); // waits until the other thread is finished if (data.exception != null) throw new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "close: a throwable occured", "", data.exception); } finally { data.release(); } } /** * @see java.lang.Thread#run() */ public void run() { while (true) { try { WriterData writerData = (WriterData)this.channel.take(); try { if (writerData.outStrm != null) { if (writerData.data != null) { int bytesLeft = writerData.data.length; int bytesRead = 0; final int MAX_CHUNK_SIZE = 4096; while (bytesLeft > 0) { int toRead = bytesLeft > MAX_CHUNK_SIZE ? MAX_CHUNK_SIZE : bytesLeft; writerData.outStrm.write(writerData.data, bytesRead, toRead); writerData.outStrm.flush(); bytesRead += toRead; bytesLeft -= toRead; } // writerData.out.write(0); // writerData.out.flush(); // these would block (probably the pipes are not the best in the world // writerData.out.write(writerData.data); // writerData.out.flush(); } else writerData.outStrm.close(); } } catch (Throwable e) { writerData.exception = e; } finally { writerData.release(); } } catch (Throwable e) { if (e.getMessage().indexOf("Pipe closed") < 0) { log.warning("An exception occured when writing to the stream: ' " + e.getMessage()); e.printStackTrace(); } else if (log.isLoggable(Level.FINE)) { log.fine("The pipe was closed, which resulted in an IO Exception. It can happen when the client has returned before reading the complete message"); e.printStackTrace(); } } } } } class ExecutionThread extends Thread { private String cbSessionId_; private UpdateKey updateKey_; private byte[] content_; private UpdateQos updateQos_; public ExecutionThread(String cbSessId, UpdateKey updKey, byte[] content, UpdateQos updQos) { this.cbSessionId_ = cbSessId; this.updateKey_ = updKey; this.content_ = content; this.updateQos_ = updQos; } public void run() { try { ret = updateNewMessage(cbSessionId_, updateKey_, content_, updateQos_); clearQueue(); } catch (Throwable e) { setException(e); e.printStackTrace(); } finally { try { if (in != null) in.close(); } catch (IOException e) { e.printStackTrace(); } mutex.release(); } } }; private static Logger log = Logger.getLogger(StreamingCallback.class.getName()); public final static String ENTRY_CB_SESSION_ID = "__entryCbSessionId"; private I_StreamingCallback callback; private Global global; private PipedOutputStream out; private PipedInputStream in; private XmlBlasterException ex; private String ret; private String cbSessionId; private Writer writer; /** The time to wait in ms until returning when waiting (if zero or negative inifinite) */ private long waitForChunksTimeout; // private long waitForClientReturnTimeout; private Timeout timer; private Timestamp timestamp; // the key for the timeout timer (can be null) private I_Queue queue; // optional client side queue private boolean useQueue; private boolean initialized; private boolean lastMessageCompleted = true; private final Mutex mutex; private void reset() throws XmlBlasterException { this.out = null; this.in = null; this.ret = null; this.cbSessionId = null; } public StreamingCallback(Global global, I_StreamingCallback callback) throws XmlBlasterException { this(global, callback, 0L, 0L, false); } /** * * @param callback */ public StreamingCallback(Global global, I_StreamingCallback callback, long waitForChunksTimeout, long waitForClientReturnTimeout, boolean useQueue) throws XmlBlasterException { this.callback = callback; this.global = global; this.mutex = new Mutex(); String writerName = StreamingCallback.class.getName() + "-writer"; synchronized(this.global) { this.writer = (Writer)this.global.getObjectEntry(writerName); if (this.writer == null) { this.writer = new Writer(); this.global.addObjectEntry(writerName, this.writer); } } this.waitForChunksTimeout = waitForChunksTimeout; // this.waitForClientReturnTimeout = waitForClientReturnTimeout; if (this.waitForChunksTimeout > 0L) { String timerName = StreamingCallback.class.getName() + "-timer"; synchronized(this.global) { this.timer = (Timeout)this.global.getObjectEntry(timerName); if (this.timer == null) { this.timer = new Timeout(timerName); this.global.addObjectEntry(timerName, this.timer); } } } this.useQueue = useQueue; // TODO latch until connected to avoit early updates } /** * * @return the number of delivered entries from local client update queue. */ public final int sendInitialQueueEntries() throws XmlBlasterException { if (this.queue == null) return 0; ArrayList list = this.queue.peek(-1, -1L); for (int i=0; i < list.size(); i++) { MsgQueuePublishEntry entry = (MsgQueuePublishEntry)list.get(i); MsgKeyData key = entry.getMsgKeyData(); MsgQosData qos =(MsgQosData)entry.getMsgUnit().getQosData(); byte[] cont = entry.getMsgUnit().getContent(); String entryCbSessionId = qos.getClientProperty(ENTRY_CB_SESSION_ID, (String)null); qos.getClientProperties().remove(ENTRY_CB_SESSION_ID); final boolean isExternal = false; // we don't want to store these entries since already here updateInternal(entryCbSessionId, new UpdateKey(key), cont, new UpdateQos(this.global, qos), isExternal); } this.queue.clear(); return list.size(); } private final void storeEntry(String cbSessId, UpdateKey key, byte[] cont, UpdateQos qos) throws XmlBlasterException {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -