Class: Remi::Job

Inherits:
Object
Defined in:
lib/remi/job.rb,
lib/remi/job/sub_job.rb,
lib/remi/job/transform.rb,
lib/remi/job/parameters.rb

Overview

TODO: Add more examples covering sub-transforms, sub-jobs, parameters, and references to more complete sample jobs.

The Job class is the foundation for all Remi ETL jobs. It provides a DSL for defining Remi jobs in a way that is natural for ETL-style applications. In a Remi job, the user defines all of the sources, transforms, and targets necessary to transform data. Any number of sources, transforms, and targets can be defined. Transforms can call other parameterized sub-transforms. Jobs can collect data from parameterized sub-jobs, pass data to sub-jobs, or do both.

Jobs are executed by calling the #execute method on an instance of the job. This executes all transforms in the order they are defined; sub-transforms are executed only if a transform references them. After all transforms have executed, sub-jobs are executed and then the targets are loaded in the order they are defined.

Examples:


class MyJob < Remi::Job
  source :my_csv_file do
    extractor my_extractor
    parser my_parser
    enforce_types
  end

  target :my_transformed_file do
    loader my_loader
  end

  transform :transform_data do
    # Data sources are converted into a dataframe the first time the #df method is called.
    transform_work = my_csv_file.df.dup # => a copy of the my_csv_file.df dataframe

    # Any arbitrary Ruby is allowed in a transform block.  Remi provides a convenient
    # source to target map DSL to map fields from sources to targets
    Remi::SourceToTargetMap.apply(transform_work, my_transformed_file.df) do
      map source(:source_field_id).target(:prefixed_id)
        .transform(->(v) { "PREFIX#{v}" })
    end
  end
end

# The job is executed when `#execute` is called on an instance of the job.
# Transforms are executed in the order they are defined.  Targets are loaded
# in the order they are defined after all transforms have been executed.
job = MyJob.new
job.execute

Defined Under Namespace

Classes: Parameters, SubJob, Transform

Constructor Details

#initialize(work_dir: Settings.work_dir, logger: Settings.logger, **kargs) ⇒ Job

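Initializes the job: creates the working directory, then initializes parameters, sub-jobs, sources, targets, and transforms, in that order.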

Parameters:

  • work_dir (String, Path) (defaults to: Settings.work_dir)

    sets the working directory for this job

  • logger (Object) (defaults to: Settings.logger)

    sets the logger for the job

  • kargs (Hash)

    Optional job parameters (can be referenced in the job via #params)



# File 'lib/remi/job.rb', line 219

def initialize(work_dir: Settings.work_dir, logger: Settings.logger, **kargs)
  @work_dir = work_dir
  @logger = logger
  create_work_dir

  __init_params__ **kargs
  __init_sub_jobs__
  __init_sources__
  __init_targets__
  __init_transforms__
end
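
The keyword signature above means that `work_dir:` and `logger:` are consumed by the constructor, while any other keyword ends up in `kargs`. The following is a minimal sketch of that split. `KeywordSketchJob` is a hypothetical stand-in (not part of Remi), and it stores the leftover keywords in a plain Hash, which is an assumption; how `Job::Parameters` stores them is not shown on this page.

```ruby
require 'fileutils'
require 'logger'
require 'tmpdir'

# Hypothetical stand-in for illustration only; it is not Remi::Job.
class KeywordSketchJob
  attr_reader :work_dir, :logger, :params

  def initialize(work_dir: Dir.tmpdir, logger: Logger.new(nil), **kargs)
    @work_dir = work_dir
    @logger = logger
    FileUtils.mkdir_p(@work_dir) # mirrors what create_work_dir does
    @params = kargs              # assumption: a plain Hash, not Job::Parameters
  end
end

sketch_dir = Dir.mktmpdir('sketch_job')
sketch_job = KeywordSketchJob.new(work_dir: sketch_dir, batch_date: '2016-01-01')

sketch_job.work_dir == sketch_dir # => true
sketch_job.params                 # => {:batch_date=>"2016-01-01"} (Hash inspect format varies by Ruby version)
```

Only `batch_date:` reaches the parameter hash because the two named keywords are bound first.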

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the logging object.

Returns:

  • (Object)

    the logging object



# File 'lib/remi/job.rb', line 235

def logger
  @logger
end

#params ⇒ Job::Parameters (readonly)

Returns parameters defined at the class level or during instantiation.

Returns:

  • (Job::Parameters)

    parameters defined at the class level or during instantiation



# File 'lib/remi/job.rb', line 238

def params
  @params
end

#sources ⇒ Array (readonly)

Returns list of sources defined in the job.

Returns:

  • (Array)

    list of sources defined in the job



# File 'lib/remi/job.rb', line 244

def sources
  @sources
end

#sub_jobs ⇒ Array (readonly)

Returns list of sub_jobs defined in the job.

Returns:

  • (Array)

    list of sub_jobs defined in the job



# File 'lib/remi/job.rb', line 241

def sub_jobs
  @sub_jobs
end

#targets ⇒ Array (readonly)

Returns list of targets defined in the job.

Returns:

  • (Array)

    list of targets defined in the job



# File 'lib/remi/job.rb', line 247

def targets
  @targets
end

#transforms ⇒ Array (readonly)

Returns list of transforms defined in the job.

Returns:

  • (Array)

    list of transforms defined in the job



# File 'lib/remi/job.rb', line 250

def transforms
  @transforms
end

#work_dir ⇒ String (readonly)

Returns the working directory used for temporary data.

Returns:

  • (String)

    the working directory used for temporary data



# File 'lib/remi/job.rb', line 232

def work_dir
  @work_dir
end

Class Method Details

.param(name, &block) ⇒ Object

Defines a job parameter.

Examples:


class MyJob < Job
  param(:my_param) { 'the best parameter' }
end

job = MyJob.new
job.params[:my_param] #=> 'the best parameter'


# File 'lib/remi/job.rb', line 81

def param(name, &block)
  params.__define__(name, &block)
end
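
To make the delegation from `.param` to `params.__define__` concrete, here is a minimal sketch built from hypothetical stand-ins. `SketchParameters` and `ParamSketchJob` are not Remi classes. Evaluating the block on each lookup, raising `KeyError` for undefined names, and having instances expose the class-level parameters directly are all assumptions made for the illustration.

```ruby
# Hypothetical stand-in for Job::Parameters; the real class may differ.
class SketchParameters
  def initialize
    @blocks = {}
  end

  # Registers a named parameter whose value is produced by the block.
  def __define__(name, &block)
    @blocks[name] = block
  end

  # Looks up a parameter by calling its block.
  # Assumption: an undefined name raises KeyError (via Hash#fetch).
  def [](name)
    @blocks.fetch(name).call
  end
end

# Hypothetical stand-in for the class-level DSL of Remi::Job.
class ParamSketchJob
  def self.params
    @params ||= SketchParameters.new
  end

  def self.param(name, &block)
    params.__define__(name, &block)
  end

  # Assumption: instances simply expose the class-level parameters.
  def params
    self.class.params
  end
end

class MyParamSketchJob < ParamSketchJob
  param(:my_param) { 'the best parameter' }
end

MyParamSketchJob.new.params[:my_param] # => "the best parameter"
```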

.params ⇒ Job::Parameters

Returns all parameters defined at the class level.

Returns:

  • (Job::Parameters)

    all parameters defined at the class level


# File 'lib/remi/job.rb', line 68

def params
  @params ||= Parameters.new
end

.source(name, &block) ⇒ Object

Defines a data source.

Examples:


class MyJob < Job
  source :my_source do
    extractor my_extractor
    parser my_parser
  end
end

job = MyJob.new
job.my_source.df #=> a dataframe generated after extracting and parsing


# File 'lib/remi/job.rb', line 128

def source(name, &block)
  sources << name unless sources.include? name
  attr_accessor name

  define_method("__init_#{name}__".to_sym) do
    source = DataSource.new(self, name: name, &block)
    instance_variable_set("@#{name}", source)
  end
end
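
The pattern in `.source` (register the name, add an accessor, and generate an `__init_<name>__` method) can be sketched without Remi. In the example below, `SketchSource` and `RegistrySketchJob` are hypothetical stand-ins, and the way the constructor walks the registry is an assumption about what `__init_sources__` does; its body is not shown on this page.

```ruby
# Hypothetical stand-ins; SketchSource is not Remi's DataSource.
SketchSource = Struct.new(:job, :name)

class RegistrySketchJob
  class << self
    # Class-level instance variable: each subclass in this sketch keeps its own list.
    def sources
      @sources ||= []
    end

    def source(name)
      sources << name unless sources.include?(name)
      attr_accessor name

      # Generates an initializer that builds the source for one job instance.
      define_method("__init_#{name}__") do
        instance_variable_set("@#{name}", SketchSource.new(self, name))
      end
    end
  end

  def initialize
    # Assumption: __init_sources__ calls each generated initializer in turn.
    self.class.sources.each { |name| send("__init_#{name}__") }
  end
end

class OrdersSketchJob < RegistrySketchJob
  source :orders
  source :orders # registering the same name twice does not duplicate it
end

class CustomersSketchJob < RegistrySketchJob
  source :customers
end

OrdersSketchJob.sources         # => [:orders]
CustomersSketchJob.sources      # => [:customers]
OrdersSketchJob.new.orders.name # => :orders
```

Because the registry lives in a class-level instance variable, the two subclasses in this sketch do not see each other's sources. Whether Remi copies definitions from a parent job class to its subclasses is not covered here.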

.sources ⇒ Array<Symbol>

Returns the list of data source names.

Returns:

  • (Array<Symbol>)

    the list of data source names



# File 'lib/remi/job.rb', line 86

def sources
  @sources ||= []
end

.sub_job(name, &block) ⇒ Object

Defines a sub-job resource for this job. Note that the DSL block must return an instance of Remi::Job.

Examples:


class MyJob < Job
  sub_job(:my_sub_job) { MySubJob.new }
end

job = MyJob.new
job.my_sub_job.job #=> An instance of MySubJob


# File 'lib/remi/job.rb', line 106

def sub_job(name, &block)
  sub_jobs << name unless sub_jobs.include? name
  attr_accessor name

  define_method("__init_#{name}__".to_sym) do
    sub_job = Job::SubJob.new(self, name: name, &block)
    instance_variable_set("@#{name}", sub_job)
  end
end

.sub_jobs ⇒ Array<Symbol>

Returns the list of sub-job names.

Returns:

  • (Array<Symbol>)

    the list of sub-job names



# File 'lib/remi/job.rb', line 92

def sub_jobs
  @sub_jobs ||= []
end

.sub_transform(name, **kargs, &block) ⇒ Object

Defines a sub-transform.

Examples:


class MyJob < Job
  sub_transform :my_sub_transform, greeting: 'hello' do
    puts "#{params[:greeting]} from my_sub_transform!"
  end

  transform :my_transform do
    import :my_sub_transform, greeting: 'bonjour' do
    end
  end
end

job = MyJob.new
job.my_transform.execute #=>(stdout) 'bonjour from my_sub_transform!'


# File 'lib/remi/job.rb', line 207

def sub_transform(name, **kargs, &block)
  define_method(name) do
    Transform.new(self, name: name, **kargs, &block)
  end
end
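
Unlike `.transform`, the method generated by `.sub_transform` is a plain instance method that constructs a new `Transform` every time it is called; nothing is stored in an instance variable. A minimal sketch of that behavior follows. `SketchTransform` and `SubTransformSketchJob` are hypothetical stand-ins, and evaluating the block with `instance_exec` is an assumption about how a transform runs its block.

```ruby
# Hypothetical stand-in for Job::Transform that only stores its arguments.
class SketchTransform
  attr_reader :name, :params

  def initialize(job, name:, **kargs, &block)
    @job = job
    @name = name
    @params = kargs
    @block = block
  end

  # Assumption: the block runs in the context of the transform object.
  def execute
    instance_exec(&@block)
  end
end

class SubTransformSketchJob
  def self.sub_transform(name, **kargs, &block)
    define_method(name) do
      SketchTransform.new(self, name: name, **kargs, &block)
    end
  end

  sub_transform :greeter, greeting: 'hello' do
    "#{params[:greeting]} from #{name}!"
  end
end

sketch_job = SubTransformSketchJob.new
first = sketch_job.greeter
second = sketch_job.greeter

first.execute        # => "hello from greeter!"
first.equal?(second) # => false, each call builds a new object
```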

.target(name, &block) ⇒ Object

Defines a data target.

Examples:


class MyJob < Job
  target :my_target do
    loader my_loader
  end
end

job = MyJob.new
job.my_target.df #=> the dataframe that is loaded to the target


# File 'lib/remi/job.rb', line 155

def target(name, &block)
  targets << name unless targets.include? name
  attr_accessor name

  define_method("__init_#{name}__".to_sym) do
    target = DataTarget.new(self, name: name, &block)
    instance_variable_set("@#{name}", target)
  end
end

.targets ⇒ Array<Symbol>

Returns the list of data target names.

Returns:

  • (Array<Symbol>)

    the list of data target names



# File 'lib/remi/job.rb', line 139

def targets
  @targets ||= []
end

.transform(name, &block) ⇒ Object

Defines a transform.

Examples:


class MyJob < Job
  transform :my_transform do
    puts "hello from my_transform!"
  end
end

job = MyJob.new
job.my_transform.execute #=>(stdout) 'hello from my_transform!'


# File 'lib/remi/job.rb', line 181

def transform(name, &block)
  transforms << name unless transforms.include? name
  attr_accessor name

  define_method("__init_#{name}__".to_sym) do
    transform = Transform.new(self, name: name, &block)
    instance_variable_set("@#{name}", transform)
  end
end

.transforms ⇒ Array<Symbol>

Returns the list of transform names.

Returns:

  • (Array<Symbol>)

    the list of transform names



# File 'lib/remi/job.rb', line 166

def transforms
  @transforms ||= []
end

Instance Method Details

#create_work_dir ⇒ Object

Creates the working directory for the job, including any missing parent directories, and logs its path.



# File 'lib/remi/job.rb', line 254

def create_work_dir
  @logger.info "Creating working directory #{work_dir}"
  FileUtils.mkdir_p work_dir
end
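
`FileUtils.mkdir_p` creates intermediate directories as needed and does not raise if the directory already exists, so repeated calls with the same path are harmless. A short sketch, using a hypothetical scratch location under the system temp directory:

```ruby
require 'fileutils'
require 'tmpdir'

base_dir = Dir.mktmpdir('remi_sketch') # hypothetical scratch location
nested_work_dir = File.join(base_dir, 'jobs', 'my_job')

FileUtils.mkdir_p(nested_work_dir) # creates 'jobs' and 'my_job' in one call
FileUtils.mkdir_p(nested_work_dir) # no error when the directory already exists

Dir.exist?(nested_work_dir) # => true
```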

#execute(*components) ⇒ self

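Executes the specified components of the job. If no components are given, all components are executed: transforms, then sub-jobs, then target loads.

Parameters:

  • components (Array<Symbol>)

    list of components to execute (:transforms, :sub_jobs, :load_targets)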

Returns:

  • (self)


# File 'lib/remi/job.rb', line 284

def execute(*components)
  execute_transforms if components.empty? || components.include?(:transforms)
  execute_sub_jobs if components.empty? || components.include?(:sub_jobs)
  execute_load_targets if components.empty? || components.include?(:load_targets)
  self
end
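
The component-selection logic above can be exercised in isolation. The sketch below copies the body of `#execute` into a hypothetical `ExecuteSketchJob` whose private `execute_*` methods only record that they ran; the real methods do the actual ETL work and are not shown on this page.

```ruby
# Hypothetical stand-in that records which steps ran instead of doing ETL work.
class ExecuteSketchJob
  attr_reader :ran

  def initialize
    @ran = []
  end

  def execute(*components)
    execute_transforms if components.empty? || components.include?(:transforms)
    execute_sub_jobs if components.empty? || components.include?(:sub_jobs)
    execute_load_targets if components.empty? || components.include?(:load_targets)
    self
  end

  private

  def execute_transforms
    @ran << :transforms
  end

  def execute_sub_jobs
    @ran << :sub_jobs
  end

  def execute_load_targets
    @ran << :load_targets
  end
end

ExecuteSketchJob.new.execute.ran
# => [:transforms, :sub_jobs, :load_targets]

ExecuteSketchJob.new.execute(:load_targets, :transforms).ran
# => [:transforms, :load_targets]

ExecuteSketchJob.new.execute(:transforms).execute(:load_targets).ran
# => [:transforms, :load_targets]
```

The second call shows that the order of the arguments does not change the order in which components run, and the third shows that calls can be chained because `#execute` returns `self`.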

#inspect ⇒ Object



# File 'lib/remi/job.rb', line 269

def inspect
  "#<#{Remi::Job}>: #{self.class}\n" +
    "  parameters: #{params.to_h.keys}\n" +
    "  sources: #{sources}\n" +
    "  targets: #{targets}\n" +
    "  transforms: #{transforms}\n" +
    "  sub_jobs: #{sub_jobs}"
end

#job ⇒ self

Returns the job object (needed to reference parent job in transform DSL).

Returns:

  • (self)

    the job object (needed to reference parent job in transform DSL)



# File 'lib/remi/job.rb', line 261

def job
  self
end

#to_s ⇒ Object



# File 'lib/remi/job.rb', line 265

def to_s
  inspect
end