Class: Marty::Promise

Inherits:
Base show all
Defined in:
app/models/marty/promise.rb

Constant Summary collapse

DEFAULT_PROMISE_TIMEOUT =

default timeout (seconds) to wait for promise values

Rails.configuration.marty.promise_timeout || 30
DEFAULT_JOB_TIMEOUT =

default timeout (seconds) to wait for jobs to start

Rails.configuration.marty.job_timeout || 10

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

get_final_attrs, get_struct_attrs, make_hash, make_openstruct, mcfly_pt

Methods inherited from ActiveRecord::Base

joins, old_joins

Class Method Details

.cleanup(all = false) ⇒ Object



213
214
215
216
217
218
219
220
# File 'app/models/marty/promise.rb', line 213

def cleanup(all = false)
  where(
    'start_dt < ? AND parent_id IS NULL',
    DateTime.now - (all ? 0.hours : 4.hours)
  ).destroy_all
rescue StandardError => e
    Marty::Util.logger.error("promise GC error: #{e}")
end

.exception_to_result(promise:, exception:) ⇒ Object



222
223
224
225
226
227
228
# File 'app/models/marty/promise.rb', line 222

def exception_to_result(promise:, exception:)
  if promise.delorean?
    return Delorean::Engine.grok_runtime_exception(exception)
   end

  { 'error' => exception.message, 'backtrace' => exception.backtrace }
end

.job_by_id(job_id) ⇒ Object



95
96
97
# File 'app/models/marty/promise.rb', line 95

def self.job_by_id(job_id)
  Delayed::Job.uncached { Delayed::Job.find_by_id(job_id) }
end

.load_result(obj, force = false) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'app/models/marty/promise.rb', line 192

def load_result(obj, force = false)
  if force && obj.respond_to?(:__force__)
    obj = obj.__force__
  end

  case obj
  when Array
    obj.map { |x| load_result(x, force) }
  when Hash
    p = obj['__promise__']

    if p && obj.length == 1
      load_result(Marty::PromiseProxy.new(*p), force)
    else
      obj.each_with_object({}) { |(k, v), h| h[k] = load_result(v, force) }
    end
  else
    obj
  end
end

Instance Method Details

#delorean?Boolean

Returns:

  • (Boolean)


187
188
189
# File 'app/models/marty/promise.rb', line 187

def delorean?
  promise_type == 'delorean'
end

#latestObject



86
87
88
89
90
91
92
93
# File 'app/models/marty/promise.rb', line 86

def latest
  # FIXME: Not sure if this is idiomatic.  What's the best way to
  # force AR to reload the promise object?  reset+reload doesn't
  # seems to work.

  # get latest uncached version
  Marty::Promise.uncached { Marty::Promise.find(id) }
end

#pg_notifyObject



27
28
29
# File 'app/models/marty/promise.rb', line 27

def pg_notify
  raw_conn.async_exec("NOTIFY promise_#{id}")
end

#raw_connObject



23
24
25
# File 'app/models/marty/promise.rb', line 23

def raw_conn
  self.class.connection.raw_connection
end

#result(force = false) ⇒ Object



8
9
10
11
# File 'app/models/marty/promise.rb', line 8

def result(force = false)
  res = super()
  Marty::Promise.load_result(res, force)
end

#set_result(res) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'app/models/marty/promise.rb', line 42

def set_result(res)
  # log "SETRES #{Process.pid} #{self}"

  # promise must have been started and not yet ended
  if !start_dt || end_dt || result != {}
    # log "SETERR #{Process.pid} #{self}"
    Marty::Util.logger.error("unexpected promise state: #{self}")
    return
  end

  raise 'bad result' unless res.is_a?(Hash)

  self.status = res['error'].nil?
  self.result = res

  # update title/format from result hash (somewhat hacky)
  self.title   = res['title'].to_s  if res['title']
  self.cformat = res['format'].to_s if res['format']

  # mark promise as ended
  self.end_dt = DateTime.now
  save!

  # log "NOTIFY #{Process.pid}"
  pg_notify
end

#set_startObject



31
32
33
34
35
36
37
38
39
40
# File 'app/models/marty/promise.rb', line 31

def set_start
  if start_dt || result != {}
    Marty::Util.logger.error("promise already started: #{self}")
    return
  end

  # mark promise as started
  self.start_dt = DateTime.now
  save!
end

#to_sObject



69
70
71
# File 'app/models/marty/promise.rb', line 69

def to_s
  inspect
end

#wait_for_my_notify(timeout) ⇒ Object

def log(msg)

open('/tmp/dj.out', 'a') { |f| f.puts msg }

end



77
78
79
80
81
82
83
84
# File 'app/models/marty/promise.rb', line 77

def wait_for_my_notify(timeout)
  while true
    # FIXME: we keep using the same timeout.  The timeout should be
    # reduced by total time spent here.
    n = raw_conn.wait_for_notify(timeout)
    return n if !n || n == "promise_#{id}"
  end
end

#wait_for_result(timeout) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'app/models/marty/promise.rb', line 107

def wait_for_result(timeout)
  # FIXME: Not sure that comparing result with empty hash if a good idea
  # perhaps it's better to use .present? or .blank?
  return result if result != {}

  begin
    # start listening on promise's notification
    raw_conn.exec("LISTEN promise_#{id}")

    last = latest

    # if job hasn't started yet, wait for it to start
    if !last.start_dt
      job = Marty::Promise.job_by_id(last.job_id)

      # FIXME: this block is needed since a lot of specs rely on
      # delayed job being runned in the same thread as promise
      # Can be deleted later and replaces with simple timeout below
      if !job || job.locked_at
        # job has been locked, so it looks like it started already
        # and we need to wait for it.
        wait_for_my_notify(Marty::Promise::DEFAULT_JOB_TIMEOUT)
      else
        # work off the job instead of waiting for a real worker to
        # pick it up.
        # log "OFF0 #{Process.pid} #{last}"
        begin
          work_off_job(job)
        rescue StandardError => e
          # log "OFFERR #{exc}"
          error = exception_to_result(e)
          last.set_result(error)
        end
        # log "OFF1 #{Process.pid} #{last}"
      end

      # FIXME enable after problem with specs is solved
      # wait_for_my_notify(Marty::Promise::DEFAULT_JOB_TIMEOUT)

      last = latest

      # we waited for it but it never started.  So, mark it with a
      # timeout error.
      if !last.start_dt
        # log "TO11 #{Process.pid} #{last}"
        return { 'error' => "promise #{last.id} timed out (never started)" }
      end
    end

    # reload promise in case out copy doesn't have a result yet
    last = latest unless last.end_dt

    # at this point, we know the promise has already started
    if !last.end_dt
      wait_for_my_notify(timeout)
      last = latest

      if !last.end_dt
        return { 'error' => "promise #{last.id} timed out (didn't end)" }
      end
    end

    last.result
  ensure
    # Stop listening to the promise notifications
    raw_conn.exec("UNLISTEN promise_#{id}")
  end
end

#work_off_job(job) ⇒ Object



99
100
101
102
103
104
105
# File 'app/models/marty/promise.rb', line 99

def work_off_job(job)
  # Create a temporary worker to work off the job
  Delayed::Job.where(id: job.id).
    update_all(locked_at: Delayed::Job.db_time_now, locked_by: 'Temp')
  w = Delayed::Worker.new
  w.run(job)
end