Class: Marty::Promise

Inherits:
Base show all
Defined in:
app/models/marty/promise.rb

Defined Under Namespace

Classes: MarshalResult, VirtualRoot

Constant Summary collapse

DEFAULT_PROMISE_TIMEOUT =

default timeout (seconds) to wait for promise values

Rails.configuration.marty.promise_timeout || 30
DEFAULT_JOB_TIMEOUT =

default timeout (seconds) to wait for jobs to start

Rails.configuration.marty.job_timeout || 10

Constants inherited from Base

Base::COUNT_SIG, Base::DISTINCT_SIG, Base::FIRST_SIG, Base::GROUP_SIG, Base::JOINS_SIG, Base::LAST_SIG, Base::LIMIT_SIG, Base::MCFLY_PT_SIG, Base::NOT_SIG, Base::ORDER_SIG, Base::PLUCK_SIG, Base::SELECT_SIG, Base::WHERE_SIG

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

get_final_attrs, get_struct_attrs, make_hash, make_openstruct, mcfly_pt

Methods inherited from ActiveRecord::Base

joins, old_joins

Class Method Details

.children_for_id(id, search_order) ⇒ Object



70
71
72
73
# File 'app/models/marty/promise.rb', line 70

# Returns the child promises to display under the given node.
#
# When +id+ is the string 'root', the starting set is every promise with
# no parent; otherwise it is the children of the promise found by +id+.
# The set is then filtered with live_search(search_order), ordered by id
# descending, and has :children and :user eager-loaded.
#
# @param id [String, Integer] 'root' or the id of an existing promise
# @param search_order [Object] value handed unchanged to live_search
# @return [Object] the resulting relation
def self.children_for_id(id, search_order)
  scope =
    if id == 'root'
      where(parent_id: nil)
    else
      find(id).children
    end

  filtered = scope.live_search(search_order)
  filtered.order(id: :desc).includes(:children, :user)
end

.cleanup(all = false) ⇒ Object



33
34
35
36
37
38
39
40
# File 'app/models/marty/promise.rb', line 33

# Garbage-collects old root promises.
#
# Destroys every promise with no parent (parent_id IS NULL) whose start_dt
# is earlier than a cutoff: four hours ago by default, or the current time
# when +all+ is truthy. Any StandardError raised along the way is logged
# through Marty::Util.logger and not re-raised.
#
# @param all [Boolean] when truthy, use "now" as the cutoff instead of
#   four hours ago
# @return [Object] the value of destroy_all, or of the logger call on error
def self.cleanup(all = false)
  now = DateTime.now
  max_age = all ? 0.hours : 4.hours
  cutoff = now - max_age

  where('start_dt < ? AND parent_id IS NULL', cutoff).destroy_all
rescue => e
  Marty::Util.logger.error("promise GC error: #{e}")
end

.job_by_id(job_id) ⇒ Object



151
152
153
# File 'app/models/marty/promise.rb', line 151

# Looks up a delayed job by id inside a Delayed::Job.uncached block.
#
# @param job_id [Integer, nil] id of the Delayed::Job row
# @return [Object, nil] whatever Delayed::Job.find_by_id returns
def self.job_by_id(job_id)
  Delayed::Job.uncached do
    Delayed::Job.find_by_id(job_id)
  end
end

.root ⇒ Object



66
67
68
# File 'app/models/marty/promise.rb', line 66

# Returns a newly constructed VirtualRoot instance.
def self.root
  VirtualRoot.new
end

Instance Method Details

#latest ⇒ Object



142
143
144
145
146
147
148
149
# File 'app/models/marty/promise.rb', line 142

# Fetches a fresh copy of this promise from the database.
#
# The lookup runs inside Marty::Promise.uncached so that the returned
# record is the latest uncached version rather than this (possibly stale)
# instance.
#
# FIXME: Not sure if this is idiomatic. What's the best way to force AR
# to reload the promise object? reset+reload doesn't seem to work.
#
# @return [Marty::Promise] the record found by this promise's id
def latest
  Marty::Promise.uncached do
    Marty::Promise.find(id)
  end
end

#leaf ⇒ Object



75
76
77
# File 'app/models/marty/promise.rb', line 75

# Returns true when this promise has no children.
def leaf
  children.empty?
end

#pg_notify ⇒ Object



83
84
85
# File 'app/models/marty/promise.rb', line 83

# Sends a NOTIFY on this promise's channel ("promise_<id>") over the raw
# database connection.
#
# @return [Object] whatever the connection's async_exec returns
def pg_notify
  conn = raw_conn
  conn.async_exec("NOTIFY promise_#{id}")
end

#raw_conn ⇒ Object



79
80
81
# File 'app/models/marty/promise.rb', line 79

# Returns the raw connection underlying this model class's database
# connection.
#
# @return [Object] the value of connection.raw_connection
def raw_conn
  adapter = self.class.connection
  adapter.raw_connection
end

#set_result(res) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'app/models/marty/promise.rb', line 98

# Stores the final result on a running promise and notifies listeners.
#
# The promise must have been started (start_dt set), must not have ended
# (end_dt unset) and must still hold an empty result hash. Otherwise an
# error is logged and nothing is changed.
#
# On success: status becomes true when res has no "error" entry, result
# is set to res, title/cformat are taken from res["title"]/res["format"]
# when present, end_dt is stamped, the record is saved, and a NOTIFY is
# sent via pg_notify.
#
# @param res [Hash] result hash
# @return [Object, nil] the value of pg_notify, or nil if the promise was
#   not in the expected state
# @raise [RuntimeError] "bad result" when res is not a Hash
def set_result(res)
  awaiting_result = start_dt && !end_dt && result == {}

  unless awaiting_result
    Marty::Util.logger.error("unexpected promise state: #{self}")
    return
  end

  raise RuntimeError, "bad result" unless res.is_a?(Hash)

  self.status = res["error"].nil?
  self.result = res

  # title/format ride along in the result hash (somewhat hacky)
  new_title = res["title"]
  self.title = new_title.to_s if new_title

  new_format = res["format"]
  self.cformat = new_format.to_s if new_format

  # stamp the end time, persist, then wake up anyone waiting on the promise
  self.end_dt = DateTime.now
  save!

  pg_notify
end

#set_start ⇒ Object



87
88
89
90
91
92
93
94
95
96
# File 'app/models/marty/promise.rb', line 87

# Marks the promise as started by stamping start_dt with the current time
# and saving the record.
#
# If start_dt is already set, or result is anything other than an empty
# hash, an error is logged and the promise is left unchanged.
#
# @return [Object, nil] the value of save!, or nil when the promise had
#   already been started
def set_start
  not_yet_started = !start_dt && result == {}

  unless not_yet_started
    Marty::Util.logger.error("promise already started: #{self}")
    return
  end

  self.start_dt = DateTime.now
  save!
end

#to_s ⇒ Object



125
126
127
# File 'app/models/marty/promise.rb', line 125

# String form of the promise; identical to the output of inspect.
def to_s
  inspect
end

#wait_for_my_notify(timeout) ⇒ Object

# Debug helper: appends msg as a line to /tmp/dj.out.
def log(msg)
  File.open('/tmp/dj.out', 'a') do |out|
    out.puts msg
  end
end



133
134
135
136
137
138
139
140
# File 'app/models/marty/promise.rb', line 133

# Blocks until a notification for this promise's channel ("promise_<id>")
# arrives on the raw connection, or until the timeout expires.
#
# Notifications for other channels are skipped. The time spent waiting on
# them is deducted from the remaining timeout, so the total wait is bounded
# by +timeout+ no matter how many unrelated notifications arrive. A nil
# timeout is passed through unchanged on every wait.
#
# @param timeout [Numeric, nil] maximum number of seconds to wait overall
# @return [String, nil] the channel name when this promise was notified,
#   or nil when the wait timed out
def wait_for_my_notify(timeout)
  channel = "promise_#{id}"
  clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = timeout && (clock.call + timeout)
  remaining = timeout

  loop do
    n = raw_conn.wait_for_notify(remaining)
    return n if !n || n == channel

    # Notification for some other channel: keep waiting, but only for the
    # time that is left on the original budget.
    next unless deadline

    remaining = deadline - clock.call
    return nil unless remaining > 0
  end
end

#wait_for_result(timeout) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'app/models/marty/promise.rb', line 163

# Waits for this promise to finish and returns its result hash.
#
# Flow, as implemented below:
# 1. If this instance already holds a non-empty result, return it at once.
# 2. LISTEN on the "promise_<id>" channel of the raw connection.
# 3. If the freshly loaded promise has not started: when its delayed job
#    is missing or locked, wait up to DEFAULT_JOB_TIMEOUT for a
#    notification; otherwise run the job inline via work_off_job. If the
#    promise still has no start_dt afterwards, return a "never started"
#    error hash.
# 4. If the promise has not ended, wait up to +timeout+ for a
#    notification; if it still has no end_dt, return a "didn't end" error
#    hash.
# 5. Return the reloaded promise's result.
# The channel is always UNLISTENed on the way out.
#
# @param timeout [Numeric] seconds handed to wait_for_my_notify while
#   waiting for the promise to end
# @return [Hash] the promise result, or {"error" => "..."} on timeout
def wait_for_result(timeout)
  return self.result if self.result != {}

  begin
    # start listening on promise's notification
    raw_conn.exec("LISTEN promise_#{id}")

    last = latest

    # if job hasn't started yet, wait for it to start
    if !last.start_dt
      job = Marty::Promise.job_by_id(last.job_id)

      # log "AAAA #{Process.pid} #{last} #{job}"

      if !job || job.locked_at
        # job has been locked, so it looks like it started already
        # and we need to wait for it.
        wait_for_my_notify(Marty::Promise::DEFAULT_JOB_TIMEOUT)
      else
        # work off the job instead of waiting for a real worker to
        # pick it up.
        # log "OFF0 #{Process.pid} #{last}"
        begin
          work_off_job(job)
        rescue => exc
          # log "OFFERR #{exc}"
          # Convert the exception into a result hash and try to store it
          # on the promise copy loaded above.
          res = Delorean::Engine.grok_runtime_exception(exc)
          last.set_result(res)
        end
        # log "OFF1 #{Process.pid} #{last}"
      end

      last = latest

      # we waited for it but it never started.  So, mark it with a
      # timeout error.
      if !last.start_dt
        # log "TO11 #{Process.pid} #{last}"
        return {"error" => "promise #{last.id} timed out (never started)"}
      end
    end

    # reload promise in case our copy doesn't have a result yet
    last = latest unless last.end_dt

    # at this point, we know the promise has already started
    if !last.end_dt
      wait_for_my_notify(timeout)
      # log "UUUU #{Process.pid} #{id} #{Time.now.to_f}"
      last = latest

      # log "XXXX #{Process.pid} #{Time.now.to_f} #{last}"

      if !last.end_dt
        # log "TO22 #{Process.pid} #{last}"
        return {"error" => "promise #{last.id} timed out (didn't end)"}
      end
    end

    # log "RRRR #{Process.pid} #{last} #{Time.now.to_f}"

    last.result
  ensure
    # Stop listening to the promise notifications
    raw_conn.exec("UNLISTEN promise_#{id}")
  end
end

#work_off_job(job) ⇒ Object



155
156
157
158
159
160
161
# File 'app/models/marty/promise.rb', line 155

# Runs the given delayed job in the current process instead of leaving it
# for a regular worker.
#
# The job row is first marked as locked (locked_at set to the Delayed::Job
# database time, locked_by set to "Temp"), then a temporary
# Delayed::Worker runs it.
#
# @param job [Delayed::Job] the job to run
# @return [Object] whatever Delayed::Worker#run returns
def work_off_job(job)
  job_row = Delayed::Job.where(id: job.id)
  job_row.update_all(locked_at: Delayed::Job.db_time_now, locked_by: "Temp")

  Delayed::Worker.new.run(job)
end