⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dataset.pm

📁 普通的ETL工具
💻 PM
📖 第 1 页 / 共 3 页
字号:
		ETL::Pequel3::Type::DataMember::User->new(name => 'xml_version_str', default => 'xml version="1.0"'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'qualify_field_names', default => 1),
		ETL::Pequel3::Type::DataMember::User->new(name => 'rec_element_name', default => 'record'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'root_element_name', default => 'file');
	use Class::STL::ClassMembers::Constructor;
	# Emit the statement that localises Perl's output field separator ($,) to
	# this dataset's field_delimiter, then hand the styler to the datasource so
	# it can append its own preparation code.
	#   $c - optional code styler to append to; a fresh
	#        ETL::Pequel3::CodeStyler::Program::Perl is created when omitted.
	# Returns whatever the datasource's _code_prepare($c) returns.
	sub code_prepare
	{
		my ($self, $c) = @_;
		$c ||= ETL::Pequel3::CodeStyler::Program::Perl->new();
		my $delimiter = "@{[ $self->field_delimiter() ]}";
		$c->code('local($,)="' . $delimiter . '";');
		return $self->datasource()->_code_prepare($c);
	}
	# Generate the code that opens the output and prints the XML prologue:
	# the '<?...?>' declaration followed by the opening root element, which
	# carries pequel_script and pequel_version attributes.
	# All arguments after $self are forwarded to the datasource's _code_open(),
	# whose result is the styler that is appended to and returned.
	sub code_open
	{
		my $self = shift;
		my $c = $self->datasource()->_code_open(@_);
		$c->open_block();
		$self->code_prepare($c);

		my $handle = "@{[ $self->datasource()->fdname() ]}";
		$c->code('print ' . $handle);
		$c->over();

		# First printed item: the XML declaration.
		my $declaration = "@{[ $self->xml_version_str() ]}";
		$c->code(q{'<?} . $declaration . q{?>',});

		# Second printed item: the opening root tag with its two attributes.
		my $root = "@{[ $self->root_element_name() ]}";
		my $script = "@{[ $self->pequel_ref()->script_filename() ]}";
		my $version = "@{[ $self->pequel_ref()->properties()->version() ]}";
		$c->code(qq{'<$root pequel_script="$script" pequel_version="$version">'});

		$c->code(';');
		$c->back();
		$c->close_block();
		return $c;
	}
	# Generate the code that prints one XML record element for the given fields.
	#   $c      - optional code styler; a new
	#             ETL::Pequel3::CodeStyler::Program::Perl is created when omitted.
	#   @fields - field objects (ETL::Pequel3::Type::Field::Abstract list).
	# Fields whose xml_type() is 'attr' become attributes of the record element;
	# every other field becomes a child element, wrapped in a CDATA section when
	# its xml_type() is 'CDATA'. Tag and attribute names use tname() when
	# qualify_field_names is true, name() otherwise.
	# The emitted statement has this shape (F3 being a CDATA field):
	#   print OUT
	#       "  <record att1=\"@{[ <value> ]}\">",
	#       ("    <F1>"
	#           . <value>
	#           . "</F1>"),
	#       ("    <F3><![CDATA["
	#           . <value>
	#           . "]]></F3>"),
	#       "  </record>"
	#       ;
	# Returns the styler.
	sub code_write
	{
		my ($self, $c, @fields) = @_;
		$c ||= ETL::Pequel3::CodeStyler::Program::Perl->new();

		$c->open_block();
		$self->code_prepare($c);
		$c->code('print ' . "@{[ $self->datasource()->fdname() ]}");
		$c->over();

		# Opening record tag: element name followed by one name=\"value\" pair
		# per attribute field. The value is wrapped in @{[ ]} so that it is
		# interpolated when the generated double-quoted string is evaluated.
		my @open_tag = ($self->rec_element_name());
		foreach my $field (grep { $_->xml_type() eq 'attr' } @fields)
		{
			my $label = "@{[ $self->qualify_field_names() ? $field->tname() : $field->name() ]}";
			my $value = "@{[ $field->pequel_type()->getvar($field) ]}";
			push(@open_tag, $label . '=\\"@{[ ' . $value . ' ]}\\"');
		}
		$c->code('"  <' . join(' ', @open_tag) . '>",');

		# One parenthesised concatenation per non-attribute field.
		foreach my $field (grep { $_->xml_type() ne 'attr' } @fields)
		{
			my $tag = "@{[ $self->qualify_field_names() ? $field->tname() : $field->name() ]}";
			my $is_cdata = ($field->xml_type() eq 'CDATA');
			$c->code('("    <' . $tag . '>' . ($is_cdata ? '<![CDATA[' : '') . '"');
			$c->over();
			$c->code('. ' . "@{[ $field->pequel_type()->getvar($field) ]}");
			$c->code('. "' . ($is_cdata ? ']]>' : '') . '</' . $tag . '>"),');
			$c->back();
		}

		# Closing record tag and statement terminator.
		$c->code('"  </' . "@{[ $self->rec_element_name() ]}" . '>"');
		$c->code(';');
		$c->back();
		$c->close_block();
		return $c;
	}
	# Generate the code that prints the closing root element, then let the
	# datasource append its own closing code.
	#   $c - optional code styler; a new
	#        ETL::Pequel3::CodeStyler::Program::Perl is created when omitted.
	# Returns whatever the datasource's _code_close($c) returns.
	sub code_close
	{
		my ($self, $c) = @_;
		$c ||= ETL::Pequel3::CodeStyler::Program::Perl->new();

		$c->open_block();
		$self->code_prepare($c);
		my $handle = "@{[ $self->datasource()->fdname() ]}";
		my $root = "@{[ $self->root_element_name() ]}";
		$c->code("print $handle '</$root>';");
		$c->close_block();

		return $self->datasource()->_code_close($c);
	}
}
# ----------------------------------------------------------------------------------------------------
{
	# Input dataset that decodes each record with unpack(), using the
	# user-supplied pack_fmt template.
	package ETL::Pequel3::Type::DataSet::Input::Packed;
	use base qw(ETL::Pequel3::Type::DataSet::Input::Abstract);
	use Class::STL::ClassMembers
		Class::STL::ClassMembers::DataMember->new(name => 'description', default => 'Specify F<packed> data format. The format should be specified with the F<pack_format> option. The default option is B<[A3/Z*]>. Enclose the format in square brackets to specify a repeating format; that is, the same format is used for each field.'),
		Class::STL::ClassMembers::DataMember->new(name => 'name', default => 'packed'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'pack_fmt', default => '[A3/Z*]'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'field_delimiter', default => ','); # TODO: remove
	use Class::STL::ClassMembers::Constructor;

	# Append an "unpack('<template>', $_)" expression to the styler.
	#   $c       - optional code styler; created when omitted.
	#   $numflds - number of fields; a pack_fmt enclosed in square brackets
	#              has its inner template repeated this many times, any
	#              other pack_fmt is used verbatim.
	# Returns the styler.
	sub code_decode
	{
		my ($self, $c, $numflds) = @_;
		$c ||= ETL::Pequel3::CodeStyler::Program::Perl->new();
		my $format = $self->pack_fmt();
		my $template = ($format =~ /^\[(.*)\]$/) ? $1 x $numflds : $format;
		$c->code("unpack('" . $template . "', \$_)");
		return $c;
	}

	# Append the read expression: read one record from the datasource's file
	# handle into $_ and, when defined, assign the unpacked values to the
	# array named by arr_vname().
	#   $c      - optional code styler; created when omitted.
	#   @fields - field objects; only their count is used here.
	# newline_off() is called on the styler and is not switched back on in
	# this method. Returns the styler.
	sub code_read
	{
		my ($self, $c, @fields) = @_;
		$c ||= ETL::Pequel3::CodeStyler::Program::Perl->new();
		my $field_count = scalar(@fields);
		$c->newline_off();
		my $handle = "@{[ $self->datasource()->fdname() ]}";
		my $array_name = "@{[ $self->arr_vname() ]}";
		$c->code('defined($_=<' . $handle . '>) && (@' . $array_name . ' = ');
		$self->code_decode($c, $field_count);
		$c->code(')');
		return $c;
	}
}
# ----------------------------------------------------------------------------------------------------
{
	# Output dataset that encodes each record with pack(), using the
	# user-supplied pack_fmt template.
	package ETL::Pequel3::Type::DataSet::Output::Packed;
	use base qw(ETL::Pequel3::Type::DataSet::Output::Abstract);
	use Class::STL::ClassMembers
		Class::STL::ClassMembers::DataMember->new(name => 'description', default => 'Specify F<packed> data format. The format should be specified with the F<pack_format> option. The default option is B<[A3/Z*]>. Enclose the format in square brackets to specify a repeating format; that is, the same format is used for each field.'),
		Class::STL::ClassMembers::DataMember->new(name => 'name', default => 'packed'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'pack_fmt', default => '[A3/Z*]');
	use Class::STL::ClassMembers::Constructor;

	# Generate the code that prints a packed header record whose values are
	# the field names, each emitted as a single-quoted literal.
	#   $c      - optional code styler; created when omitted.
	#   @fields - field objects (ETL::Pequel3::Type::Field::Abstract list).
	# A pack_fmt enclosed in square brackets has its inner template repeated
	# once per field; any other pack_fmt is used verbatim.
	# Returns the styler.
	sub code_write_header
	{
		my ($self, $c, @fields) = @_;
		$c ||= ETL::Pequel3::CodeStyler::Program::Perl->new();

		$c->open_block();
		$self->code_prepare($c);
		my $handle = "@{[ $self->datasource()->fdname() ]}";
		my $format = $self->pack_fmt();
		my $template = ($format =~ /^\[(.*)\]$/) ? $1 x scalar(@fields) : $format;
		$c->code('print ' . $handle . " pack '" . $template . "', ");
		$c->over();
		foreach my $field (@fields)
		{
			$c->code(q{'} . "@{[ $field->name() ]}" . q{',});
		}
		$c->code(';');
		$c->back();
		$c->close_block();
		return $c;
	}

	# Generate the code that prints one packed data record; each value is the
	# expression returned by the field's getvar().
	#   $c      - optional code styler; created when omitted.
	#   @fields - field objects (ETL::Pequel3::Type::Field::Abstract list).
	# The pack template is derived from pack_fmt exactly as in
	# code_write_header. Returns the styler.
	sub code_write
	{
		my ($self, $c, @fields) = @_;
		$c ||= ETL::Pequel3::CodeStyler::Program::Perl->new();

		$c->open_block();
		$self->code_prepare($c);
		my $handle = "@{[ $self->datasource()->fdname() ]}";
		my $format = $self->pack_fmt();
		my $template = ($format =~ /^\[(.*)\]$/) ? $1 x scalar(@fields) : $format;
		$c->code('print ' . $handle . " pack '" . $template . "', ");
		$c->over();
		foreach my $field (@fields)
		{
			$c->code("@{[ $field->getvar() ]}" . ',');
		}
		$c->code(';');
		$c->back();
		$c->close_block();
		return $c;
	}
}
# ----------------------------------------------------------------------------------------------------
{
#>	package ETL::Pequel3::Type::DataSource::Input::Dbi;
#>	package ETL::Pequel3::Type::DataSource::Input::SqlPlus;
#>	package ETL::Pequel3::Type::DataSet::Input::Sql;
	package ETL::Pequel3::Type::DataSet::Input::Dbi;
	use base qw(ETL::Pequel3::Type::DataSet::Input::Abstract);
	use Class::STL::ClassMembers 
		qw( 
			sth_vname
			dbh_vname

			_dsn_parsed
			_user_parsed
			_password_parsed
			_table_name_parsed
			_select_override_parsed
			_select_where_condition_parsed
			_select_fields_parsed
			_select_order_by_parsed
			_select_group_by_parsed
		),
		Class::STL::ClassMembers::DataMember->new(name => 'description', default => 'This dataset is for DBI connectivity.'),
		Class::STL::ClassMembers::DataMember->new(name => 'name', default => 'dbi'),
		Class::STL::ClassMembers::DataMember->new(name => 'datasource_type', default => 'dbi'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'dsn'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'user'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'password'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'table_name'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'select_override'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'select_where_condition'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'select_fields'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'select_order_by'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'select_group_by'),
		ETL::Pequel3::Type::DataMember::User->new(name => 'chomp', default => 1);
	use Class::STL::ClassMembers::Constructor;
	# Post-construction hook: seed members from the datasource.
	#   - table_name defaults to the datasource's target_name(), when defined;
	#   - sth_vname / dbh_vname become "_STH<vname>" / "_DBH<vname>" when the
	#     datasource's vname() is defined.
	# Returns $self.
	sub new_extra
	{
		my ($self) = @_;

		my $target = $self->datasource()->target_name();
		$self->table_name($target) if (defined($target));

		my $vname = $self->datasource()->vname();
		if (defined($vname))
		{
			$self->sth_vname('_STH' . $vname);
			$self->dbh_vname('_DBH' . $vname);
		}

		return $self;
	}
	# Work out the column list for the generated select statement.
	# Named arguments (all optional):
	#   field_map - passed to input_fields()->map_fields() when defined;
	#   key_field - stored as select_order_by when defined.
	# Trailing input fields that have a defined calc_exp() (derived fields)
	# are set aside first. If no input fields remain, one field per column
	# reported by get_fields() is added. select_fields is then set to the
	# comma-separated field names with one leading underscore stripped from
	# each, and finally the derived fields are appended again and renumbered.
	sub map_output
	{
		my ($self, %p) = @_;

		if (exists($p{field_map}) && defined($p{field_map}))
		{
			$self->input_fields()->map_fields($p{field_map}, $self);
		}

		# Preserve derived fields: peel them off the back of the container,
		# keeping their original order in $derived.
		use stl qw(deque);
		my $derived = deque();
		my $fields = $self->input_fields();
		while ($fields->size())
		{
			my $last = $fields->back();
			last unless ($last->can('calc_exp') && defined($last->calc_exp()));
			$derived->push_front($last);
			$fields->pop_back();
		}

		# No explicit fields left: take the column names from the database.
		unless ($fields->size())
		{
			foreach my $column ($self->get_fields())
			{
				$fields->add(
					name => $column,
					ds_column => $fields->size()+1,
					dataset => $self,
#?					target_ref => $p{target_ref},
					configuration => $self->configuration(),
				);
			}
		}

		my @flds = map { $_->name() } $fields->to_array();
		s/^_// foreach (@flds);
		$self->select_fields(join(', ', @flds));

		if (exists($p{key_field}) && defined($p{key_field}))
		{
			$self->select_order_by($p{key_field});
		}

		# Put the derived fields back at the end, numbering each by its new
		# position in the container.
		foreach my $field ($derived->to_array())
		{
			$fields->push_back($field);
			$fields->back()->field_number($fields->size());
		}
	}
	# Connect to the database and return the list of column names produced by
	# "select <select_override or *> from <table_name>".
	# Uses the dsn, user and password members for the connection (RaiseError
	# on, AutoCommit off); the handles are released before returning.
	# Dies when the connection fails, when a DBI call raises an error, or when
	# the driver cannot report the column names.
	# NOTE(review): select_override is used here as the column list, whereas
	# code_open in this package emits it as the complete statement -- confirm
	# which meaning is intended.
	sub get_fields
	{
		my $self = shift;
		use DBI;
		my $dbh = DBI->connect($self->dsn(), $self->user(), $self->password(), { RaiseError => 1, AutoCommit => 0 })
			|| die("Unable to connect to data source table '@{[ $self->dsn() ]}' for '@{[ $self->table_name() ]}': $DBI::errstr");
		my $sth = $dbh->prepare("select @{[ defined($self->select_override()) ? $self->select_override() : '*' ]} from @{[ $self->table_name() ]}");

		# Some DBI drivers only populate the NAME attribute once the statement
		# has been executed; fall back to execute() when prepare() alone did
		# not provide it. No rows are fetched.
		my $names = $sth->{NAME};
		if (!defined($names))
		{
			$sth->execute();
			$names = $sth->{NAME};
		}
		if (!defined($names))
		{
			$sth->finish();
			$dbh->disconnect();
			die("Unable to determine column names for '@{[ $self->table_name() ]}' on data source '@{[ $self->dsn() ]}'");
		}

		my @flds = (@{$names});
		$sth->finish();
		$dbh->disconnect();
		return (@flds);
	}
	# DBI input needs no preparation code: hand back the given styler
	# untouched, or a new ETL::Pequel3::CodeStyler::Program::Perl when none
	# was supplied.
	sub code_prepare
	{
		my ($self, $c) = @_;
		return $c || ETL::Pequel3::CodeStyler::Program::Perl->new();
	}
	# Generate the code that connects to the database and prepares/executes
	# the select statement for this dataset.
	#   $c - optional code styler; a new
	#        ETL::Pequel3::CodeStyler::Program::Perl is created when omitted.
	# Emitted code, in order:
	#   - "use DBI;" and three lexicals <vname>_dsn, <vname>_user and
	#     <vname>_password holding single-quoted literals;
	#   - a DBI->connect() into $<dbh_vname> with an "|| die" clause;
	#   - a prepare() into $<sth_vname> whose SQL is either select_override
	#     verbatim, or "select <fields> from <table>" followed by the optional
	#     where / group by / order by clauses;
	#   - an execute() with its own "|| die" clause.
	# For every option the corresponding _*_parsed member's raw() text is
	# used when that member is defined, otherwise the plain option value.
	# Values are placed between quotes without escaping, so a value that
	# contains the quote character yields invalid generated code.
	# Returns the styler.
	sub code_open
	{
		my $self = shift;
		my $c = shift || ETL::Pequel3::CodeStyler::Program::Perl->new();
		$c->code("use DBI;");
		$c->code("my \$@{[ $self->datasource()->vname() ]}_dsn = '@{[ 
				defined($self->_dsn_parsed()) ? $self->_dsn_parsed()->raw() : $self->dsn()
			]}';");
		$c->code("my \$@{[ $self->datasource()->vname() ]}_user = '@{[ 
				defined($self->_user_parsed()) ? $self->_user_parsed()->raw() : $self->user()
			]}';");
		$c->code("my \$@{[ $self->datasource()->vname() ]}_password = '@{[ 
				defined($self->_password_parsed()) ? $self->_password_parsed()->raw() : $self->password()
			]}';");
		$c->code(
			"my \$@{[ 
				$self->dbh_vname() 
			]} = DBI->connect(\$@{[ 
				$self->datasource()->vname() 
			]}_dsn, \$@{[ 
				$self->datasource()->vname() 
			]}_user, \$@{[ 
				$self->datasource()->vname() 
			]}_password, { RaiseError => 1, AutoCommit => 0 })"
		);
		$c->over();
		$c->code("|| die(\"Unable to connect to data source table '@{[ 
				$self->datasource()->target_name() 
			]}' for '@{[ 
				$self->datasource()->vname() 
			]}': \$DBI::errstr\");");
		$c->back();
		# The SQL text is assembled piecewise on a single generated line, so
		# newlines are suppressed until the closing quote is emitted.
		$c->newline_off();
		$c->code("my \$@{[ $self->sth_vname() ]} = \$@{[ $self->dbh_vname() ]}->prepare(\"");
		if (defined($self->select_override())) {
			# The override replaces the whole statement.
			$c->code(defined($self->_select_override_parsed()) 
						? $self->_select_override_parsed()->raw() 
						: $self->select_override()
				);
		}
		else
		{
			$c->code("select ");
			$c->code(
					defined($self->_select_fields_parsed())
						? $self->_select_fields_parsed()->raw()
						: $self->select_fields()
					);
			$c->code(" from @{[ 
					defined($self->_table_name_parsed()) 
						? $self->_table_name_parsed()->raw() 
						: $self->table_name()
				]}");
			$c->code(" where @{[ 
					defined($self->_select_where_condition_parsed()) 
						? $self->_select_where_condition_parsed()->raw() 
						: $self->select_where_condition()
				]}") 
				if (defined($self->select_where_condition()));
			# SQL requires GROUP BY to precede ORDER BY; emitting them the
			# other way round produces a statement the database rejects
			# whenever both options are set.
			$c->code(" group by @{[ 
					defined($self->_select_group_by_parsed()) 
						? $self->_select_group_by_parsed()->raw() 
						: $self->select_group_by()
				]}") 
				if (defined($self->select_group_by()));
			$c->code(" order by @{[ 
					defined($self->_select_order_by_parsed()) 
						? $self->_select_order_by_parsed()->raw() 
						: $self->select_order_by()
				]}") 
				if (defined($self->select_order_by()));
		}
		$c->newline_on();
		$c->code("\");");
		$c->code("\$@{[ $self->sth_vname() ]}->execute() || die(\"Can't execute statement: \$DBI::errstr\");");
		return $c;
	}
	# Generate the code that releases the database handles: finish() on the
	# statement handle variable, then disconnect() on the database handle
	# variable.
	#   $c - optional code styler; a new
	#        ETL::Pequel3::CodeStyler::Program::Perl is created when omitted.
	# Returns the styler.
	sub code_close
	{
		my ($self, $c) = @_;
		$c ||= ETL::Pequel3::CodeStyler::Program::Perl->new();
		my $statement_var = "@{[ $self->sth_vname() ]}";
		$c->code('$' . $statement_var . '->finish();');
		my $database_var = "@{[ $self->dbh_vname() ]}";
		$c->code('$' . $database_var . '->disconnect();');
		return $c;
	}
	sub code_decode
	{
		my $self = shift;
		my $c = shift || ETL::Pequel3::CodeStyler::Program::Perl->new();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -