filequeue.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 366 行

JAVA
366
字号
/* * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT.  See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * *   Free Software Foundation, Inc. *   59 Temple Place, Suite 330 *   Boston, MA 02111-1307  USA * * @author Scott Ferguson */package com.caucho.jms.file;import java.io.*;import java.util.logging.*;import java.sql.*;import javax.jms.*;import javax.annotation.*;import javax.webbeans.*;import com.caucho.jms.queue.*;import com.caucho.jms.message.*;import com.caucho.jms.connection.*;import com.caucho.config.ConfigException;import com.caucho.db.*;import com.caucho.util.L10N;import com.caucho.java.*;import com.caucho.vfs.*;/** * A JMS queue backed by a file-based database. * * The URL looks like * <pre> * file:name=my-name;path=file:/var/www/webapps/test/WEB-INFjms * </pre> * * It is configured as: * <pre> * &lt;jms-queue jndi-name="jms/my-name" uri="file:path=WEB-INF/jms"/> * </pre> */public class FileQueue extends AbstractQueue implements Topic{  private static final L10N L = new L10N(FileQueue.class);  private static final Logger log    = Logger.getLogger(FileQueue.class.getName());  private final FileQueueStore _store;  private final Object _queueLock = new Object();  private FileQueueEntry []_head = new FileQueueEntry[10];  private FileQueueEntry []_tail = new FileQueueEntry[10];  @In public FileQueue()  {    _store = new FileQueueStore(_messageFactory);  }  public FileQueue(String name)  {    this();        setName(name);    init();  }  //  // Configuration  //  /**   * Sets the path to the backing database   */  public void setPath(Path path)  {    _store.setPath(path);  }  public Path getPath()  {    return _store.getPath();  }  public void setTablePrefix(String prefix)  {    _store.setTablePrefix(prefix);  }  //  // JMX configuration items  //  /**   * Returns the JMS configuration url.   */  public String getUrl()  {    return "file:name=" + getName() + ";path=" + _store.getPath().getURL();  }  //  // JMX stats  //  public int getQueueSize()  {    synchronized (_queueLock) {      int count = 0;      for (int i = 0; i < _head.length; i++ ){	for (FileQueueEntry entry = _head[i];	     entry != null;	     entry = entry._next) {	  count++;	}      }      return count;    }  }  /**   * Initialize the queue   */  public void init()  {    _store.setName(getName());    _store.init();    _store.receiveStart(this);  }    /**   * Adds a message entry from startup.   */  FileQueueEntry addEntry(long id,			  long leaseTimeout,			  int priority,			  long expire,			  MessageType type)  {    synchronized (_queueLock) {      if (priority < 0)	priority = 0;      else if (priority >= _head.length)	priority = _head.length;            FileQueueEntry entry	= new FileQueueEntry(id, leaseTimeout, priority, expire, type);      entry._prev = _tail[priority];      if (_tail[priority] != null)	_tail[priority]._next = entry;      else	_head[priority] = entry;      _tail[priority] = entry;      return entry;    }  }  /**   * Adds the message to the persistent store.  Called if there are no   * active listeners.   *   * @param msg the message to store   * @param expires the expires time   */  @Override  public void send(JmsSession session,		   MessageImpl msg,		   int priority,		   long expires)  {    synchronized (_queueLock) {      long id = _store.send(msg, priority, expires);      FileQueueEntry entry = addEntry(id, -1, priority, expires, null);      entry.setMessage(msg);    }    notifyMessageAvailable();  }  /**   * Adds the message to the persistent store.  Called if there are no   * active listeners.   *   * @param msg the message to store   * @param expires the expires time   */  public void sendBackup(JmsSession session,			 MessageImpl msg,			 long leaseTimeout,			 int priority,			 long expires)  {    synchronized (_queueLock) {      long id = _store.send(msg, priority, expires);      FileQueueEntry entry	= addEntry(id, leaseTimeout, priority, expires, null);      entry.setMessage(msg);    }  }  /**   * Polls the next message from the store.  If no message is available,   * wait for the timeout.   */  @Override  public MessageImpl receive(boolean isAutoAck)  {    synchronized (_queueLock) {      for (int i = _head.length - 1; i >= 0; i--) {	for (FileQueueEntry entry = _head[i];	     entry != null;	     entry = entry._next) {	  if (! entry.isLease())	    continue;	  	  if (! entry.isRead()) {	    entry.setRead(true);	    MessageImpl msg = entry.getMessage();	    if (msg == null) {	      msg = _store.readMessage(entry.getId(), entry.getType());	      entry.setMessage(msg);	    }	    if (log.isLoggable(Level.FINER))	      log.finer(this + " receive " + msg + " auto-ack=" + isAutoAck);	    if (isAutoAck) {	      removeEntry(entry);	      _store.delete(entry.getId());	    }	    return msg;	  }	}      }      return null;    }  }  /**   * Rollsback the message from the store.   */  @Override  public void rollback(String msgId)  {    synchronized (_queueLock) {      for (int i = _head.length - 1; i >= 0; i--) {	for (FileQueueEntry entry = _head[i];	     entry != null;	     entry = entry._next) {	  MessageImpl msg = entry.getMessage();        	  if (msg != null	      && msgId.equals(msg.getJMSMessageID())	      && entry.isRead()) {	    entry.setRead(false);	    msg.setJMSRedelivered(true);	    return;	  }	}      }    }  }  /**   * Rollsback the message from the store.   */  @Override  public void acknowledge(String msgId)  {    synchronized (_queueLock) {      for (int i = _head.length - 1; i >= 0; i--) {	for (FileQueueEntry entry = _head[i];	     entry != null;	     entry = entry._next) {	  if (entry.getMessage().getJMSMessageID().equals(msgId)	      && entry.isRead()) {	    removeEntry(entry);	    _store.delete(entry.getId());	    return;	  }	}      }    }  }  /**   * Rollsback the message from the store.   */  public void removeMessage(String msgId)  {    synchronized (_queueLock) {      loop:      for (int i = _head.length - 1; i >= 0; i--) {	for (FileQueueEntry entry = _head[i];	     entry != null;	     entry = entry._next) {	  if (entry.getMessage().getJMSMessageID().equals(msgId)) {	    if (log.isLoggable(Level.FINER))	      log.finer(this + " remove " + msgId);	    	    removeEntry(entry);	    break loop;	  }	}      }    }    _store.remove(msgId);  }  private void removeEntry(FileQueueEntry entry)  {    int priority = entry.getPriority();    FileQueueEntry prev = entry._prev;    FileQueueEntry next = entry._next;        if (prev != null)      prev._next = next;    else      _head[priority] = next;    if (next != null)      next._prev = prev;    else      _tail[priority] = prev;  }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?