📄 subscription.java
字号:
// // MINIONS // /////////////////////////////////////////////////////////////////////////////////// /** * <p> * Returns true if the method is a snapshot query. * </p> * */ private static boolean isSnapshotQuery( Method candidate ) { int modifiers = candidate.getModifiers(); return ( Modifier.isPublic( modifiers ) && Modifier.isStatic( modifiers ) && candidate.getReturnType() == ResultSet.class && ( candidate.getAnnotation( SnapshotQuery.class ) != null ) ); } /** * <p> * Create a VTI to grab data from a foreign data source. Also create an * empty Derby table to hold its results. * </p> * */ private static void createVTIAndEmptyTable ( String subscriptionClassName, Method method, String jdbcDriverName, String connectionURL, HashSet<String> parameterMap ) throws Exception { SnapshotQuery details = method.getAnnotation( SnapshotQuery.class ); String query = details.query(); String[] queryParameters = details.parameters(); int paramCount = queryParameters.length; // placeholders just so that we can determine the query's shape String[] argValues = new String[ paramCount ]; String functionName = getFunctionName( method ); String tableName = getTableName( method ); for ( int i = 0; i < paramCount; i++ ) { String paramName = queryParameters[ i ]; if ( !parameterMap.contains( paramName ) ) { throw new SQLException( paramName + " is not a parameter defined for subscription " + subscriptionClassName ); } } // first create the table function to read from the foreign database registerVTI( method, jdbcDriverName, connectionURL, query, argValues ); // now create a table based on the shape of the query createEmptyTable( tableName, functionName ); } /** * <p> * Drop a snapshot VTI and the table where its results are dumped. * </p> * */ private static void dropVTIAndTable ( Method method ) throws Exception { String functionName = getFunctionName( method ); String tableName = getTableName( method ); VTIHelper.dropObject( "function", functionName, false ); VTIHelper.dropObject( "table", tableName, false ); } /** * <p> * Create an empty table based on the shape of a table function. * </p> * */ private static void createEmptyTable ( String tableName, String functionName ) throws SQLException { StringBuilder buffer = new StringBuilder(); buffer.append( "create table " ); buffer.append( tableName ); buffer.append( "\n" ); buffer.append( "as select s.* from table( " + functionName + "( ) ) s\n" ); buffer.append( "with no data\n" ); VTIHelper.executeDDL( buffer.toString() ); } /** * <p> * Declare the refresh procedure for the subscription. * </p> * */ private static void registerRefreshProcedure( SubscriptionSignature subscriptionSignature ) throws Exception { String refreshProcedureName = subscriptionSignature.refreshProcedureName(); String[] subscriptionParameters = subscriptionSignature.parameters(); int parameterCount = subscriptionParameters.length; StringBuffer buffer = new StringBuffer(); buffer.append( "create procedure " + refreshProcedureName + "\n" ); buffer.append( "(\n" ); buffer.append( "\tsubscriptionClassName varchar( 32672 ),\n" ); buffer.append( "\tconnectionURL varchar( 32672 )\n" ); for ( int i = 0; i < parameterCount; i++ ) { buffer.append( ", arg" + i + " varchar( 32672 )\n" ); } buffer.append( ")\n" ); buffer.append( "language java\n" ); buffer.append( "parameter style java\n" ); buffer.append( "modifies sql data\n" ); buffer.append( "external name 'org.apache.derbyDemo.vtis.snapshot.Subscription.refreshSubscription'\n" ); VTIHelper.executeDDL( buffer.toString() ); } /** * <p> * Drop the refresh procedure for a subscription. * </p> * */ private static void unregisterRefreshProcedure( SubscriptionSignature signature ) throws Exception { VTIHelper.dropObject( "procedure", signature.refreshProcedureName(), false ); } /** * <p> * Empty all of the subscribed tables. * </p> * */ private static void truncateTables( ArrayList<Method> snapshotQueries ) throws Exception { Connection conn = VTIHelper.getLocalConnection(); int count = snapshotQueries.size(); for ( int i = count - 1; i > -1; i-- ) { Method method = snapshotQueries.get( i ); String tableName = getTableName( method ); String sql = "delete from " + tableName; VTIHelper.print( sql ); PreparedStatement ps = conn.prepareStatement( sql ); ps.execute(); ps.close(); } } /** * <p> * Fill all of the subscribed tables. * </p> * */ private static void fillTables( ArrayList<Method> snapshotQueries ) throws Exception { Connection conn = VTIHelper.getLocalConnection(); int count = snapshotQueries.size(); for ( int i = 0; i < count; i++ ) { Method method = snapshotQueries.get( i ); String tableName = getTableName( method ); String functionName = getFunctionName( method ); String alias = "xxx"; String sql = ( "insert into " + tableName + " select " + alias + ".* from table( " + functionName + "() ) " + alias ); VTIHelper.print( sql ); PreparedStatement ps = conn.prepareStatement( sql ); ps.execute(); ps.close(); } } /** * <p> * Create a table name from a method name. * </p> * */ private static String getTableName( Method method ) { return VTIHelper.doubleQuote( method.getName() ); } /** * <p> * Create a function name from a method name. * </p> * */ private static String getFunctionName( Method method ) { return VTIHelper.doubleQuote( method.getName() ); } /////////////////////////////////////////////////////////////////////////////////// // // MANAGING THE SUBSCRIPTION CONTEXTS // /////////////////////////////////////////////////////////////////////////////////// /** * <p> * Create a new subscription context for creating or refreshing a subscription. * </p> * */ private static void createContext ( String subscriptionClassName, String[] parameterValues, String connectionURL ) throws Exception { Class subscriptionClass = Class.forName( subscriptionClassName ); SubscriptionSignature subscriptionSignature = (SubscriptionSignature) subscriptionClass.getAnnotation( SubscriptionSignature.class ); HashMap<String, String> parameterMap = null; String[] parameterNames = subscriptionSignature.parameters(); if ( parameterValues != null ) { parameterMap = new HashMap<String, String>(); int count = parameterNames.length; int actual = parameterValues.length; if ( count != actual ) { throw new SQLException( "Expected " + count + " parameters, but saw " + actual ); } for ( int i = 0; i < count; i++ ) { parameterMap.put( parameterNames[ i ], parameterValues[ i ] ); } } SubscriptionContext newContext = new SubscriptionContext( subscriptionSignature, parameterMap, connectionURL ); SubscriptionContext oldContext = getContext( subscriptionClassName, false ); if ( oldContext != null ) { throw new SQLException( subscriptionClassName + " already in use. Try again later." ); } _contexts.put( subscriptionClassName, newContext ); } /** * <p> * Drop a subscription context. * </p> * */ private static void dropContext ( String subscriptionClassName ) { _contexts.remove( subscriptionClassName ); } /** * <p> * Get a subscription context. * </p> * */ private static SubscriptionContext getContext ( String subscriptionClassName, boolean shouldExist ) throws SQLException { SubscriptionContext context = _contexts.get( subscriptionClassName ); if ( shouldExist && (context == null) ) { throw new SQLException ( "Could not find execution context for " + subscriptionClassName + ". Maybe you are trying to invoke a snapshot table function outside of the refresh procedure?" ); } return context; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -