plpython_function.sql

来自「postgresql8.3.4源码,开源数据库」· SQL 代码 · 共 483 行

SQL
483
字号
--
-- PL/Python (plpythonu) function and trigger definitions used by a
-- regression test.
--
-- This script references objects it does not create: the tables/types
-- "users", "sequences", "xsequences", "unicode_test", "table_record" and
-- "type_record" must already exist before it is run.
--
-- NOTE: line breaks are significant here.  A "--" comment runs to the end
-- of the line, and the Python code inside each function body relies on
-- newlines and indentation, so this file must not be re-flowed.
--

-- Each function sets "global_test" in SD and in GD only if the key is not
-- already present, then reports both values.
CREATE FUNCTION global_test_one() returns text
    AS
'if not SD.has_key("global_test"):
	SD["global_test"] = "set by global_test_one"
if not GD.has_key("global_test"):
	GD["global_test"] = "set by global_test_one"
return "SD: " + SD["global_test"] + ", GD: " + GD["global_test"]'
    LANGUAGE plpythonu;

CREATE FUNCTION global_test_two() returns text
    AS
'if not SD.has_key("global_test"):
	SD["global_test"] = "set by global_test_two"
if not GD.has_key("global_test"):
	GD["global_test"] = "set by global_test_two"
return "SD: " + SD["global_test"] + ", GD: " + GD["global_test"]'
    LANGUAGE plpythonu;

-- Increments (or initialises to 1) a call counter kept in SD and returns it.
CREATE FUNCTION static_test() returns int4
    AS
'if SD.has_key("call"):
	SD["call"] = SD["call"] + 1
else:
	SD["call"] = 1
return SD["call"]'
    LANGUAGE plpythonu;

-- import python modules

-- Tries to import "foosocket"; the failure branch is the expected outcome.
CREATE FUNCTION import_fail() returns text
    AS
'try:
	import foosocket
except Exception, ex:
	plpy.notice("import socket failed -- %s" % str(ex))
	return "failed as expected"
return "succeeded, that wasn''t supposed to happen"'
    LANGUAGE plpythonu;

-- Imports a list of modules; success is the expected outcome.
CREATE FUNCTION import_succeed() returns text
	AS
'try:
  import array
  import bisect
  import calendar
  import cmath
  import errno
  import math
  import md5
  import operator
  import random
  import re
  import sha
  import string
  import time
except Exception, ex:
	plpy.notice("import failed -- %s" % str(ex))
	return "failed, that wasn''t supposed to happen"
return "succeeded, as expected"'
    LANGUAGE plpythonu;

-- Returns the hex SHA digest of the text argument.
CREATE FUNCTION import_test_one(p text) RETURNS text
	AS
'import sha
digest = sha.new(p)
return digest.hexdigest()'
	LANGUAGE plpythonu;

-- Same, but hashes fname || lname taken from a "users" row argument.
CREATE FUNCTION import_test_two(u users) RETURNS text
	AS
'import sha
plain = u["fname"] + u["lname"]
digest = sha.new(plain);
return "sha hash of " + plain + " is " + digest.hexdigest()'
	LANGUAGE plpythonu;

-- Formats the two text arguments followed by the row's "key: value" pairs,
-- sorted by key.
CREATE FUNCTION argument_test_one(u users, a1 text, a2 text) RETURNS text
	AS
'keys = u.keys()
keys.sort()
out = []
for key in keys:
    out.append("%s: %s" % (key, u[key]))
words = a1 + " " + a2 + " => {" + ", ".join(out) + "}"
return words'
	LANGUAGE plpythonu;


-- these triggers are dedicated to HPHC of RI who
-- decided that my kid's name was william not willem, and
-- vigorously resisted all efforts at correction.  they have
-- since gone bankrupt...

-- BEFORE INSERT: skips rows lacking fname or lname, fills in a missing
-- username from the first letter of fname plus lname, and replaces the
-- fname "william" with the trigger's first argument.
CREATE FUNCTION users_insert() returns trigger
	AS
'if TD["new"]["fname"] == None or TD["new"]["lname"] == None:
	return "SKIP"
if TD["new"]["username"] == None:
	TD["new"]["username"] = TD["new"]["fname"][:1] + "_" + TD["new"]["lname"]
	rv = "MODIFY"
else:
	rv = None
if TD["new"]["fname"] == "william":
	TD["new"]["fname"] = TD["args"][0]
	rv = "MODIFY"
return rv'
	LANGUAGE plpythonu;


-- BEFORE UPDATE: skips any update that would change an fname currently
-- equal to the trigger's first argument.
CREATE FUNCTION users_update() returns trigger
	AS
'if TD["event"] == "UPDATE":
	if TD["old"]["fname"] != TD["new"]["fname"] and TD["old"]["fname"] == TD["args"][0]:
		return "SKIP"
return None'
	LANGUAGE plpythonu;


-- BEFORE DELETE: skips deletion of rows whose fname equals the trigger's
-- first argument.
CREATE FUNCTION users_delete() RETURNS trigger
	AS
'if TD["old"]["fname"] == TD["args"][0]:
	return "SKIP"
return None'
	LANGUAGE plpythonu;


CREATE TRIGGER users_insert_trig BEFORE INSERT ON users FOR EACH ROW
	EXECUTE PROCEDURE users_insert ('willem');

CREATE TRIGGER users_update_trig BEFORE UPDATE ON users FOR EACH ROW
	EXECUTE PROCEDURE users_update ('willem');

CREATE TRIGGER users_delete_trig BEFORE DELETE ON users FOR EACH ROW
	EXECUTE PROCEDURE users_delete ('willem');



-- dump trigger data

CREATE TABLE trigger_test
	(i int, v text );

-- Emits one NOTICE per TD entry, in sorted key order.  "relid" is
-- overwritten with a fixed string before the dump.
CREATE FUNCTION trigger_data() returns trigger language plpythonu as $$

if TD.has_key('relid'):
	TD['relid'] = "bogus:12345"

skeys = TD.keys()
skeys.sort()
for key in skeys:
	val = TD[key]
	plpy.notice("TD[" + key + "] => " + str(val))

return None  

$$;

CREATE TRIGGER show_trigger_data_trig
BEFORE INSERT OR UPDATE OR DELETE ON trigger_test
FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo');

-- Fire the trigger once for each event type.  The unfiltered delete is
-- intentional: the table holds only the single row inserted here.
insert into trigger_test values(1,'insert');
update trigger_test set v = 'update' where i = 1;
delete from trigger_test;

DROP TRIGGER show_trigger_data_trig on trigger_test;

DROP FUNCTION trigger_data();

-- nested calls
--

-- one -> two -> three, each level invoking the next through plpy.execute.
CREATE FUNCTION nested_call_one(a text) RETURNS text
	AS
'q = "SELECT nested_call_two(''%s'')" % a
r = plpy.execute(q)
return r[0]'
	LANGUAGE plpythonu ;

CREATE FUNCTION nested_call_two(a text) RETURNS text
	AS
'q = "SELECT nested_call_three(''%s'')" % a
r = plpy.execute(q)
return r[0]'
	LANGUAGE plpythonu ;

CREATE FUNCTION nested_call_three(a text) RETURNS text
	AS
'return a'
	LANGUAGE plpythonu ;

-- some spi stuff

-- Prepares a parameterised count query once, caches the plan in SD, and
-- executes it with the argument on every call.
CREATE FUNCTION spi_prepared_plan_test_one(a text) RETURNS text
	AS
'if not SD.has_key("myplan"):
	q = "SELECT count(*) FROM users WHERE lname = $1"
	SD["myplan"] = plpy.prepare(q, [ "text" ])
try:
	rv = plpy.execute(SD["myplan"], [a])
	return "there are " + str(rv[0]["count"]) + " " + str(a) + "s"
except Exception, ex:
	plpy.error(str(ex))
return None'
	LANGUAGE plpythonu;

-- Prepares a plan whose query itself calls spi_prepared_plan_test_one.
-- The argument is interpolated into the query text when the plan is first
-- built, so the cached plan keeps the value from that first call.
CREATE FUNCTION spi_prepared_plan_test_nested(a text) RETURNS text
	AS
'if not SD.has_key("myplan"):
	q = "SELECT spi_prepared_plan_test_one(''%s'') as count" % a
	SD["myplan"] = plpy.prepare(q)
try:
	rv = plpy.execute(SD["myplan"])
	if len(rv):
		return rv[0]["count"]
except Exception, ex:
	plpy.error(str(ex))
return None'
	LANGUAGE plpythonu;


/* really stupid function just to get the module loaded
*/
CREATE FUNCTION stupid() RETURNS text AS 'return "zarkon"' LANGUAGE plpythonu;

/* a typo
*/
-- The parameter type name "test" (instead of "text") is deliberate; do not
-- "fix" it here or in the two functions that follow.
CREATE FUNCTION invalid_type_uncaught(a text) RETURNS text
	AS
'if not SD.has_key("plan"):
	q = "SELECT fname FROM users WHERE lname = $1"
	SD["plan"] = plpy.prepare(q, [ "test" ])
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
	return rv[0]["fname"]
return None'
	LANGUAGE plpythonu;

/* for what it's worth catch the exception generated by
 * the typo, and return None
 */
CREATE FUNCTION invalid_type_caught(a text) RETURNS text
	AS
'if not SD.has_key("plan"):
	q = "SELECT fname FROM users WHERE lname = $1"
	try:
		SD["plan"] = plpy.prepare(q, [ "test" ])
	except plpy.SPIError, ex:
		plpy.notice(str(ex))
		return None
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
	return rv[0]["fname"]
return None'
	LANGUAGE plpythonu;

/* for what it's worth catch the exception generated by
 * the typo, and reraise it as a plain error
 */
CREATE FUNCTION invalid_type_reraised(a text) RETURNS text
	AS
'if not SD.has_key("plan"):
	q = "SELECT fname FROM users WHERE lname = $1"
	try:
		SD["plan"] = plpy.prepare(q, [ "test" ])
	except plpy.SPIError, ex:
		plpy.error(str(ex))
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
	return rv[0]["fname"]
return None'
	LANGUAGE plpythonu;


/* no typo no messing about
*/
CREATE FUNCTION valid_type(a text) RETURNS text
	AS
'if not SD.has_key("plan"):
	SD["plan"] = plpy.prepare("SELECT fname FROM users WHERE lname = $1", [ "text" ])
rv = plpy.execute(SD["plan"], [ a ])
if len(rv):
	return rv[0]["fname"]
return None'
	LANGUAGE plpythonu;

/* Flat out syntax error
*/
-- The SQL text passed to plpy.execute is intentionally not valid SQL.
CREATE FUNCTION sql_syntax_error() RETURNS text
        AS
'plpy.execute("syntax error")'
        LANGUAGE plpythonu;

/* check the handling of uncaught python exceptions
 */
-- Declares a single argument but indexes args[1], which is out of range.
CREATE FUNCTION exception_index_invalid(text) RETURNS text
	AS
'return args[1]'
	LANGUAGE plpythonu;

/* check handling of nested exceptions
 */
-- Calls test5(), which is not defined in this script.
CREATE FUNCTION exception_index_invalid_nested() RETURNS text
	AS
'rv = plpy.execute("SELECT test5(''foo'')")
return rv[0]'
	LANGUAGE plpythonu;


-- Returns the row's own sequence; for multipart rows, appends every
-- xsequences.sequence value whose pid matches the row's pid.
CREATE FUNCTION join_sequences(s sequences) RETURNS text
	AS
'if not s["multipart"]:
	return s["sequence"]
q = "SELECT sequence FROM xsequences WHERE pid = ''%s''" % s["pid"]
rv = plpy.execute(q)
seq = s["sequence"]
for r in rv:
	seq = seq + r["sequence"]
return seq'
	LANGUAGE plpythonu;

--
-- Universal Newline Support
--
-- The same three-statement body is supplied with LF, CR and CRLF line
-- endings, written as escapes inside E'' strings.

CREATE OR REPLACE FUNCTION newline_lf() RETURNS integer AS
E'x = 100\ny = 23\nreturn x + y\n'
LANGUAGE plpythonu;

CREATE OR REPLACE FUNCTION newline_cr() RETURNS integer AS
E'x = 100\ry = 23\rreturn x + y\r'
LANGUAGE plpythonu;

CREATE OR REPLACE FUNCTION newline_crlf() RETURNS integer AS
E'x = 100\r\ny = 23\r\nreturn x + y\r\n'
LANGUAGE plpythonu;

--
-- Unicode error handling
--
-- "\\" inside an E'' string yields a single backslash, so the Python code
-- sees the unicode literal u"\x80".

CREATE FUNCTION unicode_return_error() RETURNS text AS E'
return u"\\x80"
' LANGUAGE plpythonu;

CREATE FUNCTION unicode_trigger_error() RETURNS trigger AS E'
TD["new"]["testvalue"] = u"\\x80"
return "MODIFY"
' LANGUAGE plpythonu;

