⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pequel.pm

📁 普通的ETL工具
💻 PM
字号:
# vim:ts=4 sw=4
# ----------------------------------------------------------------------------------------------------
#  Name		: ETL::Pequel3::Type::DataSource::Pequel.pm
#  Created	: 5 January 2007
#  Author	: Mario Gaffiero (gaffie)
#
# Copyright 1999-2007 Mario Gaffiero.
# 
# This file is part of Pequel(TM).
# 
# Pequel is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
# 
# Pequel is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with Pequel; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
# ----------------------------------------------------------------------------------------------------
# Modification History
# When          Version     Who     What
# ----------------------------------------------------------------------------------------------------
package ETL::Pequel3::Type::DataSource::Pequel;
require 5.005_62;
use strict;
use warnings;

# ----------------------------------------------------------------------------------------------------
# Shared behaviour for the 'pequel' input and output datasources: mapping the field interface of
# one pequel onto another.
# ----------------------------------------------------------------------------------------------------
{
	package ETL::Pequel3::Type::DataSource::Pequel::Abstract;
	use base qw(Class::STL::Element);
	use Class::STL::ClassMembers;
	use Class::STL::ClassMembers::Constructor;

	# _map_interface($target_fields, $source_fields [, $source_transfer_fields])
	#
	# Rebuilds $target_fields so that it mirrors the source interface:
	#  1. Every target field without a calc_exp() must exist by name in $source_fields or in
	#     $source_transfer_fields (when supplied); otherwise user_error(10207) is raised with a
	#     listing of both interfaces.
	#  2. Target fields that carry a calc_exp() (derived fields) are set aside.
	#  3. $target_fields is cleared and refilled from the source fields followed by the source
	#     transfer fields, skipping names that begin with '_'. Each new field is cross-referenced
	#     with the source field it was created from.
	#  4. The derived fields are appended again and renumbered to their new position.
	#
	# $source_transfer_fields is optional and may be undef.
	# NOTE(review): presumably user_error() does not return (dies/exits) -- confirm in the error class.
	sub _map_interface
	{
		my $self = shift;
		my $target_fields = shift;
		my $source_fields = shift;
		my $source_transfer_fields = shift;

		#NOTE: check for interface mismatch:
		foreach my $f (grep(!defined($_->calc_exp()), $target_fields->to_array())) 
		{
			$self->err()->user_error(10207, 
				$self->target_name() 
				. ": I/O interface mismatch for target field '"
				. $target_fields->pequel_ref()->pequel_name() . '->' . $f->name() 
				. "'!\nSource "
				. $source_fields->pequel_ref()->pequel_name()
				. ":"
				. join(', ', 
					map
					(
						$_->name(), 
						(
							do { defined($source_transfer_fields) ? $source_transfer_fields->to_array() : (); },
							$source_fields->to_array()
						)
					)
				)
				. "\nTarget "
				. $target_fields->pequel_ref()->pequel_name()
				. ":"
				. join(', ', map($_->name, grep(!defined($_->calc_exp()), $target_fields->to_array())))
			)
			unless 
			(
				$source_fields->exists($f->name()) 
				|| 
				(
					defined($source_transfer_fields) 
					&& $source_transfer_fields->exists($f->name())
				)
			);
		}

		#NOTE: preserve target derived fields;
		my $derived = stl::list();
		foreach my $derived_field 
		(
			grep($_->can('calc_exp') && defined($_->calc_exp()), $target_fields->to_array())
		)
		{
			$derived->push_back($derived_field);
		}

		$target_fields->clear();

		# Rebuild the target from the source interface; names starting with '_' are not copied.
		# A named loop variable is used so that the calls below cannot disturb the iterator.
		foreach my $src_field
		(
			grep
			(
				$_->name() !~ /^_/, 
				(
					$source_fields->to_array(), 
					do{ defined($source_transfer_fields) ? $source_transfer_fields->to_array() : () }
				)
			)
		) 
		{
			$target_fields->add(
				name => $src_field->name(), 
				pequel_type => defined($src_field->pequel_type()) 
					? $src_field->pequel_type() : $self->pequel_types()->exists('string'),
			);
			# Link the new target field and its source field in both directions.
			$target_fields->back()->xref()->references($src_field);
			$src_field->xref()->referenced_by($target_fields->back());
		}

		# Re-attach the derived fields after the copied ones and renumber them.
		foreach my $derived_field ($derived->to_array()) {
			$target_fields->push_back($derived_field);
			$target_fields->back()->field_number($target_fields->size());
		}
	}
}

