📄 dbtoolimp.java
字号:
package com.fetion.cmpp.server.db;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.fetion.cmpp.common.Message;
import com.fetion.cmpp.server.util.Constants;
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.serial.SerialBinding;
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Sequence;
import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
/**
 * Server-side persistence of CMPP messages on top of Berkeley DB (Java Edition).
 *
 * <p>One transactional environment holds two databases: a new-message database
 * and a history-message database. Records are keyed by a {@code Long} taken
 * from a per-database {@link Sequence}; values are {@link Message} objects
 * serialized through a {@link SerialBinding}.
 *
 * <p>Every public data operation is a no-op (or returns an empty map) when the
 * handles it needs are {@code null}.
 *
 * @author Administrator
 */
public class DBToolImp implements DBTool {

    private final Log logger = LogFactory.getLog(getClass());

    private Environment dbEnvironment;
    private Database newMessageDB;
    private Database historyMessageDB;
    private EntryBinding newMessageBinding;
    private EntryBinding historyMessageBinding;
    private Sequence newMessageSeq;
    private Sequence historyMessageSeq;

    /**
     * Opens the environment and then both message databases.
     *
     * <p>If the databases cannot be opened, the environment opened by this call
     * is closed again so that no half-opened state is left behind.
     *
     * @return {@code true} when the environment and both databases are open
     * @throws DatabaseException if Berkeley DB reports a failure
     * @throws UnsupportedEncodingException if the sequence key cannot be encoded
     */
    public boolean openDB() throws DatabaseException,
            UnsupportedEncodingException {
        logger.debug("cmpp dataBase opening ......");
        boolean envOpened = false;
        boolean result = false;
        try {
            envOpened = openDBEnv();
            if (envOpened) {
                result = openCmppDB();
            }
        } finally {
            if (envOpened && !result) {
                // Do not keep an environment whose databases failed to open.
                closeDBEnv();
                dbEnvironment = null;
            }
        }
        if (result) {
            logger.debug("cmpp dataBase opened !");
        } else {
            logger.warn("cmpp dataBase could not be opened");
        }
        return result;
    }

    /**
     * Closes the sequences, the databases and finally the environment.
     *
     * <p>The environment is closed even when closing a sequence or database
     * fails; the failure is still propagated to the caller.
     *
     * @return {@code true} when everything was closed without an exception
     * @throws DatabaseException if any close operation fails
     */
    public boolean closeDB() throws DatabaseException {
        logger.debug("cmpp dataBase closing ......");
        try {
            closeCmppDB();
        } finally {
            closeEnvAndCleanLog();
        }
        logger.debug("cmpp dataBase closed !");
        return true;
    }

    /**
     * Stores one message in the new-message database under the next sequence
     * value. Does nothing when {@code message} is {@code null} or the store is
     * not open.
     *
     * @param message the message to persist
     * @throws DatabaseException if the write fails; the transaction is aborted
     */
    public void insertNewMessages(Message message) throws DatabaseException {
        if (message == null || dbEnvironment == null || newMessageDB == null
                || newMessageBinding == null || newMessageSeq == null) {
            return;
        }
        logger.debug("new message adding ......");
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        boolean done = false;
        try {
            store(txn, newMessageDB, newMessageBinding, newMessageSeq, message);
            txn.commit();
            done = true;
        } finally {
            if (!done) {
                abortQuietly(txn);
            }
        }
        logger.debug("new message added !");
    }

    /**
     * Stores all given messages in the history database inside a single
     * transaction. Does nothing when {@code messages} is {@code null} or the
     * store is not open.
     *
     * @param messages the messages to persist
     * @throws DatabaseException if a write fails; the transaction is aborted
     */
    public void insertHistoryMessages(List<Message> messages)
            throws DatabaseException {
        if (messages == null || dbEnvironment == null
                || historyMessageDB == null || historyMessageBinding == null
                || historyMessageSeq == null) {
            return;
        }
        logger.debug("history messages adding ......");
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        boolean done = false;
        try {
            for (Message message : messages) {
                store(txn, historyMessageDB, historyMessageBinding,
                        historyMessageSeq, message);
            }
            txn.commit();
            done = true;
        } finally {
            if (!done) {
                abortQuietly(txn);
            }
        }
        logger.debug("history messages added !");
    }

    /**
     * Reads the messages of the new-message database.
     *
     * @return a map from record key to message; empty when the store is not
     *         open or nothing could be read
     */
    public Map<Long, Message> getAllNewMessages() {
        Map<Long, Message> messages = new HashMap<Long, Message>();
        if (dbEnvironment != null && newMessageDB != null
                && newMessageBinding != null) {
            logger.debug("new messages searching ......");
            readAll(newMessageDB, newMessageBinding, messages);
        }
        return messages;
    }

    /**
     * Reads the messages of the history database.
     *
     * @return a map from record key to message; empty when the store is not
     *         open or nothing could be read
     */
    public Map<Long, Message> getAllHistoryMessages() {
        Map<Long, Message> messages = new HashMap<Long, Message>();
        if (dbEnvironment != null && historyMessageDB != null
                && historyMessageBinding != null) {
            readAll(historyMessageDB, historyMessageBinding, messages);
        }
        logger.debug("History messages searched !");
        return messages;
    }

    /**
     * Deletes the records whose keys appear in {@code messages} from the
     * new-message database, in one transaction.
     *
     * @param messages map whose keys identify the records to delete
     * @throws DatabaseException if a delete fails; the transaction is aborted
     */
    public void deleteAllNewMessages(Map<Long, Message> messages)
            throws DatabaseException {
        if (messages == null || dbEnvironment == null || newMessageDB == null
                || newMessageBinding == null || newMessageSeq == null) {
            return;
        }
        logger.debug("new messages deleting ......");
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        boolean done = false;
        try {
            deleteKeys(txn, newMessageDB, messages.keySet());
            txn.commit();
            done = true;
        } finally {
            if (!done) {
                abortQuietly(txn);
            }
        }
        logger.debug("new messages deleted !");
    }

    /**
     * Deletes every message returned by {@link #getAllNewMessages()}.
     *
     * @throws DatabaseException if a delete fails
     */
    public void deleteAllNewMessages() throws DatabaseException {
        logger.info("new messages deleting ......");
        deleteAllNewMessages(this.getAllNewMessages());
        logger.info("new messages deleted !");
    }

    /**
     * Moves the given messages from the new-message database to the history
     * database: the keys are deleted from the former and the values are stored
     * in the latter under fresh sequence values.
     *
     * <p>Both steps run in one transaction, so either the whole move is
     * committed or none of it is. Does nothing when {@code messages} is
     * {@code null} or any required handle is missing.
     *
     * @param messages the sent messages, keyed by their new-message record key
     * @throws DatabaseException if any step fails; the transaction is aborted
     */
    public void remarkHadSendMessages(Map<Long, Message> messages)
            throws DatabaseException {
        if (messages == null || dbEnvironment == null || newMessageDB == null
                || newMessageBinding == null || newMessageSeq == null
                || historyMessageDB == null || historyMessageBinding == null
                || historyMessageSeq == null) {
            return;
        }
        logger.debug("new messages deleting ......");
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        boolean done = false;
        try {
            deleteKeys(txn, newMessageDB, messages.keySet());
            logger.debug("new messages deleted !");
            logger.debug("history messages adding ......");
            for (Message message : messages.values()) {
                store(txn, historyMessageDB, historyMessageBinding,
                        historyMessageSeq, message);
            }
            logger.debug("history messages added !");
            txn.commit();
            done = true;
        } finally {
            if (!done) {
                abortQuietly(txn);
            }
        }
    }

    /**
     * Deletes the records whose keys appear in {@code messages} from the
     * history database, in one transaction.
     *
     * @param messages map whose keys identify the records to delete
     * @throws DatabaseException if a delete fails; the transaction is aborted
     */
    public void deleteAllHistoryMessages(Map<Long, Message> messages)
            throws DatabaseException {
        if (messages == null || dbEnvironment == null
                || historyMessageDB == null || historyMessageBinding == null) {
            return;
        }
        logger.debug("history messages deleting ......");
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        boolean done = false;
        try {
            deleteKeys(txn, historyMessageDB, messages.keySet());
            txn.commit();
            done = true;
        } finally {
            if (!done) {
                abortQuietly(txn);
            }
        }
        logger.debug("history messages deleted !");
    }

