Class: Marty::Promise

Inherits:
Base show all
Defined in:
app/models/marty/promise.rb

Constant Summary collapse

DEFAULT_PROMISE_TIMEOUT =

default timeout (seconds) to wait for promise values

Rails.configuration.marty.promise_timeout || 30
DEFAULT_JOB_TIMEOUT =

default timeout (seconds) to wait for jobs to start

Rails.configuration.marty.job_timeout || 10

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

get_final_attrs, get_struct_attrs, make_hash, make_openstruct, mcfly_pt

Methods inherited from ActiveRecord::Base

joins, old_joins

Class Method Details

.cleanup(all = false) ⇒ Object



222
223
224
225
226
227
228
229
# File 'app/models/marty/promise.rb', line 222

# Destroys root promises (rows with no parent_id) whose start_dt is older
# than a cutoff.
#
# @param all [Boolean] when truthy the cutoff is the current time; otherwise
#   it is four hours ago
# @return [Object] the value of +destroy_all+, or the logger's return value
#   when a StandardError was rescued
def cleanup(all = false)
  cutoff = all ? Time.zone.now : 4.hours.ago
  stale = where('start_dt < ? AND parent_id IS NULL', cutoff)
  stale.destroy_all
rescue StandardError => e
  # Failures are logged rather than propagated to the caller.
  Marty::Util.logger.error("promise GC error: #{e}")
end

.exception_to_result(promise:, exception:) ⇒ Object



231
232
233
234
235
236
237
# File 'app/models/marty/promise.rb', line 231

# Converts an exception raised while running a promise into a result value.
#
# @param promise [#delorean?] the promise the exception belongs to
# @param exception [Exception] the raised exception
# @return [Object] for delorean promises, whatever
#   Delorean::Engine.grok_runtime_exception returns; otherwise a Hash with
#   'error' (the message) and 'backtrace' keys
def exception_to_result(promise:, exception:)
  if promise.delorean?
    Delorean::Engine.grok_runtime_exception(exception)
  else
    { 'error' => exception.message, 'backtrace' => exception.backtrace }
  end
end

.job_by_id(job_id) ⇒ Object



101
102
103
# File 'app/models/marty/promise.rb', line 101

# Looks up a delayed job by id inside Delayed::Job.uncached.
#
# @param job_id [Object] id of the job to find
# @return [Object] the value of Delayed::Job.find_by(id: job_id)
def self.job_by_id(job_id)
  Delayed::Job.uncached do
    Delayed::Job.find_by(id: job_id)
  end
end

.load_result(obj, force = false) ⇒ Object



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'app/models/marty/promise.rb', line 201

# Recursively walks a result value, turning promise placeholders into
# Marty::PromiseProxy objects.
#
# Arrays and hashes are rebuilt element by element. A hash whose only key is
# '__promise__' is replaced by a proxy constructed from that key's value
# (splatted). Any other object is returned unchanged.
#
# @param obj [Object] the value to walk
# @param force [Boolean] when truthy, any visited object that responds to
#   +__force__+ is replaced by the value of calling it before being inspected
# @return [Object] the rebuilt value
def load_result(obj, force = false)
  obj = obj.__force__ if force && obj.respond_to?(:__force__)

  case obj
  when Array
    obj.map { |item| load_result(item, force) }
  when Hash
    promise_args = obj['__promise__']
    lone_promise = promise_args && obj.length == 1

    if lone_promise
      load_result(Marty::PromiseProxy.new(*promise_args), force)
    else
      obj.map { |key, value| [key, load_result(value, force)] }.to_h
    end
  else
    obj
  end
end

.never_started_message(promise) ⇒ Object



239
240
241
# File 'app/models/marty/promise.rb', line 239

# Builds the error text used when a promise's job never started.
#
# @param promise [#id] the promise that timed out
# @return [String] the message, including the promise id
def never_started_message(promise)
  promise_id = promise.id
  "promise #{promise_id} timed out (never started)"
end

.timeout_message(promise) ⇒ Object



243
244
245
# File 'app/models/marty/promise.rb', line 243

# Builds the error text used when a started promise did not end in time.
#
# @param promise [#id] the promise that timed out
# @return [String] the message, including the promise id
def timeout_message(promise)
  promise_id = promise.id
  "promise #{promise_id} timed out (didn't end)"
end

Instance Method Details

#delorean? ⇒ Boolean

Returns:

  • (Boolean)


196
197
198
# File 'app/models/marty/promise.rb', line 196

# Whether this promise's promise_type is the string 'delorean'.
#
# @return [Boolean]
def delorean?
  promise_type == 'delorean'
end

#latest ⇒ Object



92
93
94
95
96
97
98
99
# File 'app/models/marty/promise.rb', line 92

# Fetches a fresh copy of this promise by id.
#
# FIXME: Not sure if this is idiomatic. What's the best way to force AR to
# reload the promise object? reset+reload doesn't seem to work.
#
# @return [Object] the value of Marty::Promise.find(id), evaluated inside
#   Marty::Promise.uncached
def latest
  promise_class = Marty::Promise

  # get latest uncached version
  promise_class.uncached do
    promise_class.find(id)
  end
end

#pg_notify ⇒ Object



28
29
30
# File 'app/models/marty/promise.rb', line 28

# Issues a NOTIFY on this promise's channel ("promise_<id>") over the raw
# connection.
#
# @return [Object] the value returned by +async_exec+
def pg_notify
  conn = raw_conn
  conn.async_exec("NOTIFY promise_#{id}")
end

#raw_conn ⇒ Object



24
25
26
# File 'app/models/marty/promise.rb', line 24

# The raw connection object underneath this model class's connection.
#
# @return [Object] +self.class.connection.raw_connection+
def raw_conn
  adapter = self.class.connection
  adapter.raw_connection
end

#result(force = false) ⇒ Object



8
9
10
11
# File 'app/models/marty/promise.rb', line 8

# Reads the stored result via the inherited reader and passes it through
# Marty::Promise.load_result.
#
# @param force [Boolean] forwarded to load_result
# @return [Object] the value returned by load_result
def result(force = false)
  Marty::Promise.load_result(super(), force)
end

#set_result(res) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'app/models/marty/promise.rb', line 43

# Stores the outcome of a promise, marks it as ended and notifies listeners.
#
# The record is reloaded first. If +res+ carries an error and the promise
# never started, start_dt is filled in so the error can still be recorded.
# Otherwise the promise must be started, not ended and have an empty result;
# any other state is logged and the call returns without saving.
#
# @param res [Hash] result hash; 'error', 'title' and 'format' keys are
#   interpreted here
# @return [Object, nil] the value of pg_notify, or nil when the promise was
#   in an unexpected state
# @raise [RuntimeError] 'bad result' when +res+ is not a Hash
def set_result(res)
  # Validate up front: everything below indexes into +res+, so checking
  # later would surface a NoMethodError/TypeError instead of 'bad result'.
  raise 'bad result' unless res.is_a?(Hash)

  # log "SETRES #{Process.pid} #{self}"

  reload
  # If exception happened before the promise was started
  # we should still update the record
  if res['error'].present? && !start_dt
    self.start_dt = DateTime.now
  # promise must have been started and not yet ended
  elsif !start_dt || end_dt || result != {}
    # log "SETERR #{Process.pid} #{self}"
    Marty::Util.logger.error("unexpected promise state: #{self}")
    return
  end

  self.status = res['error'].nil?
  self.result = res

  # update title/format from result hash (somewhat hacky)
  self.title   = res['title'].to_s  if res['title']
  self.cformat = res['format'].to_s if res['format']

  # mark promise as ended
  self.end_dt = DateTime.now
  save!

  # log "NOTIFY #{Process.pid}"
  pg_notify
end

#set_start ⇒ Object



32
33
34
35
36
37
38
39
40
41
# File 'app/models/marty/promise.rb', line 32

# Marks the promise as started by stamping start_dt and saving.
#
# If start_dt is already set, or the result is not an empty hash, an error is
# logged and nothing is saved.
#
# @return [Object, nil] the value of +save!+, or nil when the promise had
#   already started
def set_start
  if !start_dt && result == {}
    # mark promise as started
    self.start_dt = DateTime.now
    save!
  else
    Marty::Util.logger.error("promise already started: #{self}")
    nil
  end
end

#to_s ⇒ Object



75
76
77
# File 'app/models/marty/promise.rb', line 75

# String form of the promise; delegates to #inspect, so interpolating the
# promise into a string yields its inspect output.
#
# @return [Object] the value of +inspect+
def to_s
  inspect
end

#wait_for_my_notify(timeout) ⇒ Object

# Debug helper: appends +msg+ as a line to /tmp/dj.out.
#
# Uses File.open rather than Kernel#open so the path is always treated as a
# plain file name.
#
# @param msg [Object] value written with +puts+
# @return [Object] the value of the block (+puts+ returns nil)
def log(msg)
  File.open('/tmp/dj.out', 'a') { |f| f.puts msg }
end



83
84
85
86
87
88
89
90
# File 'app/models/marty/promise.rb', line 83

# Waits on the raw connection until a notification for this promise's channel
# ("promise_<id>") arrives or the wait times out.
#
# Notifications for other channels are skipped. The time spent on them is
# subtracted from the remaining wait, so the total wait is bounded by
# +timeout+ instead of restarting after every unrelated notification.
#
# @param timeout [Numeric, nil] overall time budget, handed to
#   +wait_for_notify+; nil is passed through unchanged on every iteration
# @return [String, nil] the channel name when this promise was notified, or
#   the falsy value returned by +wait_for_notify+ / nil once the budget is
#   spent
def wait_for_my_notify(timeout)
  channel = "promise_#{id}"
  # A monotonic clock keeps the deadline immune to wall-clock adjustments.
  deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)
  remaining = timeout

  loop do
    n = raw_conn.wait_for_notify(remaining)
    return n if !n || n == channel

    # No deadline means the caller asked for an unbounded wait.
    next unless deadline

    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return nil if remaining <= 0
  end
end

#wait_for_result(timeout) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'app/models/marty/promise.rb', line 113

# Blocks until this promise has a result, or gives up with an error hash.
#
# Flow, as implemented below:
# 1. If a non-empty result is already present, return it immediately.
# 2. LISTEN on this promise's channel ("promise_<id>").
# 3. If the promise has not started: when its job is missing or already
#    locked, wait DEFAULT_JOB_TIMEOUT for a notification; otherwise run the
#    job in-process via work_off_job, recording any StandardError as the
#    promise's result. If it still has not started, return a
#    "never started" error hash.
# 4. If the promise has not ended, wait +timeout+ for a notification; if it
#    still has not ended, return a "didn't end" error hash.
# 5. UNLISTEN always runs (ensure), including on the early returns inside
#    the begin block.
#
# @param timeout [Object] forwarded to wait_for_my_notify while waiting for
#   the promise to end
# @return [Object] the promise's result, or a Hash with an 'error' key when
#   the promise never started or did not end in time
def wait_for_result(timeout)
  # FIXME: Not sure that comparing result with an empty hash is a good idea;
  # perhaps it's better to use .present? or .blank?
  return result if result != {}

  begin
    # start listening on promise's notification
    raw_conn.exec("LISTEN promise_#{id}")

    last = latest

    # if job hasn't started yet, wait for it to start
    if !last.start_dt
      job = Marty::Promise.job_by_id(last.job_id)

      # FIXME: this block is needed since a lot of specs rely on the
      # delayed job being run in the same thread as the promise.
      # It can be deleted later and replaced with the simple timeout below.
      if !job || job.locked_at
        # job is missing or has been locked, so it looks like it started
        # already and we need to wait for it.
        wait_for_my_notify(Marty::Promise::DEFAULT_JOB_TIMEOUT)
      else
        # work off the job instead of waiting for a real worker to
        # pick it up.
        # log "OFF0 #{Process.pid} #{last}"
        begin
          work_off_job(job)
        rescue StandardError => e
          # log "OFFERR #{exc}"
          # Record the failure on the freshly loaded copy of the promise.
          error = self.class.exception_to_result(
            promise: self,
            exception: e
          )
          last.set_result(error)
        end
        # log "OFF1 #{Process.pid} #{last}"
      end

      # FIXME: enable after the problem with specs is solved
      # wait_for_my_notify(Marty::Promise::DEFAULT_JOB_TIMEOUT)

      last = latest

      # we waited for it but it never started.  So, report a
      # timeout error.
      if !last.start_dt
        # log "TO11 #{Process.pid} #{last}"
        return { 'error' => self.class.never_started_message(last) }
      end
    end

    # reload promise in case our copy doesn't have a result yet
    last = latest unless last.end_dt

    # at this point, we know the promise has already started
    if !last.end_dt
      wait_for_my_notify(timeout)
      last = latest

      # still not ended after waiting: report a timeout error
      if !last.end_dt
        return { 'error' => self.class.timeout_message(last) }
      end
    end

    last.result
  ensure
    # Stop listening to the promise notifications
    raw_conn.exec("UNLISTEN promise_#{id}")
  end
end

#work_off_job(job) ⇒ Object



105
106
107
108
109
110
111
# File 'app/models/marty/promise.rb', line 105

# Runs a delayed job in the current process instead of leaving it for a
# regular worker.
#
# The job row is first stamped with locked_at / locked_by ('Temp'), then a
# new Delayed::Worker runs it.
#
# @param job [#id] the delayed job to run
# @return [Object] the value returned by Delayed::Worker#run
def work_off_job(job)
  # Mark the row as locked before running it with a temporary worker.
  job_row = Delayed::Job.where(id: job.id)
  job_row.update_all(locked_at: Delayed::Job.db_time_now, locked_by: 'Temp')

  Delayed::Worker.new.run(job)
end