Class: InstDataShipper::Dumper

Inherits:
Object
  • Object
show all
Includes:
Hooks
Defined in:
lib/inst_data_shipper/dumper.rb

Direct Known Subclasses

BasicDumper

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Hooks

#run_hook, #run_hook_safe

Class Method Details

.current(executor: nil) ⇒ Object



32
33
34
35
36
37
38
39
40
# File 'lib/inst_data_shipper/dumper.rb', line 32

# Builds a dumper for the batch the current thread is executing in.
#
# Looks up the batch stored in the thread-local slot named by
# CanvasSync::JobBatches::CURRENT_BATCH_THREAD_KEY and reads its context.
# The context must carry both :origin_class and :tracker_id; otherwise there
# is nothing to reconstruct and nil is returned.
#
# @param executor forwarded to the origin class's constructor as `executor:`
# @return [Object, nil] a new instance of the origin class, or nil
def self.current(executor: nil)
  active_batch = Thread.current[CanvasSync::JobBatches::CURRENT_BATCH_THREAD_KEY]
  batch_ctx = active_batch&.context || {}

  has_origin = batch_ctx[:origin_class].present?
  return nil unless has_origin && batch_ctx[:tracker_id].present?

  # The context may hold either the class itself or its name.
  dumper_class = batch_ctx[:origin_class]
  dumper_class = dumper_class.constantize if dumper_class.is_a?(String)

  dumper_class.new(executor: executor)
end

.define(include: [], schema:, &blk) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/inst_data_shipper/dumper.rb', line 17

# Builds an anonymous subclass of the receiver with the given schema and
# task-enqueueing behaviour.
#
# The subclass gets two instance methods:
#   * #enqueue_tasks - the supplied block, or a call to
#     `auto_enqueue_from_schema` when no block is given and at least one
#     table in `schema[:tables]` has a :sourcer.
#   * #schema - returns the `schema` argument.
#
# @param include [Array<Module>, Module] module(s) to mix into the subclass;
#   may be empty
# @param schema [Hash] schema definition; `schema[:tables]` is consulted when
#   no block is given
# @yield body for the generated #enqueue_tasks method
# @return [Class] the new subclass
# @raise [ArgumentError] when no block is given and no table defines a sourcer
def self.define(include: [], schema:, &blk)
  # `include` is a local here (the keyword argument), not Module#include.
  mixins = Array(include)

  if blk.nil?
    unless schema[:tables].any? { |t| t[:sourcer].present? }
      raise ArgumentError, "Must provide a block or a schema with source definitions"
    end

    blk = -> { auto_enqueue_from_schema }
  end

  Class.new(self) do
    # Module#include requires at least one module, so skip it when there is
    # nothing to mix in (this is the default case).
    include(*mixins) unless mixins.empty?

    define_method(:enqueue_tasks, &blk)
    define_method(:schema) { schema }
  end
end

.perform_dump(destinations) ⇒ Object



8
9
10
11
12
13
14
15
# File 'lib/inst_data_shipper/dumper.rb', line 8

# Starts a dump against the given destinations and hands back its tracker.
#
# Must be called on a subclass; calling it on Dumper itself raises.
#
# @param destinations passed straight to the constructor
# @return the value of the new dumper's #tracker after #begin_dump has run
# @raise [RuntimeError] when invoked on Dumper directly
def self.perform_dump(destinations)
  if self == Dumper
    raise "Must subclass Dumper to use perform_dump"
  end

  new(destinations).tap(&:begin_dump).tracker
end

Instance Method Details

#begin_dump ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/inst_data_shipper/dumper.rb', line 54

# Creates the tracker record for this dump, initializes every destination,
# and opens the root Sidekiq batch in which #enqueue_tasks runs.
#
# When no error escapes, the value of the method is the root batch (the
# result of the `Sidekiq::Batch.new.tap` expression). An error raised by
# #enqueue_tasks inside the batch is handled in place and is NOT re-raised;
# any other error after pre-initialization marks the tracker failed,
# schedules cleanup, and is re-raised.
#
# @raise [RuntimeError] when @raw_destinations is blank
def begin_dump
  raise "Dump already begun" unless @raw_destinations.present?

  # Keep the tracker both as an ivar and as a local used throughout this method.
  @tracker = tracker = DumpBatch.create(job_class: self.class.to_s, genre: export_genre, status: 'in_progress')

  # While initialization runs, @batch_context points at this in-progress
  # context hash; it is cleared again in the `ensure` below.
  @batch_context = context = {
    # TODO Consider behavior if last is still running
    incremental_since: last_successful_tracker&.created_at,
  }

  # Runs before the begin/rescue below, so an error raised here propagates
  # without the tracker being marked as failed.
  destinations.each do |dest|
    dest.preinitialize_dump(context)
  end

  begin
    begin
      destinations.each do |dest|
        dest.initialize_dump(context)
      end

      run_hook(:initialize_dump_batch, context)
    ensure
      # Always executed, even if initialization raised, so the cleanup batch
      # created in the outer rescue receives a populated context.
      @batch_context = nil
      context[:tracker_id] = tracker.id
      # @batch_context was just cleared, so this reads whatever #batch_context
      # resolves to without it; falls back to this class's name.
      context[:origin_class] = batch_context[:origin_class] || self.class.to_s
      context[:destinations] = @raw_destinations
    end

    Sidekiq::Batch.new.tap do |batch|
      # Record the root batch id both in the context and on the tracker.
      context[:root_bid] = batch.bid
      tracker.update(batch_id: batch.bid)

      batch.description = "HD #{export_genre} Export #{tracker.id} Root"
      batch.context = context
      batch.on(:success, "#{self.class}#finalize_dump")
      batch.on(:death, "#{self.class}#cleanup_fatal_error!")
      batch.jobs do
        enqueue_tasks
      rescue => ex
        # Failure while enqueueing: schedule cleanup, report the error, and
        # mark the tracker failed. The exception is not re-raised.
        delayed :cleanup_fatal_error!
        InstDataShipper.handle_suppressed_error(ex)
        tracker.update(status: 'failed', exception: ex.message, backtrace: ex.backtrace.join("\n"))
      end
    end
  rescue => ex
    # Failure during initialization or while setting up the root batch:
    # enqueue the cleanup job in a separate batch that carries the same context.
    if context
      batch ||= Sidekiq::Batch.new.tap do |batch|
        batch.description = "HD #{export_genre} Export #{tracker.id} Early Failure Cleanup"
        batch.context = context
        batch.jobs do
          delayed :cleanup_fatal_error!
        end
      end
    end
    tracker.update(status: 'failed', exception: ex.message, backtrace: ex.backtrace.join("\n"))
    raise ex
  end
end

#export_genre ⇒ Object



121
122
123
# File 'lib/inst_data_shipper/dumper.rb', line 121

# Label used as the `genre` of this dump's tracker records and in batch
# descriptions. Defaults to the string form of the dumper's class.
#
# @return [String]
def export_genre
  dumper_class = self.class
  dumper_class.to_s
end

#for_specs! ⇒ Object



43
44
45
46
47
48
49
# File 'lib/inst_data_shipper/dumper.rb', line 43

# Puts this dumper into a state suitable for specs and returns it.
#
# Points the dumper at a single "speccable://nil" destination, installs an
# AsyncCaller as the executor, assigns a tracker built with DumpBatch.new
# (not .create), and adds a #spec_destination singleton method that returns
# the first destination.
#
# @return [self]
def for_specs!
  tap do |dumper|
    @raw_destinations = ["speccable://nil"]
    @executor = InstDataShipper::Jobs::AsyncCaller.new
    @tracker = DumpBatch.new(
      job_class: dumper.class.to_s,
      genre: export_genre,
      status: 'in_progress'
    )
    define_singleton_method(:spec_destination) { destinations.first }
  end
end

#incremental_since ⇒ Object



154
155
156
# File 'lib/inst_data_shipper/dumper.rb', line 154

# The :incremental_since value stored in the batch context (set by
# #begin_dump from the last successful tracker's created_at), or nil when
# the context has none.
def incremental_since
  context = batch_context
  context[:incremental_since]
end

#last_successful_tracker ⇒ Object



117
118
119
# File 'lib/inst_data_shipper/dumper.rb', line 117

# Most recently created DumpBatch with status 'completed' for this dumper
# class and genre.
#
# The result is memoized; because `||=` is used, a nil result is not cached
# and the query runs again on the next call.
#
# @return the newest completed tracker, or nil when there is none
def last_successful_tracker
  @last_successful_tracker ||= begin
    completed = DumpBatch.where(job_class: self.class.to_s, genre: export_genre, status: 'completed')
    completed.order(created_at: :desc).first
  end
end

#lookup_table_schema(*identifiers) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/inst_data_shipper/dumper.rb', line 158

# Finds the first table definition in `schema[:tables]` matching any of the
# given identifiers, trying identifiers in the order given.
#
# Each identifier is either:
#   * a Hash - its first key names the table attribute to compare and its
#     first value is the accepted value (or array of accepted values); or
#   * anything else - compared against each table's :warehouse_name.
# nil identifiers are skipped.
#
# @param identifiers [Array] lookup keys as described above
# @return [Hash, nil] the matching table definition, or nil when none match
def lookup_table_schema(*identifiers)
  identifiers.compact.each do |identifier|
    attribute, accepted =
      if identifier.is_a?(Hash)
        [identifier.keys.first, identifier.values.first]
      else
        [:warehouse_name, identifier]
      end

    # Accept a single value or a list; nils never match.
    accepted = Array(accepted).compact

    match = schema[:tables].find { |table| accepted.include?(table[attribute]) }
    return match if match
  end

  nil
end

#lookup_table_schema!(*identifiers) ⇒ Object



178
179
180
# File 'lib/inst_data_shipper/dumper.rb', line 178

# Strict variant of #lookup_table_schema: same lookup, but a miss is an error.
#
# @param identifiers [Array] forwarded unchanged to #lookup_table_schema
# @return the matching table definition
# @raise [RuntimeError] when no table matches any identifier
def lookup_table_schema!(*identifiers)
  found = lookup_table_schema(*identifiers)
  return found if found

  raise("No table schema found for #{identifiers.inspect}")
end

#origin_class ⇒ Object



125
126
127
# File 'lib/inst_data_shipper/dumper.rb', line 125

# The class that started the dump this dumper belongs to.
#
# Reads :origin_class from the batch context. A String value is resolved to
# a constant; a value that is already a class is returned as-is (the same
# tolerance `.current` applies to this context key). Falls back to the
# dumper's own class when the context has no :origin_class.
#
# @return [Class]
def origin_class
  origin = batch_context[:origin_class]
  origin = origin.constantize if origin.is_a?(String)
  origin || self.class
end

#schema ⇒ Object

Raises:

  • (NotImplementedError)


129
130
131
132
# File 'lib/inst_data_shipper/dumper.rb', line 129

# Schema for this dump: the SCHEMA constant of #origin_class.
#
# Subclasses that do not define SCHEMA are expected to override this method
# (classes built with `.define` get their own #schema).
#
# @return the origin class's SCHEMA constant
# @raise [NotImplementedError] when the origin class has no SCHEMA constant
def schema
  raise NotImplementedError unless defined?(origin_class::SCHEMA)

  origin_class::SCHEMA
end

#schema_digest ⇒ Object



134
135
136
# File 'lib/inst_data_shipper/dumper.rb', line 134

# Short fingerprint of the schema: the first 8 hex characters of the MD5
# digest of the schema's JSON serialization.
#
# @return [String] 8 lowercase hex characters
def schema_digest
  serialized = schema.to_json
  Digest::MD5.hexdigest(serialized)[0, 8]
end

#table_is_incremental?(table_def) ⇒ Boolean

Returns:

  • (Boolean)


138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/inst_data_shipper/dumper.rb', line 138

# Whether the given table should be exported incrementally in this dump.
#
# A table is incremental only when the dump itself has an #incremental_since
# value AND the table definition has an :incremental section. That section
# may carry an :if condition:
#   * absent/nil - the table is incremental;
#   * Symbol     - name of a method on this dumper; its result decides;
#   * Proc       - evaluated in the context of this dumper; its result decides;
#   * other      - the value's truthiness decides.
#
# @param table_def [Hash] a table definition (reads :incremental and its :if)
# @return [Boolean]
def table_is_incremental?(table_def)
  return false unless incremental_since.present?

  # TODO Return false if table's schema changes
  incremental = table_def[:incremental]
  return false unless incremental.present?

  condition = incremental[:if]
  # No condition configured: incremental whenever the dump is (checked above).
  return true if condition.nil?

  verdict =
    case condition
    # A Symbol proc cannot go through instance_exec: no receiver argument is
    # passed, so it raises ArgumentError. Call the named method instead.
    when Symbol then send(condition)
    when Proc then instance_exec(&condition)
    else condition
    end

  !!verdict
end

#tracker ⇒ Object



113
114
115
# File 'lib/inst_data_shipper/dumper.rb', line 113

# The DumpBatch record tracking this dump.
#
# Returns the memoized tracker when one is set (e.g. by #begin_dump);
# otherwise loads it via DumpBatch.find using the :tracker_id in the batch
# context. Returns nil when the context has no tracker id.
#
# @return the tracker record, or nil
def tracker
  return @tracker if @tracker

  tracker_id = batch_context[:tracker_id]
  @tracker = tracker_id.present? ? DumpBatch.find(tracker_id) : nil
end