    /**
     * Deletes every message returned by {@link #getAllHistoryMessages()}.
     *
     * @throws DatabaseException if a delete fails
     */
    public void deleteAllHistoryMessages() throws DatabaseException {
        logger.info("History messages deleting ......");
        deleteAllHistoryMessages(this.getAllHistoryMessages());
        logger.info("History messages deleted !");
    }

    /**
     * Removes the files directly inside the environment directory and then the
     * directory itself. Files that cannot be removed are logged, not fatal.
     *
     * @throws DatabaseException declared for interface compatibility
     */
    public void deleteDataBase() throws DatabaseException {
        File dir = new File(Constants.CMPP_DB_ENV_NAME);
        if (!dir.exists()) {
            return;
        }
        // listFiles() returns null when the path is not a directory or on I/O error.
        File[] files = dir.listFiles();
        if (files != null) {
            for (File file : files) {
                if (!file.delete()) {
                    logger.warn("could not delete " + file.getAbsolutePath());
                }
            }
        }
        if (!dir.delete()) {
            logger.warn("could not delete " + dir.getAbsolutePath());
        }
    }

    /**
     * Creates (if necessary) and opens the transactional database environment.
     *
     * @return {@code true} once the environment is open
     * @throws DatabaseException if the environment cannot be opened
     */
    private boolean openDBEnv() throws DatabaseException {
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setTransactional(true);
        envConfig.setAllowCreate(true); // create the environment if it does not exist
        File dir = new File(Constants.CMPP_DB_ENV_NAME);
        if (!dir.exists() && !dir.mkdirs()) {
            logger.warn("could not create directory " + dir.getAbsolutePath());
        }
        logger.debug("path: " + dir.getAbsolutePath());
        dbEnvironment = new Environment(dir, envConfig);
        return true;
    }

    /**
     * Closes the environment, logging instead of propagating any failure.
     */
    private void closeDBEnv() {
        try {
            if (dbEnvironment != null) {
                dbEnvironment.close();
            }
        } catch (DatabaseException dbe) {
            logger.warn("closing cmpp database environment failed", dbe);
        }
    }

    /**
     * Runs log cleaning and closes the environment. The environment is closed
     * even when log cleaning fails.
     *
     * @throws DatabaseException if cleaning or closing fails
     */
    private void closeEnvAndCleanLog() throws DatabaseException {
        if (dbEnvironment == null) {
            return;
        }
        try {
            dbEnvironment.cleanLog();
        } finally {
            dbEnvironment.close();
        }
    }

    /**
     * Opens both message databases, their serial bindings and their sequences.
     * The two databases are opened inside one transaction.
     *
     * <p>On failure the transaction is aborted and any handle opened so far is
     * closed and reset to {@code null}.
     *
     * @return {@code true} when everything is open; {@code false} when there is
     *         no environment
     * @throws DatabaseException if Berkeley DB reports a failure
     * @throws UnsupportedEncodingException if the sequence key cannot be encoded
     */
    private boolean openCmppDB() throws DatabaseException,
            UnsupportedEncodingException {
        if (dbEnvironment == null) {
            return false;
        }
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        boolean done = false;
        try {
            DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setTransactional(true);
            dbConfig.setAllowCreate(true);

            SequenceConfig seqConfig = new SequenceConfig();
            seqConfig.setAllowCreate(true);

            newMessageDB = dbEnvironment.openDatabase(txn,
                    Constants.CMPP_NEWMESSAGEDB, dbConfig);
            // The class catalog is kept in the message database itself.
            newMessageBinding = new SerialBinding(
                    new StoredClassCatalog(newMessageDB), Message.class);
            DatabaseEntry newMessageKey = new DatabaseEntry(
                    Constants.CMPP_NEWMESSAGEDB_KEY_NAME.getBytes("UTF-8"));
            newMessageSeq = newMessageDB.openSequence(null, newMessageKey,
                    seqConfig);

            historyMessageDB = dbEnvironment.openDatabase(txn,
                    Constants.CMPP_HISTORYMESSAGEDB, dbConfig);
            historyMessageBinding = new SerialBinding(
                    new StoredClassCatalog(historyMessageDB), Message.class);
            // NOTE(review): the history sequence reuses the NEW-message key
            // name. It lives in a different database, so it does not clash;
            // left unchanged because renaming would orphan existing sequence
            // state. Confirm whether a dedicated constant was intended.
            DatabaseEntry historyMessageKey = new DatabaseEntry(
                    Constants.CMPP_NEWMESSAGEDB_KEY_NAME.getBytes("UTF-8"));
            historyMessageSeq = historyMessageDB.openSequence(null,
                    historyMessageKey, seqConfig);

            txn.commit();
            done = true;
        } finally {
            if (!done) {
                abortQuietly(txn);
                discardHandlesQuietly();
            }
        }
        return true;
    }

    /**
     * Closes both sequences and both databases. Every handle is attempted even
     * if an earlier close throws.
     *
     * @throws DatabaseException if a close operation fails
     */
    private void closeCmppDB() throws DatabaseException {
        try {
            if (newMessageSeq != null) {
                newMessageSeq.close();
            }
        } finally {
            try {
                if (historyMessageSeq != null) {
                    historyMessageSeq.close();
                }
            } finally {
                try {
                    if (newMessageDB != null) {
                        newMessageDB.close();
                    }
                } finally {
                    if (historyMessageDB != null) {
                        historyMessageDB.close();
                    }
                }
            }
        }
    }

    /**
     * Serializes {@code message} and writes it to {@code db} under the next
     * value of {@code sequence}, as part of {@code txn}.
     */
    private void store(Transaction txn, Database db, EntryBinding binding,
            Sequence sequence, Message message) throws DatabaseException {
        DatabaseEntry key = new DatabaseEntry();
        LongBinding.longToEntry(sequence.get(null, 1), key);
        DatabaseEntry data = new DatabaseEntry();
        binding.objectToEntry(message, data);
        OperationStatus status = db.put(txn, key, data);
        if (status != OperationStatus.SUCCESS) {
            throw new DatabaseException("Data insertion got status "
                    + status);
        }
    }

    /**
     * Deletes each of {@code keys} from {@code db} as part of {@code txn}.
     */
    private void deleteKeys(Transaction txn, Database db, Iterable<Long> keys)
            throws DatabaseException {
        for (Long key : keys) {
            DatabaseEntry keyEntry = new DatabaseEntry();
            LongBinding.longToEntry(key, keyEntry);
            db.delete(txn, keyEntry);
        }
    }

    /**
     * Walks {@code db} backwards with a cursor and copies every decoded record
     * into {@code target}. A failure ends the scan; whatever was read before it
     * stays in {@code target}.
     */
    private void readAll(Database db, EntryBinding binding,
            Map<Long, Message> target) {
        Cursor cursor = null;
        try {
            cursor = db.openCursor(null, null);
            DatabaseEntry foundKey = new DatabaseEntry();
            DatabaseEntry foundData = new DatabaseEntry();
            // NOTE(review): the class catalog and the sequence are stored in
            // this same database (see openCmppDB), so the scan can reach
            // records that are not messages; decoding those fails and ends the
            // scan. Walking backwards with getPrev presumably visits the
            // Long-keyed messages first - confirm before changing direction.
            while (cursor.getPrev(foundKey, foundData, LockMode.DEFAULT)
                    == OperationStatus.SUCCESS) {
                Long id = LongBinding.entryToLong(foundKey);
                Message message = (Message) binding.entryToObject(foundData);
                target.put(id, message);
            }
        } catch (DatabaseException e) {
            logger.error("reading messages failed", e);
        } catch (RuntimeException e) {
            logger.warn("message scan stopped at a record that could not be decoded", e);
        } finally {
            if (cursor != null) {
                try {
                    cursor.close();
                } catch (DatabaseException e) {
                    logger.warn("closing cursor failed", e);
                }
            }
        }
    }

    /**
     * Aborts {@code txn}, logging any failure so that the exception that
     * triggered the abort is not masked.
     */
    private void abortQuietly(Transaction txn) {
        try {
            txn.abort();
        } catch (Exception e) {
            logger.warn("aborting transaction failed", e);
        }
    }

    /**
     * Closes whatever sequence and database handles are set, logging failures,
     * and resets all handle and binding fields to {@code null}.
     */
    private void discardHandlesQuietly() {
        try {
            closeCmppDB();
        } catch (Exception e) {
            logger.warn("closing cmpp database handles failed", e);
        }
        newMessageSeq = null;
        historyMessageSeq = null;
        newMessageDB = null;
        historyMessageDB = null;
        newMessageBinding = null;
        historyMessageBinding = null;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -