⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streamingcallback.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      StreamingCallback.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.client;import java.io.ByteArrayInputStream;import java.io.IOException;import java.io.OutputStream;import java.io.PipedInputStream;import java.io.PipedOutputStream;import java.util.ArrayList;import java.util.logging.Level;import java.util.logging.Logger;import org.xmlBlaster.client.key.UpdateKey;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.client.qos.UpdateQos;import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;import org.xmlBlaster.jms.XBConnectionMetaData;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.Timeout;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.qos.storage.ClientQueueProperty;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.StorageId;import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;import EDU.oswego.cs.dl.util.concurrent.Mutex;/** * StreamingCallback * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class StreamingCallback implements I_Callback, I_Timeout, I_ConnectionStateListener {   /**    *     * Writer needed since the out stream must be written from a thread which does not    * die before the thread which reads the in counterpart. For some "strange" reason    * the implementation of the Pipe streams makes a check if the thread which has     * made the last write operation on the out stream still is valid. If not, a Dead    * End IO Exception is thrown when reading.    *     * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>    */   class Writer extends Thread {      class WriterData extends Mutex {         private OutputStream outStrm;         private byte[] data;         private Throwable exception;                  public WriterData(OutputStream out, byte[] data) {            super();            this.outStrm = out;            this.data = data;         }      }      private LinkedQueue channel;            public Writer(String name) {         super(name);         this.channel = new LinkedQueue();         setDaemon(true);         start();      }      public Writer() {         super();         this.channel = new LinkedQueue();         setDaemon(true);         start();      }            public synchronized void write(OutputStream outStream, byte[] buf) throws InterruptedException, XmlBlasterException {         WriterData data = new WriterData(outStream, buf);         try {            data.acquire();            this.channel.put(data);            data.acquire(); // waits until the other thread is finished            if (data.exception != null)               throw new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "write: a throwable occured", "", data.exception);         }         finally {            data.release();         }      }      public synchronized void close(OutputStream outStream) throws InterruptedException, XmlBlasterException {         WriterData data = new WriterData(outStream, null);         try {            data.acquire();            this.channel.put(data);            data.acquire(); // waits until the other thread is finished            if (data.exception != null)               throw new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "close: a throwable occured", "", data.exception);         }         finally {            data.release();         }      }      /**       * @see java.lang.Thread#run()       */      public void run() {         while (true) {            try {               WriterData writerData = (WriterData)this.channel.take();               try {                  if (writerData.outStrm != null) {                     if (writerData.data != null) {                        int bytesLeft = writerData.data.length;                        int bytesRead = 0;                        final int MAX_CHUNK_SIZE = 4096;                        while (bytesLeft > 0) {                           int toRead = bytesLeft > MAX_CHUNK_SIZE ? MAX_CHUNK_SIZE : bytesLeft;                           writerData.outStrm.write(writerData.data, bytesRead, toRead);                           writerData.outStrm.flush();                           bytesRead += toRead;                           bytesLeft -= toRead;                        }                        // writerData.out.write(0);                        // writerData.out.flush();                                                // these would block (probably the pipes are not the best in the world                        // writerData.out.write(writerData.data);                        // writerData.out.flush();                     }                     else                        writerData.outStrm.close();                  }               }               catch (Throwable e) {                  writerData.exception = e;               }               finally {                  writerData.release();               }            }            catch (Throwable e) {               if (e.getMessage().indexOf("Pipe closed") < 0) {                  log.warning("An exception occured when writing to the stream: ' " + e.getMessage());                  e.printStackTrace();               }               else if (log.isLoggable(Level.FINE)) {                  log.fine("The pipe was closed, which resulted in an IO Exception. It can happen when the client has returned before reading the complete message");                  e.printStackTrace();               }            }         }      }   }         class ExecutionThread extends Thread {            private String cbSessionId_;      private UpdateKey updateKey_;      private byte[] content_;      private UpdateQos updateQos_;            public ExecutionThread(String cbSessId, UpdateKey updKey, byte[] content, UpdateQos updQos) {         this.cbSessionId_ = cbSessId;         this.updateKey_ = updKey;         this.content_ = content;         this.updateQos_ = updQos;               }            public void run() {         try {            ret = updateNewMessage(cbSessionId_, updateKey_, content_, updateQos_);            clearQueue();         }         catch (Throwable e) {            setException(e);            e.printStackTrace();         }         finally {            try {               if (in != null)                  in.close();            }            catch (IOException e) {               e.printStackTrace();            }            mutex.release();         }      }   };   private static Logger log = Logger.getLogger(StreamingCallback.class.getName());   public final static String ENTRY_CB_SESSION_ID = "__entryCbSessionId";      private I_StreamingCallback callback;   private Global global;   private PipedOutputStream out;   private PipedInputStream in;   private XmlBlasterException ex;   private String ret;   private String cbSessionId;   private Writer writer;   /** The time to wait in ms until returning when waiting (if zero or negative inifinite) */   private long waitForChunksTimeout;   // private long waitForClientReturnTimeout;   private Timeout timer;   private Timestamp timestamp; // the key for the timeout timer (can be null)   private I_Queue queue; // optional client side queue   private boolean useQueue;   private boolean initialized;   private boolean lastMessageCompleted = true;   private final Mutex mutex;      private void reset() throws XmlBlasterException {      this.out = null;      this.in = null;      this.ret = null;      this.cbSessionId = null;   }      public StreamingCallback(Global global, I_StreamingCallback callback) throws XmlBlasterException {      this(global, callback, 0L, 0L, false);   }      /**    *     * @param callback    */   public StreamingCallback(Global global, I_StreamingCallback callback, long waitForChunksTimeout, long waitForClientReturnTimeout, boolean useQueue)       throws XmlBlasterException {      this.callback = callback;      this.global = global;      this.mutex = new Mutex();      String writerName = StreamingCallback.class.getName() + "-writer";      synchronized(this.global) {         this.writer = (Writer)this.global.getObjectEntry(writerName);         if (this.writer == null) {            this.writer = new Writer();            this.global.addObjectEntry(writerName, this.writer);         }      }      this.waitForChunksTimeout = waitForChunksTimeout;      // this.waitForClientReturnTimeout = waitForClientReturnTimeout;      if (this.waitForChunksTimeout > 0L) {         String timerName = StreamingCallback.class.getName() + "-timer";         synchronized(this.global) {            this.timer = (Timeout)this.global.getObjectEntry(timerName);            if (this.timer == null) {               this.timer = new Timeout(timerName);               this.global.addObjectEntry(timerName, this.timer);            }         }      }      this.useQueue = useQueue;      // TODO latch until connected to avoit early updates   }      /**    *     * @return the number of delivered entries from local client update queue.    */   public final int sendInitialQueueEntries() throws XmlBlasterException {      if (this.queue == null)         return 0;      ArrayList list = this.queue.peek(-1, -1L);      for (int i=0; i < list.size(); i++) {         MsgQueuePublishEntry entry = (MsgQueuePublishEntry)list.get(i);         MsgKeyData key = entry.getMsgKeyData();         MsgQosData qos =(MsgQosData)entry.getMsgUnit().getQosData();         byte[] cont = entry.getMsgUnit().getContent();         String entryCbSessionId = qos.getClientProperty(ENTRY_CB_SESSION_ID, (String)null);         qos.getClientProperties().remove(ENTRY_CB_SESSION_ID);         final boolean isExternal = false; // we don't want to store these entries since already here         updateInternal(entryCbSessionId, new UpdateKey(key), cont, new UpdateQos(this.global, qos), isExternal);      }      this.queue.clear();      return list.size();   }      private final void storeEntry(String cbSessId, UpdateKey key, byte[] cont, UpdateQos qos) throws XmlBlasterException {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -