Class: AwDatapipe::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/aw_datapipe/session.rb

Instance Method Summary collapse

Instance Method Details

#aws ⇒ Object



5
6
7
# File 'lib/aw_datapipe/session.rb', line 5

# Lazily builds the AWS Data Pipeline client and reuses it on later calls.
#
# @return [Aws::DataPipeline::Client] the client cached in @aws
def aws
  return @aws if @aws

  @aws = Aws::DataPipeline::Client.new
end

#create(name, description, unique_id = SecureRandom.uuid) ⇒ Object

Parameters: name [String] (required); description [String] (optional); unique_id [String] (default: uuid)



16
17
18
19
# File 'lib/aw_datapipe/session.rb', line 16

# Creates a pipeline through the AWS client and returns its id.
#
# @param name [String] pipeline name sent to create_pipeline
# @param description [String, nil] pipeline description; omitted from the
#   request when nil
# @param unique_id [String] unique id sent to create_pipeline, defaults to a
#   freshly generated UUID
# @return [Object] the pipeline_id of the create_pipeline response
def create(name, description, unique_id = SecureRandom.uuid)
  params = { name: name, unique_id: unique_id }
  # The description used to be accepted but silently dropped; forward it.
  params[:description] = description unless description.nil?
  aws.create_pipeline(params).pipeline_id
end

#download_definition(id, dir) ⇒ Object



9
10
11
# File 'lib/aw_datapipe/session.rb', line 9

# Fetches the pipeline identified by +id+ and asks it to write its source
# into +dir+.
#
# @param id [Object] pipeline id handed to #fetch
# @param dir [Object] destination handed to write_source
# @return [Object] whatever write_source returns
def download_definition(id, dir)
  pipeline = fetch(id)
  pipeline.write_source(dir)
end

#fetch(key) ⇒ Object



29
30
31
32
# File 'lib/aw_datapipe/session.rb', line 29

# Loads a pipeline definition from AWS and turns it into a pipeline object.
#
# @param key [Object] pipeline id used for the request and assigned to the
#   resulting object's id
# @return [Object] the object produced by serializer.unmarshal, with id set
def fetch(key)
  definition = aws.get_pipeline_definition(pipeline_id: key)
  pipeline = serializer.unmarshal(definition)
  pipeline.id = key
  pipeline
end

#keys ⇒ Object



21
22
23
24
25
26
27
# File 'lib/aw_datapipe/session.rb', line 21

# Lists the ids of all pipelines visible to the client, cached in @keys.
#
# list_pipelines returns results in pages; this follows the marker of each
# page while has_more_items is set, so ids past the first page are included.
#
# @return [Array] ids collected from every page's pipeline_id_list
def keys
  @keys ||= begin
    ids = []
    marker = nil
    loop do
      # First request carries no arguments; later ones resume from the marker.
      page = marker ? aws.list_pipelines(marker: marker) : aws.list_pipelines
      ids.concat(page.pipeline_id_list.map(&:id))
      marker = page.has_more_items ? page.marker : nil
      # A missing marker also ends the loop, so a malformed page cannot spin forever.
      break unless marker
    end
    ids
  end
end

#save(pipeline) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/aw_datapipe/session.rb', line 38

# Uploads the marshalled pipeline definition and prints validation feedback.
#
# Validation errors are printed only when the response is flagged as errored;
# validation warnings are always printed.
#
# @param pipeline [Object] pipeline handed to serializer.marshal
# @return [Boolean] true when the response is not errored
def save(pipeline)
  client = aws
  outcome = client.put_pipeline_definition(serializer.marshal(pipeline))

  if outcome.errored
    outcome.validation_errors.each { |issue| puts "Error in #{issue.id}: #{issue.errors.inspect}" }
  end
  outcome.validation_warnings.each { |note| puts "Warning in #{note.id}: #{note.warnings.inspect}" }

  !outcome.errored
end

#serializer ⇒ Object



34
35
36
# File 'lib/aw_datapipe/session.rb', line 34

# Returns the serializer used to marshal and unmarshal pipeline definitions.
#
# The instance is built once and cached in @serializer, matching how #aws
# caches its client; previously a new object was created on every call.
# NOTE(review): assumes PipelineSerializer keeps no per-call state — confirm.
#
# @return [PipelineSerializer] the cached serializer
def serializer
  @serializer ||= PipelineSerializer.new
end