# ----------------------------------------------------------------------------------------------------
# Input datasource that reads its records from the output of a sub-pequel (named by target_name)
# running in a forked child process.
# ----------------------------------------------------------------------------------------------------
{
	package ETL::Pequel3::Type::DataSource::Input::Pequel;
	use base qw(ETL::Pequel3::Type::DataSource::Input::Abstract);
	use base qw(ETL::Pequel3::Type::DataSource::Pequel::Abstract);
	use Class::STL::ClassMembers qw( pequel_ref ),
		ETL::Pequel3::Type::DataMember::User->new(name => 'target_name'),
		Class::STL::ClassMembers::DataMember->new(name => 'datasource_name', default => 'pequel');
	use Class::STL::ClassMembers::Constructor;

	# _map_output(main => $user [, field_map => 'f1,f2:2,...'] [, sorter => $sorter])
	#
	# Resolves the sub-pequel named by target_name() inside $p{main}, stores it in pequel_ref()
	# and connects its output interface to this dataset's input fields.
	#
	#  main      : required; must be an ETL::Pequel3::User (user_error 10501 otherwise).
	#  field_map : optional comma separated list of field names (an optional ':number' suffix is
	#              discarded). Only consulted when the dataset has no input fields yet; each name
	#              must exist in the sub-pequel's output interface (user_error 10216 otherwise).
	#  sorter    : optional; when defined it is handed to the sub-pequel's output datasource.
	#
	# Raises user_error 10504 when the sub-pequel cannot be found in $p{main}.
	sub _map_output
	{
		#TODO: test all variations
		my $self = shift;
		my %p = @_;
		$self->err()->user_error(10501, 
			"@{[ __PACKAGE__ ]}->_map_output() required argument 'main' undefined!")
			unless (exists($p{main}) && defined($p{main}) && $p{main}->isa('ETL::Pequel3::User'));
	
		my $sub = $p{main}->exists($self->target_name())
			|| $self->err()->user_error(10504, "@{[ $p{main}->pequel_name() ]}: Cannot find input-dataset sub-pequel @{[ 
				$self->target_name() ]}");
		$self->pequel_ref($sub);

		# Header handling is switched off on both ends of the pipe (where supported).
		$self->dataset()->discard_header(0) if ($self->dataset()->can('discard_header'));
		$self->pequel_ref()->output()->output_dataset()->print_header(0) 
			if 
			(
				$self->pequel_ref()->user_sections()->output()->size()
				&&
				$self->pequel_ref()->output()->output_dataset()->can('print_header')
			);
	
		if ($self->dataset()->input_fields()->size() || !defined($p{field_map}) || $p{field_map} eq '')
		{
			$self->err()->trace_msg(10, "@{[ $self->dataset()->pequel_ref()->pequel_name() 
				]}::input <--map_output: @{[ $self->pequel_ref()->pequel_name() ]}::output");
			$self->ETL::Pequel3::Type::DataSource::Pequel::Abstract::_map_interface(
				$self->dataset()->input_fields(), 
				$self->pequel_ref()->output()->output_fields(), 
				$self->pequel_ref()->output()->transfer_fields()
			);
		}
		else # self->dataset()->input_fields()->size() == 0 && field_map is defined:
		{
			my %field_map;
			foreach my $fname (split(/\s*,\s*/, $p{field_map})) 
			{
				$fname =~ s/:.*$//; # discard irrelevant field-number;
				$field_map{$fname} = $fname;
				$self->err()->user_error(10216, "Invalid field-map in table '@{[ 
							$self->name() 
						]}' --> Field '$fname' does not exist in sub-pequel '@{[ 
							$self->pequel_ref()->pequel_name() 
						]}'")
					unless (grep($fname eq $_->name(), $self->pequel_ref()->output_interface()));
			}

			# Copy output-field from sub-pequel to table fields:
			# ($fnum counts every output field so ds_column keeps the sub-pequel's column position
			# even when fields are skipped.)
			my $fnum=0;
			foreach my $out_field ($self->pequel_ref()->output_interface())
			{
				$fnum++;
				# field_map is known to be defined and non-empty in this branch.
				next if (!exists($field_map{$out_field->name()}));
				$self->dataset()->input_fields()->add(
					name => $out_field->name(), 
					ds_column => $fnum,
				);
				$self->dataset()->input_fields()->back()->xref()->references($out_field);
				$out_field->xref()->referenced_by($self->dataset()->input_fields()->back());
			}
		}

		$self->pequel_ref()->output()->output_dataset()->datasource()->sorter($p{sorter}) 
			if (defined($p{sorter}));
#?			if (defined($self->sorter())); #TODO
	}

	# _code_open([$c])
	#
	# Appends to $c (a new ETL::Pequel3::CodeStyler::Program::Perl when omitted) the generated
	# code that fork-opens fdname() for reading ('-|') and runs the sub-pequel's execute() in the
	# child. Returns $c.
	#
	# The generated condition checks defined() before comparing the pid with 0: a failed
	# fork-open returns undef, and undef == 0 would otherwise send the parent down the child
	# branch instead of reaching the die().
	sub _code_open
	{
		my $self = shift;
		my $c = shift || ETL::Pequel3::CodeStyler::Program::Perl->new();
		my $pid_var = "\$_PID_@{[ $self->vname() ]}";
		$c->code("my $pid_var;");
		$c->code("if (defined($pid_var = open(@{[ $self->fdname() ]}, '-|')) && $pid_var == 0)");
		$c->code("# Fork -- read input from child pequel '@{[ $self->target_name() ]}'");
		$c->open_block();
		$c->code("\&Pequel\::@{[ $self->target_name() ]}\::execute();");
		$c->code("exit(0);");
		$c->close_block();
		$c->code("die(\"Error forking to pequel script '@{[ $self->target_name() ]}'!\")");
		$c->code("unless (defined($pid_var));");
		return $c;
	}

	# _code_close([$c])
	#
	# Appends the generated close() of fdname() to $c (a new code object when omitted) and
	# returns $c.
	sub _code_close
	{
		my $self = shift;
		my $c = shift || ETL::Pequel3::CodeStyler::Program::Perl->new();
		$c->code("close(@{[ $self->fdname() ]});");
		return $c;
	}
}

# ----------------------------------------------------------------------------------------------------
# Output datasource that writes its records to the input of a sub-pequel (named by target_name)
# running in a forked child process.
# ----------------------------------------------------------------------------------------------------
{
	package ETL::Pequel3::Type::DataSource::Output::Pequel;
	use base qw(ETL::Pequel3::Type::DataSource::Output::Abstract);
	use base qw(ETL::Pequel3::Type::DataSource::Pequel::Abstract);
	use Class::STL::ClassMembers qw( pequel_ref ),
		ETL::Pequel3::Type::DataMember::User->new(name => 'target_name'),
		Class::STL::ClassMembers::DataMember->new(name => 'datasource_name', default => 'pequel');
	use Class::STL::ClassMembers::Constructor;

	# _map_output(on => 'input'|<other>, main => $user)
	#
	# Resolves the sub-pequel named by target_name() inside $p{main}, stores it in pequel_ref()
	# and maps the main pequel's fields onto the sub-pequel's input fields.
	#
	#  on   : required (user_error 10500 otherwise). 'input' maps from the main pequel's input
	#         fields; any other value maps from its output fields plus transfer fields.
	#  main : required; must be an ETL::Pequel3::User (user_error 10501 otherwise).
	#
	# Raises user_error 10504 when the sub-pequel cannot be found in $p{main}.
	sub _map_output
	{
		my $self = shift;
		my %p = @_;
		$self->err()->user_error(10500, 
			"@{[ __PACKAGE__ ]}->_map_output() required argument 'on' undefined!")
			unless (exists($p{on}) && defined($p{on}));
		$self->err()->user_error(10501, 
			"@{[ __PACKAGE__ ]}->_map_output() required argument 'main' undefined!")
			unless (exists($p{main}) && defined($p{main}) && $p{main}->isa('ETL::Pequel3::User'));

		my $sub = $p{main}->exists($self->target_name())
			|| $self->err()->user_error(10504, "@{[ $p{main}->pequel_name() ]}: Cannot find output-dataset sub-pequel @{[ 
				$self->target_name() ]}");
		$self->pequel_ref($sub);

		# Header handling is switched off on both ends of the pipe (where supported).
		$self->dataset()->print_header(0) if ($self->dataset()->can('print_header'));
		$self->pequel_ref()->input()->input_dataset()->discard_header(0) 
			if ($self->pequel_ref()->input()->input_dataset()->can('discard_header'));
	
		# Replace the sub-pequel's input dataset with a factory-built one, carrying over a clone
		# of its previous sorter unless the sub-pequel is configured with 'hash'.
		my $save_sorter = $self->pequel_ref()->input()->input_dataset()->datasource()->sorter();
		$self->pequel_ref()->input()->input_dataset($p{main}->factories()->input_datasets()->factory( 
			pequel_ref => $self->pequel_ref()
		));
		$self->pequel_ref()->input()->input_dataset()->datasource()->sorter($save_sorter->clone()) 
			if (defined($save_sorter) && !$self->pequel_ref()->config()->hash());

		$self->err()->trace_msg(10, "@{[ $self->pequel_ref()->pequel_name() 
			]}::output -->map_output: @{[ $p{main}->pequel_name() ]}::@{[ $p{on} ]}");
		$self->ETL::Pequel3::Type::DataSource::Pequel::Abstract::_map_interface
		(
			$self->pequel_ref()->input()->input_fields(), 
			(
				$p{on} eq 'input' 
					? ( $p{main}->input()->input_fields() ) 
					: ( $p{main}->output()->output_fields(), $p{main}->output()->transfer_fields() )
			)
		);

		# With 'transfer' configured, rebuild the sub-pequel's transfer fields from its freshly
		# mapped input fields.
		if ($self->pequel_ref()->config()->transfer()) 
		{
			$self->pequel_ref()->output()->transfer_fields()->clear();
			$self->pequel_ref()->output()->transfer_fields()->map_input_fields(
				$self->pequel_ref()->input()->input_fields(), 
				$self->pequel_ref()->input()->input_dataset(),
				($self->pequel_ref()->user_sections()->group_by()->size() 
					? $self->pequel_ref()->config()->transfer() : 'last')
			);
		}

		# Drop the sub-pequel's input sort when this datasource already sorts on the same field
		# names in the same order (only when the main pequel is not configured with 'hash').
		if (defined($self->pequel_ref()->input()->input_dataset()->datasource()->sorter()) 
			&& defined($self->sorter()) 
			&& $p{main}->config()->hash() == 0) 
		{
			if 
			(
				join('|', map($_->name(), $self->pequel_ref()->input()->input_dataset()->datasource()->sorter()->fields()->to_array()))
				eq
				join('|', map($_->name(), $self->sorter()->fields()->to_array()))
			)
			{
				$self->pequel_ref()->input()->input_dataset()->datasource()->undefine(qw(sorter));
				$self->err()->trace_msg(10, "-->"
					. $self->target_name()
					. " -- removed redundant input-sort"
				);
			}
		}
	}

	# _code_open([$c])
	#
	# Appends to $c (a new ETL::Pequel3::CodeStyler::Program::Perl when omitted) the generated
	# code that fork-opens fdname() for writing ('|-') and runs the sub-pequel's execute() in the
	# child. Returns $c.
	#
	# The generated condition checks defined() before comparing the pid with 0: a failed
	# fork-open returns undef, and undef == 0 would otherwise send the parent down the child
	# branch instead of reaching the die().
	sub _code_open
	{
		my $self = shift;
		my $c = shift || ETL::Pequel3::CodeStyler::Program::Perl->new();
		my $pid_var = "\$_PID_@{[ $self->vname() ]}";
		$c->code("my $pid_var;");
		$c->code("if (defined($pid_var = open(@{[ $self->fdname() ]}, '|-')) && $pid_var == 0)");
		$c->code("# Fork -- write output to child pequel '@{[ $self->target_name() ]}'");
		$c->open_block();
		$c->code("\&Pequel\::@{[ $self->target_name() ]}\::execute();");
		$c->code("exit(0);");
		$c->close_block();
		$c->code("die(\"Error forking to pequel script '@{[ $self->target_name() ]}'!\")");
		$c->code("unless (defined($pid_var));");
		return $c;
	}

	# _code_close([$c])
	#
	# Appends the generated close() of fdname() to $c (a new code object when omitted) and
	# returns $c.
	sub _code_close
	{
		my $self = shift;
		my $c = shift || ETL::Pequel3::CodeStyler::Program::Perl->new();
		$c->code("close(@{[ $self->fdname() ]});");
		return $c;
	}
}
# ----------------------------------------------------------------------------------------------------
1;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -