📄 jdbcmailrepository.java
字号:
Map sqlParameters = new HashMap(); if (tableName != null) { sqlParameters.put("table", tableName); } if (repositoryName != null) { sqlParameters.put("repository", repositoryName); } sqlQueries = new SqlResources(); sqlQueries.init(sqlFile, this.getClass().getName(), conn, sqlParameters); // Check if the required table exists. If not, create it. DatabaseMetaData dbMetaData = conn.getMetaData(); // Need to ask in the case that identifiers are stored, ask the DatabaseMetaInfo. // Try UPPER, lower, and MixedCase, to see if the table is there. if (!(theJDBCUtil.tableExists(dbMetaData, tableName))) { // Users table doesn't exist - create it. createStatement = conn.prepareStatement(sqlQueries.getSqlString("createTable", true)); createStatement.execute(); if (getLogger().isInfoEnabled()) { logBuffer = new StringBuffer(64) .append("JdbcMailRepository: Created table '") .append(tableName) .append("'."); getLogger().info(logBuffer.toString()); } } checkJdbcAttributesSupport(dbMetaData); } finally { theJDBCUtil.closeJDBCStatement(createStatement); theJDBCUtil.closeJDBCConnection(conn); } } /** Checks whether support for JDBC Mail atributes is activated for this repository * and if everything is consistent. * Looks for both the "updateMessageAttributesSQL" and "retrieveMessageAttributesSQL" * statements in sqlResources and for a table column named "message_attributes". * * @param dbMetaData the database metadata to be used to look up the column * @throws SQLException if a fatal situation is met */ protected void checkJdbcAttributesSupport(DatabaseMetaData dbMetaData) throws SQLException { String attributesColumnName = "message_attributes"; boolean hasUpdateMessageAttributesSQL = false; boolean hasRetrieveMessageAttributesSQL = false; boolean hasMessageAttributesColumn = theJDBCUtil.columnExists(dbMetaData, tableName, attributesColumnName); StringBuffer logBuffer = new StringBuffer(64) .append("JdbcMailRepository '" + repositoryName + ", table '" + tableName + "': "); //Determine whether attributes are used and available for storing //Do we have updateMessageAttributesSQL? String updateMessageAttrSql = sqlQueries.getSqlString("updateMessageAttributesSQL", false); if (updateMessageAttrSql!=null) { hasUpdateMessageAttributesSQL = true; } //Determine whether attributes are used and retrieve them //Do we have retrieveAttributesSQL? String retrieveMessageAttrSql = sqlQueries.getSqlString("retrieveMessageAttributesSQL", false); if (retrieveMessageAttrSql!=null) { hasRetrieveMessageAttributesSQL = true; } if (hasUpdateMessageAttributesSQL && !hasRetrieveMessageAttributesSQL) { logBuffer.append("JDBC Mail Attributes support was activated for update but not for retrieval" + "(found 'updateMessageAttributesSQL' but not 'retrieveMessageAttributesSQL'" + "in table '" + tableName + "')."); getLogger().fatalError(logBuffer.toString()); throw new SQLException(logBuffer.toString()); } if (!hasUpdateMessageAttributesSQL && hasRetrieveMessageAttributesSQL) { logBuffer.append("JDBC Mail Attributes support was activated for retrieval but not for update" + "(found 'retrieveMessageAttributesSQL' but not 'updateMessageAttributesSQL'" + "in table '" + tableName + "'."); getLogger().fatalError(logBuffer.toString()); throw new SQLException(logBuffer.toString()); } if (!hasMessageAttributesColumn && (hasUpdateMessageAttributesSQL || hasRetrieveMessageAttributesSQL) ) { logBuffer.append("JDBC Mail Attributes support was activated but column '" + attributesColumnName + "' is missing in table '" + tableName + "'."); getLogger().fatalError(logBuffer.toString()); throw new SQLException(logBuffer.toString()); } if (hasUpdateMessageAttributesSQL && hasRetrieveMessageAttributesSQL) { jdbcMailAttributesReady = true; if (getLogger().isInfoEnabled()) { logBuffer.append("JDBC Mail Attributes support ready."); getLogger().info(logBuffer.toString()); } } else { jdbcMailAttributesReady = false; logBuffer.append("JDBC Mail Attributes support not activated. " + "Missing both 'updateMessageAttributesSQL' " + "and 'retrieveMessageAttributesSQL' " + "statements for table '" + tableName + "' in sqlResources.xml. " + "Will not persist in the repository '" + repositoryName + "'."); getLogger().warn(logBuffer.toString()); } } /** * Releases a lock on a message identified by a key * * @param key the key of the message to be unlocked * * @return true if successfully released the lock, false otherwise */ public boolean unlock(String key) { if (lock.unlock(key)) { if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { StringBuffer debugBuffer = new StringBuffer(256) .append("Unlocked ") .append(key) .append(" for ") .append(Thread.currentThread().getName()) .append(" @ ") .append(new java.util.Date(System.currentTimeMillis())); getLogger().debug(debugBuffer.toString()); } return true; } else { return false; } } /** * Obtains a lock on a message identified by a key * * @param key the key of the message to be locked * * @return true if successfully obtained the lock, false otherwise */ public boolean lock(String key) { if (lock.lock(key)) { if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { StringBuffer debugBuffer = new StringBuffer(256) .append("Locked ") .append(key) .append(" for ") .append(Thread.currentThread().getName()) .append(" @ ") .append(new java.util.Date(System.currentTimeMillis())); getLogger().debug(debugBuffer.toString()); } return true; } else { return false; } } /** * Store this message to the database. Optionally stores the message * body to the filesystem and only writes the headers to the database. */ public void store(Mail mc) throws MessagingException { Connection conn = null; boolean wasLocked = true; String key = mc.getName(); try { synchronized(this) { wasLocked = lock.isLocked(key); if (!wasLocked) { //If it wasn't locked, we want a lock during the store lock(key); } } conn = datasource.getConnection(); //Need to determine whether need to insert this record, or update it. //Begin a transaction conn.setAutoCommit(false); PreparedStatement checkMessageExists = null; ResultSet rsExists = null; boolean exists = false; try { checkMessageExists = conn.prepareStatement(sqlQueries.getSqlString("checkMessageExistsSQL", true)); checkMessageExists.setString(1, mc.getName()); checkMessageExists.setString(2, repositoryName); rsExists = checkMessageExists.executeQuery(); exists = rsExists.next() && rsExists.getInt(1) > 0; } finally { theJDBCUtil.closeJDBCResultSet(rsExists); theJDBCUtil.closeJDBCStatement(checkMessageExists); } if (exists) { //Update the existing record PreparedStatement updateMessage = null; try { updateMessage = conn.prepareStatement(sqlQueries.getSqlString("updateMessageSQL", true)); updateMessage.setString(1, mc.getState()); updateMessage.setString(2, mc.getErrorMessage()); if (mc.getSender() == null) { updateMessage.setNull(3, java.sql.Types.VARCHAR); } else { updateMessage.setString(3, mc.getSender().toString()); } StringBuffer recipients = new StringBuffer(); for (Iterator i = mc.getRecipients().iterator(); i.hasNext(); ) { recipients.append(i.next().toString()); if (i.hasNext()) { recipients.append("\r\n"); } } updateMessage.setString(4, recipients.toString()); updateMessage.setString(5, mc.getRemoteHost()); updateMessage.setString(6, mc.getRemoteAddr()); updateMessage.setTimestamp(7, new java.sql.Timestamp(mc.getLastUpdated().getTime())); updateMessage.setString(8, mc.getName()); updateMessage.setString(9, repositoryName); updateMessage.execute(); } finally { Statement localUpdateMessage = updateMessage; // Clear reference to statement updateMessage = null; theJDBCUtil.closeJDBCStatement(localUpdateMessage); } //Determine whether attributes are used and available for storing if (jdbcMailAttributesReady && mc.hasAttributes()) { String updateMessageAttrSql = sqlQueries.getSqlString("updateMessageAttributesSQL", false); PreparedStatement updateMessageAttr = null; try { updateMessageAttr = conn.prepareStatement(updateMessageAttrSql); ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); try { if (mc instanceof MailImpl) { oos.writeObject(((MailImpl)mc).getAttributesRaw()); } else { HashMap temp = new HashMap(); for (Iterator i = mc.getAttributeNames(); i.hasNext(); ) { String hashKey = (String) i.next(); temp.put(hashKey,mc.getAttribute(hashKey)); } oos.writeObject(temp); } oos.flush(); ByteArrayInputStream attrInputStream = new ByteArrayInputStream(baos.toByteArray()); updateMessageAttr.setBinaryStream(1, attrInputStream, baos.size()); } finally { try { if (oos != null) { oos.close(); } } catch (IOException ioe) { getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.",ioe); } } updateMessageAttr.setString(2, mc.getName()); updateMessageAttr.setString(3, repositoryName); updateMessageAttr.execute(); } catch (SQLException sqle) { getLogger().info("JDBCMailRepository: Trying to update mail attributes failed.",sqle); } finally { theJDBCUtil.closeJDBCStatement(updateMessageAttr); } } //Determine whether the message body has changed, and possibly avoid // updating the database. MimeMessage messageBody = mc.getMessage(); boolean saveBody = false; // if the message is a CopyOnWrite proxy we check the modified wrapped object. if (messageBody instanceof MimeMessageCopyOnWriteProxy) { MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) messageBody; messageBody = messageCow.getWrappedMessage(); } if (messageBody instanceof MimeMessageWrapper) { MimeMessageWrapper message = (MimeMessageWrapper)messageBody; saveBody = message.isModified(); } else { saveBody = true; } if (saveBody) { PreparedStatement updateMessageBody = conn.prepareStatement(sqlQueries.getSqlString("updateMessageBodySQL", true)); try { MessageInputStream is = new MessageInputStream(mc,sr,inMemorySizeLimit); updateMessageBody.setBinaryStream(1,is,(int) is.getSize()); updateMessageBody.setString(2, mc.getName()); updateMessageBody.setString(3, repositoryName); updateMessageBody.execute(); } finally { theJDBCUtil.closeJDBCStatement(updateMessageBody); } } } else { //Insert the record into the database PreparedStatement insertMessage = null; try { String insertMessageSQL = sqlQueries.getSqlString("insertMessageSQL", true); int number_of_parameters = getNumberOfParameters (insertMessageSQL); insertMessage = conn.prepareStatement(insertMessageSQL); insertMessage.setString(1, mc.getName()); insertMessage.setString(2, repositoryName); insertMessage.setString(3, mc.getState()); insertMessage.setString(4, mc.getErrorMessage()); if (mc.getSender() == null) { insertMessage.setNull(5, java.sql.Types.VARCHAR); } else { insertMessage.setString(5, mc.getSender().toString()); } StringBuffer recipients = new StringBuffer(); for (Iterator i = mc.getRecipients().iterator(); i.hasNext(); ) { recipients.append(i.next().toString()); if (i.hasNext()) { recipients.append("\r\n"); } } insertMessage.setString(6, recipients.toString()); insertMessage.setString(7, mc.getRemoteHost());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -