Class: Remi::Job::Transform

Inherits:
Object
Defined in:
lib/remi/job/transform.rb

Overview

A Transform contains a block of code that is executed in a context. Transforms are usually defined in a Job, according to the Job DSL.

Transforms may optionally have a mapping defined that links a local definition of a data frame to a definition of the data frame in the associated context.

Examples:


# Transforms should typically be defined using the Job DSL
job = MyJob.new
tform = Job::Transform.new(job) do
  # ... stuff to do in the context of the job
end
tform.execute

Defined Under Namespace

Classes: FieldMap

Constructor Details

#initialize(context, name: 'NOT DEFINED', **kargs, &block) ⇒ Transform

Initializes a transform

Parameters:

  • context (Object, Job)

    sets the context in which the block will be executed

  • name (String, Symbol) (defaults to: 'NOT DEFINED')

    optionally gives the transform a name

  • kargs (Hash)

    any keyword arguments are accessible within the block as #params (e.g., params[:my_custom_param])

  • block (Proc)

    a block of code to execute in the context



# File 'lib/remi/job/transform.rb', line 27

def initialize(context, name: 'NOT DEFINED', **kargs, &block)
  @context = context
  @name = name
  @block = block
  params.merge! kargs

  @sources = []
  @targets = []

  @field_maps = { sources: {}, targets: {} }
end

Instance Attribute Details

#context ⇒ Object

Returns the value of attribute context.



# File 'lib/remi/job/transform.rb', line 39

def context
  @context
end

#field_maps ⇒ Object

Returns the value of attribute field_maps.



# File 'lib/remi/job/transform.rb', line 39

def field_maps
  @field_maps
end

#name ⇒ Object

Returns the value of attribute name.



# File 'lib/remi/job/transform.rb', line 39

def name
  @name
end

#sources ⇒ Object

Returns the value of attribute sources.



# File 'lib/remi/job/transform.rb', line 39

def sources
  @sources
end

#targets ⇒ Object

Returns the value of attribute targets.



# File 'lib/remi/job/transform.rb', line 39

def targets
  @targets
end

Instance Method Details

#execute ⇒ Object

Executes the transform block

Returns:

  • (Object)

    the context of the transform after executing



# File 'lib/remi/job/transform.rb', line 43

def execute
  context.logger.info "Running transformation #{@name}"
  Dsl.dsl_eval(self, @context, &@block)
end
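
The listing above delegates to Dsl.dsl_eval, whose source is not shown on this page. The following is a minimal sketch of how a block might be evaluated so that it can reach both the transform (for #params) and the context (for job methods). It assumes a proxy that tries the transform first and falls back to the context. SketchDsl, SketchJob, and SketchTransform are hypothetical stand-ins, not Remi classes, and the return value of #execute is not modeled.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for Dsl.dsl_eval. Assumption: calls made inside the
# block go to the primary object first and fall back to the context.
module SketchDsl
  class Proxy < BasicObject
    def initialize(primary, fallback)
      @primary = primary
      @fallback = fallback
    end

    # Route each call to the first object that responds to it.
    def method_missing(name, *args, &block)
      target = @primary.respond_to?(name) ? @primary : @fallback
      target.public_send(name, *args, &block)
    end
  end

  def self.dsl_eval(primary, fallback, &block)
    Proxy.new(primary, fallback).instance_exec(&block)
  end
end

# Hypothetical job: exposes a logger (required by #execute) and some state.
class SketchJob
  attr_reader :logger, :log_output, :results

  def initialize
    @log_output = StringIO.new
    @logger = Logger.new(@log_output)
    @results = []
  end

  def job_id
    42
  end
end

# Mirrors the constructor, #params, and #execute shown in the listings.
class SketchTransform
  attr_reader :context, :name

  def initialize(context, name: 'NOT DEFINED', **kargs, &block)
    @context = context
    @name = name
    @block = block
    params.merge! kargs
  end

  def params
    @params ||= Hash.new { |_, key| raise ArgumentError, "Transform parameter #{key} is not defined" }
  end

  def execute
    context.logger.info "Running transformation #{@name}"
    SketchDsl.dsl_eval(self, @context, &@block)
  end
end

job = SketchJob.new
tform = SketchTransform.new(job, name: 'scale ids', factor: 10) do
  # results and job_id resolve on the job; params resolves on the transform
  results.push(job_id * params[:factor])
end
tform.execute
```

After the call, job.results holds the value computed inside the block and the log contains the "Running transformation" line with the transform name.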

#import(sub_transform, **kargs, &block) ⇒ Object

Imports another transform to be executed as part of this transform. The block is used to perform any source/target field mapping.

Examples:


sub_transform = Job::Transform.new('arbitrary') do
  source :sub_transform_source, [] # validate that this source has been defined
  # do stuff to sub_transform_source here
end

job = MyJob.new
my_transform = Job::Transform.new(job) do
  import sub_transform do
    map_source_fields :some_method_in_my_job, :sub_transform_source, { :job_id => :sub_transform_id }
  end
end

Parameters:

  • sub_transform (Job::Transform)

    the transform to import into this one

  • kargs (Hash)

    keyword arguments merged into the imported transform's #params

  • block (Proc)

    a block of code to be executed prior to the execution of the imported transform. This is where field mapping would be defined.



# File 'lib/remi/job/transform.rb', line 118

def import(sub_transform, **kargs, &block)
  sub_transform.context = context
  sub_transform.params.merge! kargs
  Dsl.dsl_eval(sub_transform, context, &block)

  sub_transform.map_inputs
  sub_transform.execute
  sub_transform.map_outputs
end

#map_source_fields(from_source, to_source, field_map) ⇒ Object

Maps data sources and fields from the transform context to the local transform

Parameters:

  • from_source (Symbol)

    name of the source data in the context

  • to_source (Symbol)

    name of the source data local to the transform

  • field_map (Hash)

    mapping of the key names from the context source to the local source



# File 'lib/remi/job/transform.rb', line 75

def map_source_fields(from_source, to_source, field_map)
  sources << to_source unless sources.include? to_source

  job_ds = context.send(from_source)
  sub_trans_ds = Remi::DataSubject.new(name: to_source)
  define_singleton_method(to_source) { sub_trans_ds }

  field_maps[:sources][to_source] = FieldMap.new(job_ds, send(to_source), field_map)
end
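
The listing above uses define_singleton_method so that the local source becomes callable by name on this one transform only. A minimal sketch of that pattern follows. SketchRegistry and SketchSubject are hypothetical stand-ins (SketchSubject takes the place of Remi::DataSubject), and the field-map bookkeeping is omitted.

```ruby
# Hypothetical stand-in for Remi::DataSubject: just carries a name.
SketchSubject = Struct.new(:name)

class SketchRegistry
  attr_reader :sources

  def initialize
    @sources = []
  end

  # Registers a source name once and exposes it as a method on this instance.
  def register_source(name)
    sources << name unless sources.include?(name)
    subject = SketchSubject.new(name)
    define_singleton_method(name) { subject }
  end
end

first = SketchRegistry.new
second = SketchRegistry.new

first.register_source(:customers)
first.register_source(:customers) # the name is recorded only once

customers_name = first.customers.name
leaked = second.respond_to?(:customers) # other instances are unaffected
```

Because the method is defined on the singleton class, two transforms can each map a source with the same local name without interfering with one another.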

#map_target_fields(from_target, to_target, field_map) ⇒ Object

Maps data targets and fields from the local transform to the transform context

Parameters:

  • from_target (Symbol)

    name of the target data local to the transform

  • to_target (Symbol)

    name of the target data in the context

  • field_map (Hash)

    mapping of the key names from the local transform target to the context target



# File 'lib/remi/job/transform.rb', line 89

def map_target_fields(from_target, to_target, field_map)
  targets << from_target unless targets.include? from_target

  job_ds = context.send(to_target)
  sub_trans_ds = Remi::DataSubject.new
  define_singleton_method(from_target) { sub_trans_ds }

  field_maps[:targets][from_target] = FieldMap.new(send(from_target), job_ds, field_map)
end

#params ⇒ Hash

Returns the parameters defined during initialization of the transform.

Returns:

  • (Hash)

    the parameters defined during initialization of the transform



# File 'lib/remi/job/transform.rb', line 49

def params
  @params ||= Hash.new { |_, key| raise ArgumentError, "Transform parameter #{key} is not defined" }
end
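
The default block in the listing above makes a lookup of an undefined parameter fail loudly instead of returning nil. A short sketch of that behavior using a plain Hash built the same way (the parameter names are illustrative):

```ruby
params = Hash.new { |_, key| raise ArgumentError, "Transform parameter #{key} is not defined" }
params.merge!(batch_size: 500)

batch_size = params[:batch_size]

# Hash#[] on a missing key runs the default block, which raises.
error_message =
  begin
    params[:missing]
  rescue ArgumentError => e
    e.message
  end

# key? and fetch with a default do not run the default block.
has_missing = params.key?(:missing)
fallback_value = params.fetch(:missing, 100)
```

Inside a transform block, params.key?(:name) or params.fetch(:name, default) can therefore be used for parameters that are genuinely optional.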

#source(name, fields) ⇒ Object

Validates that a data source used in the transform has been defined and that each listed field has been mapped to it

Parameters:

  • name (Symbol)

    the name of a data source used in the transform

  • fields (Array<Symbol>)

    a list of fields used by the transform for this data source

Raises:

  • (NoMethodError)

    if no source mapping has been defined for name

  • (ArgumentError)

    if any of the listed fields has not been mapped to the source



# File 'lib/remi/job/transform.rb', line 57

def source(name, fields)
  raise NoMethodError, "Need to define a source mapping for #{name}" unless sources.include? name
  raise ArgumentError, "Need to map fields to source #{name} (#{fields})" unless (fields - field_maps[:sources][name].field_from_to.values).empty?
end
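
The two raise statements in the listing above fail in different ways: a missing source mapping raises NoMethodError, while an unmapped field raises ArgumentError. A minimal sketch that reproduces both checks outside Remi follows. SketchValidator, SketchFieldMap, and the outcome helper are hypothetical stand-ins; only field_from_to mirrors a name used in the listing.

```ruby
# Hypothetical stand-in for FieldMap: exposes only the from => to hash.
SketchFieldMap = Struct.new(:field_from_to)

class SketchValidator
  attr_reader :sources, :field_maps

  def initialize
    @sources = []
    @field_maps = { sources: {} }
  end

  # Records a mapping of context field => local field for a local source.
  def map(name, field_from_to)
    sources << name unless sources.include?(name)
    field_maps[:sources][name] = SketchFieldMap.new(field_from_to)
  end

  # Same two checks as the #source listing.
  def source(name, fields)
    raise NoMethodError, "Need to define a source mapping for #{name}" unless sources.include? name
    raise ArgumentError, "Need to map fields to source #{name} (#{fields})" unless (fields - field_maps[:sources][name].field_from_to.values).empty?
  end
end

# Helper: returns :ok, or the class of the validation error that was raised.
def outcome
  yield
  :ok
rescue NoMethodError, ArgumentError => e
  e.class
end

validator = SketchValidator.new
validator.map(:customers, { job_id: :id, job_email: :email })

all_mapped      = outcome { validator.source(:customers, [:id, :email]) }
unmapped_field  = outcome { validator.source(:customers, [:id, :phone]) }
unmapped_source = outcome { validator.source(:orders, [:id]) }
```

Passing an empty field list, as in the #import example, checks only that the source mapping exists.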

#target(name, fields) ⇒ Object

Validates that a data target used in the transform has been defined and that each listed field has been mapped from it

Parameters:

  • name (Symbol)

    the name of a data target used in the transform

  • fields (Array<Symbol>)

    a list of fields used by the transform for this data target

Raises:

  • (NoMethodError)

    if no target mapping has been defined for name

  • (ArgumentError)

    if any of the listed fields has not been mapped from the target



# File 'lib/remi/job/transform.rb', line 66

def target(name, fields)
  raise NoMethodError, "Need to define a target mapping for #{name}" unless targets.include? name
  raise ArgumentError, "Need to map fields to target #{name} (#{fields})" unless (fields - field_maps[:targets][name].field_from_to.keys).empty?
end
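
Note that #target compares against the keys of the field map, whereas #source compares against the values. A small sketch of why, using plain hashes in place of FieldMap#field_from_to (field names are illustrative, and the unmapped helper is hypothetical): the local transform's fields sit on the "to" side of a source map and on the "from" side of a target map.

```ruby
# Source map: context field => local field
source_from_to = { job_id: :sub_transform_id }
# Target map: local field => context field
target_from_to = { sub_transform_total: :job_total }

# Fields the transform uses that are absent from the given side of a map.
def unmapped(fields, mapped)
  fields - mapped
end

source_gaps = unmapped([:sub_transform_id], source_from_to.values)
target_gaps = unmapped([:sub_transform_total], target_from_to.keys)

# Checking a target against the values would look at context field names instead.
wrong_side = unmapped([:sub_transform_total], target_from_to.values)
```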