filebacking.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,170 行 · 第 1/2 页
JAVA
1,170 行
/* * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * * Free Software Foundation, Inc. * 59 Temple Place, Suite 330 * Boston, MA 02111-1307 USA * * @author Scott Ferguson */package com.caucho.server.cluster;import com.caucho.config.ConfigException;import com.caucho.db.jdbc.DataSourceImpl;import com.caucho.util.Alarm;import com.caucho.util.FreeList;import com.caucho.util.L10N;import com.caucho.vfs.Path;import com.caucho.vfs.ReadStream;import com.caucho.vfs.WriteStream;import com.caucho.server.admin.Management;import javax.sql.DataSource;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;import java.util.logging.Level;import java.util.logging.Logger;/** * Manages the backing for the file objectStore. */public class FileBacking { private static final L10N L = new L10N(FileBacking.class); private static final Logger log = Logger.getLogger(FileBacking.class.getName()); private FreeList<ClusterConnection> _freeConn = new FreeList<ClusterConnection>(32); private String _name; private Path _path; private DataSource _dataSource; // version identifier for this instance of the backing. Each new start // will increment the version private int _version = 1; private String _tableName; private String _accessQuery; private String _countQuery; private String _dumpQuery; private String _insertMetadataQuery; private String _insertSaveQuery; private String _loadQuery; private String _loadIfVersionQuery; private String _removeQuery; private String _setExpiresQuery; private String _timeoutQuery; private String _updateMetadataQuery; private String _updateMetadataVersionQuery; private String _updateSaveLockQuery; private String _updateSaveOverrideQuery; private String _updateSaveDataQuery; public FileBacking() { } public FileBacking(String name) { _name = name; } /** * Returns the path to the directory. */ public Path getPath() { return _path; } /** * Sets the path to the saved file. */ public void setPath(Path path) { _path = path; } /** * Sets the table name */ public void setTableName(String table) { _tableName = table; } public boolean init(int clusterLength) throws Exception { if (_path == null) _path = Management.getCurrentPath(); if (_path == null) throw new ConfigException(L.l("file-backing needs path.")); if (_tableName == null) throw new ConfigException(L.l("file-backing needs tableName.")); int length = clusterLength; if (length <= 0) length = 1; _accessQuery = "UPDATE " + _tableName + " SET update_time=? WHERE id=?"; _countQuery = "SELECT count(*) FROM " + _tableName; _dumpQuery = ("SELECT id, is_dead, expire_interval, data" + " FROM " + _tableName + " WHERE ? <= mod_time AND " + " (?=server1 OR ?=server2 OR ?=server3)"); _insertMetadataQuery = ("INSERT into " + _tableName + " (id,store_id,is_valid,is_dead,data_hash,update_time,expire_interval,server1,server2,server3,local_version)" + " VALUES (?,?,0,?,?,?,?,?,?,?,?)"); _insertSaveQuery = ("INSERT into " + _tableName + " (id,store_id,is_valid,is_dead,data_hash,data,update_time,expire_interval,server1,server2,server3,local_version) " + "VALUES(?,?,1,0,?,?,?,?,?,?,?,?)"); _loadQuery = ("SELECT update_time,data_hash,data" + " FROM " + _tableName + " WHERE id=? AND is_valid=1 AND is_dead=0"); _loadIfVersionQuery = ("SELECT update_time,data_hash,data" + " FROM " + _tableName + " WHERE id=? AND is_valid=1 AND is_dead=0" + " AND local_version=?"); _removeQuery = "UPDATE " + _tableName + " SET is_dead=1,data=null WHERE id=?"; _setExpiresQuery = "UPDATE " + _tableName + " SET expire_interval=? WHERE id=?"; // objectAccess window is 1/4 the expire interval _timeoutQuery = ("DELETE FROM " + _tableName + " WHERE update_time + 5 * expire_interval / 4 < ?"); _updateMetadataQuery = ("UPDATE " + _tableName + " SET is_valid=0" + " WHERE id=? AND data_hash <> ?"); _updateMetadataVersionQuery = ("UPDATE " + _tableName + " SET update_time=?,local_version=?" + " WHERE id=? AND data_hash = ?"); _updateSaveLockQuery = ("UPDATE " + _tableName + " SET data_hash=?,data=?,update_time=?,is_valid=1,is_dead=0,local_version=?" + " WHERE id=? AND data_hash=?"); _updateSaveOverrideQuery = ("UPDATE " + _tableName + " SET data_hash=?,data=?,update_time=?,is_valid=1,is_dead=0,local_version=?" + " WHERE id=?"); try { _path.mkdirs(); } catch (IOException e) { } DataSourceImpl dataSource = new DataSourceImpl(); dataSource.setPath(_path); dataSource.setRemoveOnError(true); dataSource.init(); _dataSource = dataSource; initDatabase(); initVersion(); return true; } /** * Returns the data source. */ public DataSource getDataSource() { return _dataSource; } /** * Create the database, initializing if necessary. */ private void initDatabase() throws Exception { Connection conn = _dataSource.getConnection(); try { Statement stmt = conn.createStatement(); try { String sql = ("SELECT expire_interval,store_id,is_valid,local_version" + " FROM " + _tableName + " WHERE 1=0"); ResultSet rs = stmt.executeQuery(sql); rs.next(); rs.close(); return; } catch (Exception e) { log.log(Level.FINEST, e.toString(), e); log.finer(this + " " + e.toString()); } try { stmt.executeQuery("DROP TABLE " + _tableName); } catch (Exception e) { log.log(Level.FINEST, e.toString(), e); } String sql = ("CREATE TABLE " + _tableName + " (\n" + " id BINARY(20) PRIMARY KEY,\n" + " store_id BINARY(20),\n" + " data_hash BINARY(20),\n" + " data BLOB,\n" + " expire_interval BIGINT,\n" + " update_time BIGINT,\n" + " server1 SMALLINT,\n" + " server2 SMALLINT,\n" + " server3 SMALLINT,\n" + " is_dead BIT,\n" + " is_valid BIT,\n" + " local_version INTEGER)"); log.fine(sql); stmt.executeUpdate(sql); } finally { conn.close(); } } /** * Finds the old version id from the database. */ private void initVersion() throws Exception { Connection conn = _dataSource.getConnection(); try { Statement stmt = conn.createStatement(); try { String sql = ("select max(local_version)" + " from " + _tableName); ResultSet rs = stmt.executeQuery(sql); if (rs.next()) { _version = rs.getInt(1) + 1; } rs.close(); // rollover if (Integer.MAX_VALUE / 2 < _version) { sql = "update " + _tableName + " set local_version=0"; stmt.executeUpdate(sql); _version = 1; } return; } catch (Exception e) { log.log(Level.FINEST, e.toString(), e); log.finer(this + " " + e.toString()); } } finally { conn.close(); } } public long start() throws Exception { long delta = - Alarm.getCurrentTime(); Connection conn = null; try { conn = _dataSource.getConnection(); Statement stmt = conn.createStatement(); String sql = "SELECT MAX(update_time) FROM " + _tableName; ResultSet rs = stmt.executeQuery(sql); if (rs.next()) delta = rs.getLong(1) - Alarm.getCurrentTime(); } finally { if (conn != null) conn.close(); } return delta; } /** * Clears the old objects. */ public void clearOldObjects(long maxIdleTime) throws SQLException { Connection conn = null; try { if (maxIdleTime > 0) { conn = _dataSource.getConnection(); PreparedStatement pstmt = conn.prepareStatement(_timeoutQuery); long now = Alarm.getCurrentTime(); pstmt.setLong(1, now); int count = pstmt.executeUpdate(); // System.out.println("OBSOLETE:" + count); if (count > 0) log.fine(this + " purged " + count + " old sessions"); pstmt.close(); } } finally { if (conn != null) conn.close(); } } /** * Load the session from the jdbc objectStore. * * @param session the session to fill. * * @return true if the loadImpl was valid. */ public boolean load(ClusterObject clusterObj, Object obj) throws Exception { return loadImpl(clusterObj, obj, false); } /** * Load the session from the jdbc objectStore. * * @param session the session to fill. * * @return true if the loadImpl was valid. */ public boolean loadIfVersion(ClusterObject clusterObj, Object obj) throws Exception { return loadImpl(clusterObj, obj, true); } /** * Load the session from the jdbc objectStore. * * @param session the session to fill. * * @return true if the loadImpl was valid. */ private boolean loadImpl(ClusterObject clusterObj, Object obj, boolean isVersion) throws Exception { HashKey objectId = clusterObj.getObjectId(); ClusterConnection conn = getConnection(); try { PreparedStatement stmt; if (isVersion) { stmt = conn.prepareLoadIfVersion(); stmt.setBytes(1, objectId.getHash()); stmt.setInt(2, _version); } else { stmt = conn.prepareLoad(); stmt.setBytes(1, objectId.getHash()); } ResultSet rs = stmt.executeQuery(); boolean validLoad = false; if (rs.next()) { //System.out.println("LOAD: " + uniqueId); long updateTime = rs.getLong(1); byte []digest = rs.getBytes(2); InputStream is = rs.getBinaryStream(3); if (log.isLoggable(Level.FINE)) log.fine(this + " load key=" + objectId); validLoad = clusterObj.loadImpl(is, obj, digest); if (validLoad) clusterObj.setAccessTime(updateTime); is.close(); } else if (log.isLoggable(Level.FINE)) log.fine(this + " load: no local object loaded for " + objectId); else { // System.out.println("NO-LOAD: " + uniqueId); } rs.close(); return validLoad; } finally { conn.close(); } } /** * Updates the object's objectAccess time. * * @param obj the object to objectStore. */ public void updateAccess(HashKey id) throws Exception { ClusterConnection conn = getConnection(); try { PreparedStatement stmt = conn.prepareAccess(); long now = Alarm.getCurrentTime(); stmt.setLong(1, now); stmt.setBytes(2, id.getHash()); int count = stmt.executeUpdate(); if (count > 0) { if (log.isLoggable(Level.FINE)) log.fine(this + " access " + id); return; } } finally { conn.close(); } } /** * Sets the object's expire_interval. * * @param obj the object to objectStore. */ public void setExpireInterval(HashKey id, long expireInterval) throws Exception { ClusterConnection conn = getConnection(); try { PreparedStatement stmt = conn.prepareSetExpireInterval(); stmt.setLong(1, expireInterval); stmt.setBytes(2, id.getHash()); int count = stmt.executeUpdate(); if (count > 0) { if (log.isLoggable(Level.FINE)) log.fine(this + " set expire interval " + expireInterval + " for " + id); return; } } finally { conn.close(); } } /** * Removes the named object from the objectStore. */ public void remove(HashKey id) throws Exception { ClusterConnection conn = getConnection(); try { PreparedStatement pstmt = conn.prepareRemove(); pstmt.setBytes(1, id.getHash()); int count = pstmt.executeUpdate(); if (log.isLoggable(Level.FINE)) log.fine(this + " remove " + id); } finally { conn.close(); } } /** * Reads from the objectStore. */ public byte [] read(HashKey id, WriteStream os) throws IOException { Connection conn = null; byte []digest = null; try { conn = _dataSource.getConnection(); PreparedStatement pstmt = conn.prepareStatement(_loadQuery); pstmt.setBytes(1, id.getHash()); ResultSet rs = pstmt.executeQuery(); if (rs.next()) { long updateTime = rs.getLong(1); digest = rs.getBytes(2); InputStream is = rs.getBinaryStream(3); os.writeStream(is); is.close(); return digest; } } catch (SQLException e) { log.log(Level.FINE, e.toString(), e); } finally { try { if (conn != null) conn.close(); } catch (SQLException e) { } } return digest; } /** * Stores the cluster object on the local objectStore.
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?