Class: Bricolage::StreamingLoad::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/bricolage/streamingload/job.rb

Defined Under Namespace

Classes: ControlConnection, DataConnection, LoadLog

Constant Summary collapse

MAX_RETRY =
2

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(context:, ctl_ds:, data_ds:, log_table: 'strload_load_logs', task_id:, force: false, logger:) ⇒ Job

Returns a new instance of Job.



22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/bricolage/streamingload/job.rb', line 22

# Builds a job for a single streaming-load task. Only stores its arguments;
# no connection is opened here.
#
# context   - stored as is; handed to JobParams.load by #do_load.
# ctl_ds    - data source used for the control connection.
# data_ds   - data source used for the data connection (#do_load later
#             replaces it with the ds of the loaded job parameters).
# log_table - load log table name handed to DataConnection.open.
# task_id   - ID of the task to execute.
# force     - handed to begin_job; a committed forced job is marked 'forced'.
# logger    - logger used for every message of this job.
def initialize(context:, ctl_ds:, data_ds:, log_table: 'strload_load_logs', task_id:, force: false, logger:)
  @context = context
  @ctl_ds = ctl_ds
  @data_ds = data_ds
  @log_table = log_table
  @task_id = task_id
  @force = force
  @logger = logger

  # State filled in while the job runs. Everything exposed through a reader
  # starts as nil, including @process_id (assigned by #execute_task).
  @process_id = nil
  @task = nil
  @job_id = nil
  @manifest = nil
end

Instance Attribute Details

#data_ds ⇒ Object (readonly)

Returns the value of attribute data_ds.



40
41
42
# File 'lib/bricolage/streamingload/job.rb', line 40

# Reader for @data_ds: the data source given to #initialize as data_ds:.
# #do_load later overwrites it with the ds of the loaded job parameters.
def data_ds
  @data_ds
end

#job_id ⇒ Object (readonly)

For tests



37
38
39
# File 'lib/bricolage/streamingload/job.rb', line 37

# Reader for @job_id (exposed for tests). The value is nil until
# #execute_task stores what ctl.begin_job returned.
def job_id
  @job_id
end

#manifest ⇒ Object (readonly)

Returns the value of attribute manifest.



41
42
43
# File 'lib/bricolage/streamingload/job.rb', line 41

# Reader for @manifest. The value is nil until #do_load stores the result
# of ManifestFile.create.
def manifest
  @manifest
end

#process_id ⇒ Object (readonly)

Returns the value of attribute process_id.



38
39
40
# File 'lib/bricolage/streamingload/job.rb', line 38

# Reader for @process_id. The value is nil until #execute_task assigns
# "<hostname>-<pid>" of the running process.
def process_id
  @process_id
end

#task ⇒ Object (readonly)

Returns the value of attribute task.



39
40
41
# File 'lib/bricolage/streamingload/job.rb', line 39

# Reader for @task. The value is nil until #execute_task stores the task
# returned by ctl.load_task(@task_id).
def task
  @task
end

Instance Method Details

#do_load(task, job_id) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/bricolage/streamingload/job.rb', line 152

# Loads the objects of +task+ into the data store as job +job_id+.
#
# Job parameters are looked up from the task's class, schema and table.
# As side effects @data_ds is replaced by the parameters' ds and @manifest
# by a newly created manifest file. Depending on enable_work_table? the
# load goes through the work table or straight into the destination table.
#
# Returns whatever DataConnection.open returns for the block.
def do_load(task, job_id)
  job_params = JobParams.load(@context, task.task_class, task.schema_name, task.table_name)
  @data_ds = job_params.ds
  @manifest = ManifestFile.create(
    ds: job_params.ctl_bucket,
    job_id: job_id,
    object_urls: task.object_urls,
    logger: @logger
  )

  # Log record handed to the data connection together with the load.
  load_log = LoadLog.new.tap do |entry|
    entry.task_id = @task_id
    entry.job_id = job_id
  end

  DataConnection.open(job_params.ds, log_table: @log_table, logger: @logger) do |conn|
    if job_params.enable_work_table?
      conn.load_with_work_table(job_params.work_table, @manifest, job_params.load_options_string, job_params.sql_source, load_log)
    else
      conn.load_objects(job_params.dest_table, @manifest, job_params.load_options_string, load_log)
    end
  end
end

#execute(fail_fast: false) ⇒ Object

Returns true when the SQS message should be deleted; returns false when the SQS message should be kept.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/bricolage/streamingload/job.rb', line 45

# Runs the task and maps every outcome to a boolean for the caller:
# true means the SQS message is deleted, false means it is kept.
#
# fail_fast - when true, skips polling for the lost DB connection after a
#             ControlConnectionFailed / DataConnectionFailed.
#
# Never raises: every exception coming out of #execute_task is handled here.
def execute(fail_fast: false)
  execute_task
  true
rescue JobCancelled => cancelled
  # Reported, but the message is still dropped.
  Raven.capture_exception(cancelled)
  true
rescue JobDuplicated
  true
rescue JobDefered
  false
rescue ControlConnectionFailed => failure
  @logger.error failure.message
  Raven.capture_exception(failure)
  wait_for_connection('ctl', @ctl_ds) unless fail_fast
  false
rescue DataConnectionFailed => failure
  @logger.error failure.message
  Raven.capture_exception(failure)
  wait_for_connection('data', @data_ds) unless fail_fast
  false
rescue JobFailure, JobError => failure
  # Must stay below the connection failures so those keep their own handling.
  @logger.error failure.message
  Raven.capture_exception(failure)
  false
rescue Exception => unexpected
  # NOTE(review): this also catches non-StandardError exceptions (signals,
  # SystemExit) and drops the message; confirm that is intended.
  @logger.exception unexpected
  Raven.capture_exception(unexpected)
  true
end

#execute_task ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/bricolage/streamingload/job.rb', line 81

# Runs the task identified by @task_id as one load job.
#
# 1. Over the control connection: loads the task, defers it when it is
#    disabled, repairs a previous job left in an unknown state, and begins
#    a new job.
# 2. Runs the load (#do_load) and commits the job.
# 3. On failure, records the aborted job over the control connection and
#    re-raises, so the caller sees the failure.
#
# Raises JobDefered when the task is disabled, JobDuplicated when begin_job
# returns no job ID, JobCancelled when the task's failure count has reached
# MAX_RETRY, and JobError for any exception that is not one of the job
# exception classes rescued below. ControlConnectionFailed,
# DataConnectionFailed, JobFailure and JobError are re-raised unchanged.
def execute_task
  @process_id = "#{Socket.gethostname}-#{Process.pid}"
  @logger.info "execute task: task_id=#{@task_id} force=#{@force} process_id=#{@process_id}"
  ctl = ControlConnection.new(@ctl_ds, @logger)

  ctl.open {
    @task = ctl.load_task(@task_id)
    @logger.info "task details: task_id=#{@task_id} table=#{@task.schema_name}.#{@task.table_name}"
    if @task.disabled
      # We do not record disabled job in the DB.
      @logger.info "task is disabled; defer task: task_id=#{@task_id}"
      raise JobDefered, "defered: task_id=#{@task_id}"
    end

    if @task.unknown_state?
      # The last job's outcome was never recorded; ask the data side for the
      # real status and write it back before starting a new job.
      true_status = DataConnection.open(@data_ds, log_table: @log_table, logger: @logger) {|data|
        data.get_job_status(@task.last_job_id)
      }
      @logger.info "fixiating unknown job status: job_id=#{@task.last_job_id}, status=(unknown->#{true_status})"
      @task.fix_last_job_status true_status
      ctl.fix_job_status @task.last_job_id, true_status
      @logger.info "job status fixed."
    end

    @job_id = ctl.begin_job(@task_id, @process_id, @force)
    unless @job_id
      @logger.warn "task is already succeeded and not forced; discard task: task_id=#{@task_id}"
      ctl.commit_duplicated_job @task_id, @process_id
      raise JobDuplicated, "duplicated: task_id=#{@task_id}"
    end
  }

  begin
    do_load @task, @job_id
    ctl.open {
      ctl.commit_job @job_id, (@force ? 'forced' : nil)
    }
  rescue ControlConnectionFailed
    # Nothing can be recorded without the control connection.
    raise
  rescue DataConnectionFailed => ex
    # Recorded as 'unknown'; the unknown_state? branch above resolves such
    # jobs on a later run.
    ctl.open {
      ctl.abort_job @job_id, 'unknown', first_message_line(ex)
    }
    raise
  rescue JobFailure => ex
    abort_job_with_retry_info ctl, 'failure', ex
    raise
  rescue JobError => ex
    abort_job_with_retry_info ctl, 'error', ex
    raise
  rescue Exception => ex
    # NOTE(review): rescues Exception (not just StandardError) so that the
    # abort is recorded for anything raised by the load; confirm that
    # converting signals/exits into JobError is intended.
    @logger.exception ex
    ctl.open {
      ctl.abort_job @job_id, 'error', first_message_line(ex)
    }
    raise JobError, "#{ex.class}: #{ex.message}"
  end
end

# Records the aborted job with +status+, prefixing the message with the retry
# count; raises JobCancelled once the failure count has reached MAX_RETRY.
private def abort_job_with_retry_info(ctl, status, ex)
  ctl.open {
    fail_count = @task.failure_count
    final_retry = (fail_count >= MAX_RETRY)
    retry_msg = (fail_count > 0) ? "(retry\##{fail_count}#{final_retry ? ' FINAL' : ''}) " : ''
    ctl.abort_job @job_id, status, retry_msg + first_message_line(ex)
    raise JobCancelled, "retry count exceeds limit: task_id=#{@task_id}" if final_retry
  }
end

# Returns the stripped first line of the exception message, or '' when the
# message is empty ("".lines.first is nil, which must not break the abort).
private def first_message_line(ex)
  ex.message.to_s.lines.first.to_s.strip
end

#wait_for_connection(type, ds) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/bricolage/streamingload/job.rb', line 168

# Polls +ds+ until a connection can be opened again, or gives up.
#
# type         - label used in the log messages ('ctl' or 'data' at the call
#                sites in #execute).
# ds           - data source; must respond to #open taking a block.
# interval     - seconds to sleep after each failed attempt (default 15).
# max_attempts - attempts before giving up (default 120, i.e. 30 minutes of
#                sleeping with the default interval).
#
# Returns true as soon as ds.open succeeds, false when every attempt raised
# ConnectionError. Any other exception from ds.open propagates.
def wait_for_connection(type, ds, interval: 15, max_attempts: 120)
  @logger.warn "loader: #{type} DB connection lost; polling..."
  start_time = Time.now
  max_attempts.times do
    begin
      ds.open {}
      @logger.warn "loader: #{type} DB connection recovered; return to normal state"
      return true
    rescue ConnectionError
      # Still unreachable; sleep and try again.
    end
    sleep interval
  end
  # Could not get a connection within the polling budget, now we return to the queue loop.
  # Next job may fail too, but we should not stop to receive the task queue too long,
  # because it contains control tasks.
  @logger.warn "loader: #{type} DB connection still failing (since #{start_time}); give up."
  false
end