Class: AwDatapipe::Pipeline

Inherits:
  • Object
Defined in:
lib/aw_datapipe/pipeline.rb

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(objects, parameter_metadata, parameter_values) {|_self| ... } ⇒ Pipeline

Parameters:

  • objects (Array) — initial pipeline objects; each is appended together with its dependencies.

Yields:

  • (_self)

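Yield Parameters:

  • _self (Pipeline) — the newly constructed pipeline, yielded after the initial objects have been appended.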



11
12
13
14
15
16
# File 'lib/aw_datapipe/pipeline.rb', line 11

# Creates a pipeline from an initial list of objects and its parameter data.
#
# @param objects [Array] initial pipeline objects; each one is appended
#   together with its dependencies via #append_objects_with_dependencies.
# @param parameter_metadata [Hash] stored unchanged in +@parameter_metadata+.
# @param parameter_values [Hash] stored unchanged in +@parameter_values+.
# @yield [self] the new pipeline, after the initial objects have been appended.
def initialize(objects, parameter_metadata, parameter_values)
  @objects = ObjectHash.new
  append_objects_with_dependencies(objects)
  @parameter_metadata = parameter_metadata
  @parameter_values = parameter_values
  yield(self) if block_given?
end

Instance Attribute Details

#id ⇒ Object

AWS pipeline id



4
5
6
# File 'lib/aw_datapipe/pipeline.rb', line 4

# AWS pipeline id.
#
# @return [Object] the value of +@id+, or nil if it has not been assigned.
def id
  instance_variable_get(:@id)
end

#objects ⇒ Object (readonly)

ObjectHash[:id => PipelineObject]



6
7
8
# File 'lib/aw_datapipe/pipeline.rb', line 6

# Objects held by this pipeline, keyed by object id
# (ObjectHash[:id => PipelineObject]).
#
# @return [ObjectHash] the value of +@objects+.
def objects
  instance_variable_get(:@objects)
end

#parameter_metadata ⇒ Object (readonly)

Hash['parameterName' => Hash]



7
8
9
# File 'lib/aw_datapipe/pipeline.rb', line 7

# Parameter metadata as given to the constructor, keyed by parameter name
# (Hash['parameterName' => Hash]).
#
# @return [Hash] the value of +@parameter_metadata+.
def parameter_metadata
  @parameter_metadata
end

#parameter_values ⇒ Object (readonly)

Hash['parameterName' => "value"]



8
9
10
# File 'lib/aw_datapipe/pipeline.rb', line 8

# Parameter values as given to the constructor, keyed by parameter name
# (Hash['parameterName' => "value"]).
#
# @return [Hash] the value of +@parameter_values+.
def parameter_values
  instance_variable_get(:@parameter_values)
end

#uuid ⇒ Object

Unique id



5
6
7
# File 'lib/aw_datapipe/pipeline.rb', line 5

# Unique id of this pipeline.
#
# @return [Object] the value of +@uuid+, or nil if it has not been assigned.
def uuid
  instance_variable_get(:@uuid)
end

Class Method Details

.build(config, activities, parameter_metadata, parameter_values) ⇒ Object



18
19
20
21
22
# File 'lib/aw_datapipe/pipeline.rb', line 18

# Builds a pipeline from a configuration object plus a list of activities.
#
# +config+ is passed to the constructor as the only initial object. The
# +activities+ (with their dependencies) are appended afterwards, inside the
# constructor's block.
#
# @param config [Object] first pipeline object (presumably the Default
#   configuration object that #configuration fetches by id :default -- confirm).
# @param activities [Array] objects to append together with their dependencies.
# @param parameter_metadata [Hash] forwarded unchanged to the constructor.
# @param parameter_values [Hash] forwarded unchanged to the constructor.
# @return [Pipeline] the newly built pipeline.
def self.build(config, activities, parameter_metadata, parameter_values)
  new([config], parameter_metadata, parameter_values) do |pipeline|
    pipeline.append_objects_with_dependencies(activities)
  end
end

Instance Method Details

#append_object(object) ⇒ PipelineObject

Returns appended object.

Returns:

  • (PipelineObject) — the appended object.



25
26
27
28
# File 'lib/aw_datapipe/pipeline.rb', line 25

# Attaches +object+ to this pipeline and stores it under its id, replacing
# any object already stored under the same id.
#
# @param object [PipelineObject] must respond to #pipeline= and #id.
# @return [PipelineObject] the same +object+ that was passed in.
def append_object(object)
  object.tap do |appended|
    appended.pipeline = self
    objects[appended.id] = appended
  end
end

#append_object_with_dependencies(object) ⇒ Object



30
31
32
# File 'lib/aw_datapipe/pipeline.rb', line 30

# Appends each of +object+'s dependencies, then +object+ itself.
# Dependencies go first, so they are stored before the object that uses them.
#
# @param object [PipelineObject] responds to #dependencies; a nil result is
#   treated as "no dependencies" by the splat.
# @return [Array] the objects that were appended, in append order.
def append_object_with_dependencies(object)
  queue = [*object.dependencies]
  queue << object
  queue.each { |item| append_object(item) }
end

#append_objects_with_dependencies(objects) ⇒ Pipeline

Returns self.

Returns:

  • (Pipeline) — self.



35
36
37
38
# File 'lib/aw_datapipe/pipeline.rb', line 35

# Appends every element of +objects+ together with its dependencies, in order.
#
# Note: the +objects+ parameter shadows the #objects reader inside this method.
#
# @param objects [Enumerable] objects to append.
# @return [Pipeline] self, so calls can be chained.
def append_objects_with_dependencies(objects)
  tap do
    objects.each { |obj| append_object_with_dependencies(obj) }
  end
end

#configuration ⇒ Object



40
41
42
# File 'lib/aw_datapipe/pipeline.rb', line 40

# The pipeline object stored under the id :default.
#
# @return [Object] the object stored under :default.
# @raise [KeyError] if no object is stored under :default.
def configuration
  key = :default
  objects.fetch(key)
end

#csv_data_format(params) ⇒ Object



44
45
46
# File 'lib/aw_datapipe/pipeline.rb', line 44

# Builds a CsvDataFormat from +params+ and appends it to the pipeline.
#
# @param params [Hash] passed unchanged to CsvDataFormat.build.
# @return [Object] the appended object, as returned by #append_object.
def csv_data_format(params)
  data_format = CsvDataFormat.build(params)
  append_object(data_format)
end

#ec2_resource(params) ⇒ Ec2Resource

Parameters:

  • params (Hash)

    Various required and optional parameters.

Options Hash (params):

  • :name (String) — default: 'Ec2Instance'
  • :instance_type (String) — default: 't1.micro'
  • :subnet_id (String)
  • :security_group_ids (String)
  • :action_on_task_failure (String) — default: 'terminate'
  • :terminate_after (String) — default: '2 Hours'

Returns:

  • (Ec2Resource) — the appended resource.



57
58
59
60
# File 'lib/aw_datapipe/pipeline.rb', line 57

# Builds an Ec2Resource and appends it to the pipeline.
#
# Keys in +params+ override these defaults: name 'Ec2Instance',
# instance_type 't1.micro', action_on_task_failure 'terminate' and
# terminate_after '2 Hours'. Only symbol keys override them.
#
# @param params [Hash] resource attributes (e.g. :subnet_id, :security_group_ids).
# @return [Ec2Resource] the appended resource.
def ec2_resource(params)
  defaults = {
    name: 'Ec2Instance',
    instance_type: 't1.micro',
    action_on_task_failure: 'terminate',
    terminate_after: '2 Hours'
  }
  append_object(Ec2Resource.build(defaults.merge(params)))
end

#jdbc_database(params) ⇒ Object



62
63
64
# File 'lib/aw_datapipe/pipeline.rb', line 62

# Builds a JdbcDatabase from +params+ and appends it to the pipeline.
#
# @param params [Hash] passed unchanged to JdbcDatabase.build.
# @return [Object] the appended object, as returned by #append_object.
def jdbc_database(params)
  database = JdbcDatabase.build(params)
  append_object(database)
end

#referenced_object_ids ⇒ Object



70
71
72
# File 'lib/aw_datapipe/pipeline.rb', line 70

# Ids of all objects returned by #referenced_objects, followed by :default.
#
# :default (presumably the id of the Default configuration object; see
# #configuration) is always appended, even if it is already in the list.
#
# @return [Array] object ids, with :default last.
def referenced_object_ids
  ids = referenced_objects.map { |obj| obj.id }
  ids.push(:default)
end

#referenced_objects ⇒ Array

Collect dependencies for all objects, removing duplicates.

Returns:

  • (Array)

    referenced objects, with dependees before dependents.



76
77
78
# File 'lib/aw_datapipe/pipeline.rb', line 76

# Dependencies of every object in the pipeline, merged into one list with
# duplicates removed. The first occurrence is kept, so order follows the
# iteration order of #objects and of each object's dependency list.
#
# NOTE(review): only objects that appear as some object's dependency are
# returned. An object nothing depends on is not included.
#
# @return [Array] the unique referenced objects (nested lists are flattened).
def referenced_objects
  dependency_lists = objects.values.map { |obj| obj.dependencies }
  dependency_lists.flatten.uniq
end

#s3_data_node(params) ⇒ Object



66
67
68
# File 'lib/aw_datapipe/pipeline.rb', line 66

# Builds an S3DataNode from +params+ and appends it to the pipeline.
#
# @param params [Hash] passed unchanged to S3DataNode.build.
# @return [Object] the appended object, as returned by #append_object.
def s3_data_node(params)
  data_node = S3DataNode.build(params)
  append_object(data_node)
end

#write_source(pathname) ⇒ Object



80
81
82
# File 'lib/aw_datapipe/pipeline.rb', line 80

# Calls SourceWriter.call with this pipeline and +pathname+. This presumably
# writes the pipeline out as source code -- confirm in SourceWriter.
#
# @param pathname [Object] destination passed unchanged to SourceWriter.call.
# @return [Object] whatever SourceWriter.call returns.
def write_source(pathname)
  SourceWriter.call(self, pathname)
end