CREATE TRIGGER unicode_test_bi BEFORE INSERT ON unicode_test
  FOR EACH ROW EXECUTE PROCEDURE unicode_trigger_error();

-- Passes the unicode value as a plan argument inside a list.
CREATE FUNCTION unicode_plan_error1() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue", ["text"])
rv = plpy.execute(plan, [u"\\x80"], 1)
return rv[0]["testvalue"]
' LANGUAGE plpythonu;

-- Passes the unicode value directly (not wrapped in a list) to a plan that
-- declares two parameters.
CREATE FUNCTION unicode_plan_error2() RETURNS text AS E'
plan = plpy.prepare("SELECT $1 AS testvalue1, $2 AS testvalue2", ["text", "text"])
rv = plpy.execute(plan, u"\\x80", 1)
return rv[0]["testvalue1"]
' LANGUAGE plpythonu;

-- Tests for functions that return void

CREATE FUNCTION test_void_func1() RETURNS void AS $$
x = 10
$$ LANGUAGE plpythonu;

-- illegal: can't return non-None value in void-returning func
CREATE FUNCTION test_void_func2() RETURNS void AS $$
return 10
$$ LANGUAGE plpythonu;

-- The body is the bare expression None with no return statement.
CREATE FUNCTION test_return_none() RETURNS int AS $$
None
$$ LANGUAGE plpythonu;


--
-- Test named parameters
--
-- Each named parameter must equal the corresponding positional args[] entry.
CREATE FUNCTION test_param_names1(a0 integer, a1 text) RETURNS boolean AS $$
assert a0 == args[0]
assert a1 == args[1]
return True
$$ LANGUAGE plpythonu;

CREATE FUNCTION test_param_names2(u users) RETURNS text AS $$
assert u == args[0]
return str(u)
$$ LANGUAGE plpythonu;

-- use deliberately wrong parameter names
CREATE FUNCTION test_param_names3(a0 integer) RETURNS boolean AS $$
try:
	assert a1 == args[0]
	return False
except NameError, e:
	assert e.args[0].find("a1") > -1
	return True
$$ LANGUAGE plpythonu;


--
-- Test returning SETOF
--
-- The same result (content repeated count times) is produced as a list,
-- as a tuple, and through an iterator object.
CREATE FUNCTION test_setof_as_list(count integer, content text) RETURNS SETOF text AS $$
return [ content ]*count
$$ LANGUAGE plpythonu;

CREATE FUNCTION test_setof_as_tuple(count integer, content text) RETURNS SETOF text AS $$
t = ()
for i in xrange(count):
	t += ( content, )
return t
$$ LANGUAGE plpythonu;

CREATE FUNCTION test_setof_as_iterator(count integer, content text) RETURNS SETOF text AS $$
class producer:
	def __init__ (self, icount, icontent):
		self.icontent = icontent
		self.icount = icount
	def __iter__ (self):
		return self
	def next (self):
		if self.icount == 0:
			raise StopIteration
		self.icount -= 1
		return self.icontent
return producer(count, content)
$$ LANGUAGE plpythonu;


--
-- Test returning tuples
--
-- Depending on "typ", the row is returned as a dict (with one extra key),
-- a tuple, a list, or a class carrying the fields as attributes; "retnull"
-- returns None instead.  The two functions differ only in return type.
CREATE FUNCTION test_table_record_as(typ text, first text, second integer, retnull boolean) RETURNS table_record AS $$
if retnull:
	return None
if typ == 'dict':
	return { 'first': first, 'second': second, 'additionalfield': 'must not cause trouble' }
elif typ == 'tuple':
	return ( first, second )
elif typ == 'list':
	return [ first, second ]
elif typ == 'obj':
	class type_record: pass
	type_record.first = first
	type_record.second = second
	return type_record
$$ LANGUAGE plpythonu;

CREATE FUNCTION test_type_record_as(typ text, first text, second integer, retnull boolean) RETURNS type_record AS $$
if retnull:
	return None
if typ == 'dict':
	return { 'first': first, 'second': second, 'additionalfield': 'must not cause trouble' }
elif typ == 'tuple':
	return ( first, second )
elif typ == 'list':
	return [ first, second ]
elif typ == 'obj':
	class type_record: pass
	type_record.first = first
	type_record.second = second
	return type_record
$$ LANGUAGE plpythonu;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?