📄 offlinemessagestore.java
字号:
/**
* $RCSfile$
* $Revision: 2911 $
* $Date: 2005-10-03 12:35:52 -0300 (Mon, 03 Oct 2005) $
*
* Copyright (C) 2004 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.database.SequenceManager;
import org.jivesoftware.util.*;
import org.jivesoftware.wildfire.container.BasicModule;
import org.jivesoftware.wildfire.event.UserEventDispatcher;
import org.jivesoftware.wildfire.event.UserEventListener;
import org.jivesoftware.wildfire.user.User;
import org.jivesoftware.wildfire.user.UserManager;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Represents the user's offline message storage. A message store holds messages that were
* sent to the user while they were unavailable. The user can retrieve their messages by
* setting their presence to "available". The messages will then be delivered normally.
* Offline message storage is optional, in which case a null implementation is returned that
* always throws UnauthorizedException when adding messages to the store.
*
* @author Iain Shigeoka
*/
public class OfflineMessageStore extends BasicModule implements UserEventListener {
private static final String INSERT_OFFLINE =
"INSERT INTO jiveOffline (username, messageID, creationDate, messageSize, message) " +
"VALUES (?, ?, ?, ?, ?)";
private static final String LOAD_OFFLINE =
"SELECT message, creationDate FROM jiveOffline WHERE username=?";
private static final String LOAD_OFFLINE_MESSAGE =
"SELECT message FROM jiveOffline WHERE username=? AND creationDate=?";
private static final String SELECT_SIZE_OFFLINE =
"SELECT SUM(messageSize) FROM jiveOffline WHERE username=?";
private static final String SELECT_SIZE_ALL_OFFLINE =
"SELECT SUM(messageSize) FROM jiveOffline";
private static final String DELETE_OFFLINE =
"DELETE FROM jiveOffline WHERE username=?";
private static final String DELETE_OFFLINE_MESSAGE =
"DELETE FROM jiveOffline WHERE username=? AND creationDate=?";
private Cache<String, Integer> sizeCache;
private FastDateFormat dateFormat;
/**
* Pattern to use for detecting invalid XML characters. Invalid XML characters will
* be removed from the stored offline messages.
*/
private Pattern pattern = Pattern.compile("&\\#[\\d]+;");
/**
* Returns the instance of <tt>OfflineMessageStore</tt> being used by the XMPPServer.
*
* @return the instance of <tt>OfflineMessageStore</tt> being used by the XMPPServer.
*/
public static OfflineMessageStore getInstance() {
return XMPPServer.getInstance().getOfflineMessageStore();
}
/**
* Pool of SAX Readers. SAXReader is not thread safe so we need to have a pool of readers.
*/
private BlockingQueue<SAXReader> xmlReaders = new LinkedBlockingQueue<SAXReader>();
/**
* Constructs a new offline message store.
*/
public OfflineMessageStore() {
super("Offline Message Store");
dateFormat = FastDateFormat.getInstance("yyyyMMdd'T'HH:mm:ss", TimeZone.getTimeZone("UTC"));
sizeCache = CacheManager.initializeCache("Offline Message Size", "offlinemessage",
1024 * 100, JiveConstants.HOUR * 12);
}
/**
* Adds a message to this message store. Messages will be stored and made
* available for later delivery.
*
* @param message the message to store.
*/
public void addMessage(Message message) {
if (message == null) {
return;
}
JID recipient = message.getTo();
String username = recipient.getNode();
// If the username is null (such as when an anonymous user), don't store.
if (username == null || !UserManager.getInstance().isRegisteredUser(recipient)) {
return;
}
else
if (!XMPPServer.getInstance().getServerInfo().getName().equals(recipient.getDomain())) {
// Do not store messages sent to users of remote servers
return;
}
long messageID = SequenceManager.nextID(JiveConstants.OFFLINE);
// Get the message in XML format.
String msgXML = message.getElement().asXML();
Connection con = null;
PreparedStatement pstmt = null;
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(INSERT_OFFLINE);
pstmt.setString(1, username);
pstmt.setLong(2, messageID);
pstmt.setString(3, StringUtils.dateToMillis(new java.util.Date()));
pstmt.setInt(4, msgXML.length());
pstmt.setString(5, msgXML);
pstmt.executeUpdate();
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
finally {
DbConnectionManager.closeConnection(pstmt, con);
}
// Update the cached size if it exists.
if (sizeCache.containsKey(username)) {
int size = sizeCache.get(username);
size += msgXML.length();
sizeCache.put(username, size);
}
}
/**
* Returns a Collection of all messages in the store for a user.
* Messages may be deleted after being selected from the database depending on
* the delete param.
*
* @param username the username of the user who's messages you'd like to receive.
* @param delete true if the offline messages should be deleted.
* @return An iterator of packets containing all offline messages.
*/
public Collection<OfflineMessage> getMessages(String username, boolean delete) {
List<OfflineMessage> messages = new ArrayList<OfflineMessage>();
Connection con = null;
PreparedStatement pstmt = null;
SAXReader xmlReader = null;
try {
// Get a sax reader from the pool
xmlReader = xmlReaders.take();
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(LOAD_OFFLINE);
pstmt.setString(1, username);
ResultSet rs = pstmt.executeQuery();
while (rs.next()) {
String msgXML = rs.getString(1);
Date creationDate = new Date(Long.parseLong(rs.getString(2).trim()));
OfflineMessage message;
try {
message = new OfflineMessage(creationDate,
xmlReader.read(new StringReader(msgXML)).getRootElement());
} catch (DocumentException e) {
// Try again after removing invalid XML chars (e.g. )
Matcher matcher = pattern.matcher(msgXML);
if (matcher.find()) {
msgXML = matcher.replaceAll("");
}
message = new OfflineMessage(creationDate,
xmlReader.read(new StringReader(msgXML)).getRootElement());
}
// Add a delayed delivery (JEP-0091) element to the message.
Element delay = message.addChildElement("x", "jabber:x:delay");
delay.addAttribute("from", XMPPServer.getInstance().getServerInfo().getName());
delay.addAttribute("stamp", dateFormat.format(creationDate));
messages.add(message);
}
rs.close();
// Check if the offline messages loaded should be deleted, and that there are
// messages to delete.
if (delete && !messages.isEmpty()) {
pstmt.close();
pstmt = con.prepareStatement(DELETE_OFFLINE);
pstmt.setString(1, username);
pstmt.executeUpdate();
removeUsernameFromSizeCache(username);
}
}
catch (Exception e) {
Log.error("Error retrieving offline messages of username: " + username, e);
}
finally {
// Return the sax reader to the pool
if (xmlReader != null) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -