Class: Marty::Event

Inherits:
Base show all
Defined in:
app/models/marty/event.rb

Defined Under Namespace

Classes: EventValidator

Constant Summary collapse

UPDATE_SQL =
<<SQL
  UPDATE marty_events as me
  SET start_dt = p.start_dt,
      end_dt = p.end_dt
  FROM marty_promises p
  WHERE me.promise_id = p.id
    AND (   (    p.start_dt IS NOT NULL
             AND me.start_dt IS NULL
            )
         OR (    p.end_dt IS NOT NULL
             AND me.end_dt IS NULL
            )
        )
SQL
BASE_QUERY =
<<SQL
SELECT id,
            klass,
            subject_id,
            enum_event_operation,
            comment,
            start_dt,
            end_dt,
            expire_secs,
            error
     FROM marty_events
SQL

Constants inherited from ActiveRecord::Base

ActiveRecord::Base::COUNT_SIG, ActiveRecord::Base::DISTINCT_SIG, ActiveRecord::Base::FIND_BY_SIG, ActiveRecord::Base::FIRST_SIG, ActiveRecord::Base::GROUP_SIG, ActiveRecord::Base::JOINS_SIG, ActiveRecord::Base::LAST_SIG, ActiveRecord::Base::LIMIT_SIG, ActiveRecord::Base::MCFLY_PT_SIG, ActiveRecord::Base::NOT_SIG, ActiveRecord::Base::ORDER_SIG, ActiveRecord::Base::PLUCK_SIG, ActiveRecord::Base::SELECT_SIG, ActiveRecord::Base::WHERE_SIG

Class Method Summary collapse

Methods inherited from Base

get_final_attrs, get_struct_attrs, make_hash, make_openstruct, mcfly_pt

Methods inherited from ActiveRecord::Base

joins, old_joins

Class Method Details

.all_finishedObject



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'app/models/marty/event.rb', line 259

def self.all_finished
  @all_finished ||= {
    data:      {},
    timestamp: Time.zone.parse('00:00:00').to_i,
  }
  @poll_secs ||= Marty::Config['MARTY_EVENT_POLL_SECS'] || 0
  time_now_i = Time.zone.now.to_i
  cutoff = Time.zone.at(@all_finished[:timestamp]).
           strftime('%Y-%m-%d %H:%M:%S.%6N')

  upd_count = update_start_and_end
  if upd_count > 0 ||
     time_now_i - @all_finished[:timestamp] > @poll_secs
    raw = get_data("
      SELECT * FROM (SELECT  id
                           , klass
                           , subject_id
                           , enum_event_operation
                           , start_dt
                           , end_dt
                           , expire_secs
                           , comment
                           , error
                           , ROW_NUMBER() OVER (PARTITION BY
                                                          klass
                                                        , subject_id
                                                        , enum_event_operation
                                                ORDER BY  end_dt DESC) row_num
                     FROM marty_events
                     WHERE end_dt > '#{cutoff}') sub
      WHERE row_num = 1")
    @all_finished[:timestamp] = time_now_i
    raw.each_with_object(@all_finished[:data]) do |ev, hash|
      if ev["end_dt"] && ev["error"].nil?
        real_ev = Marty::Event.where(id: ev["id"]).first
        promise = Marty::Promise.where(id: real_ev["promise_id"]).first
        maybe_error = promise.result["error"]
        ev["error"] = real_ev.error = !!maybe_error
        real_ev.comment = maybe_error
        real_ev.save!
      end
      subhash = hash[[ev["klass"], ev["subject_id"]]] ||= {}
      subhash[ev["enum_event_operation"]] =
        ev["end_dt"].strftime("%Y-%m-%d %H:%M:%S")
    end
  end
  @all_finished[:data]
end

.cleanupObject



312
313
314
315
316
317
318
# File 'app/models/marty/event.rb', line 312

def self.cleanup
  begin
    where('start_dt < ?', Time.zone.now - 48.hours).delete_all
  rescue => exc
    Marty::Util.logger.error("event GC error: #{exc}")
  end
end

.clear_cacheObject



239
240
241
# File 'app/models/marty/event.rb', line 239

def self.clear_cache
  @poll_secs = @all_running = @all_finished = nil
end

.compact_end_dt(hash) ⇒ Object



217
218
219
# File 'app/models/marty/event.rb', line 217

def self.compact_end_dt(hash)
  hash['end_dt'] ? hash['end_dt'].strftime("%H:%M") : '---'
end

.create_event(klass, subject_id, operation, start_dt, expire_secs, comment = nil) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'app/models/marty/event.rb', line 62

def self.create_event(klass,
                      subject_id,
                      operation,
                      start_dt,
                      expire_secs,
                      comment=nil)

  # use lookup_event instead of all_running which is throttled
  evs = self.lookup_event(klass, subject_id, operation)
  running = evs.detect do
    |ev|
    next if ev["end_dt"]
    next true unless ev["expire_secs"]
    (Time.zone.now - ev["start_dt"]).truncate < ev["expire_secs"]
  end

  raise "#{operation} is already running for #{klass}/#{subject_id}" if
    running

  self.create!(klass:                klass,
               subject_id:           subject_id,
               enum_event_operation: operation,
               start_dt:             start_dt,
               expire_secs:          expire_secs,
               comment:              comment,
              )
end

.currently_running(klass, subject_id) ⇒ Object



188
189
190
191
192
# File 'app/models/marty/event.rb', line 188

def self.currently_running(klass, subject_id)
  all_running.select do |pm|
    pm["klass"] == klass && pm["subject_id"] == subject_id.to_i
  end.map { |e| e["enum_event_operation"] }
end

.currently_running_multi(klass, subject_id_raw) ⇒ Object



194
195
196
197
198
199
200
201
# File 'app/models/marty/event.rb', line 194

def self.currently_running_multi(klass, subject_id_raw)
  subject_ids = [subject_id_raw].flatten.map(&:to_i)
  all_running.select do |pm|
    pm["klass"] == klass && subject_ids.include?(pm["subject_id"])
  end.each_with_object({}) do |e, h|
    (h[e["subject_id"]] ||= []) <<  e["enum_event_operation"]
  end
end

.finish_event(klass, subject_id, operation, error = false, comment = nil) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'app/models/marty/event.rb', line 101

def self.finish_event(klass, subject_id, operation, error=false, comment=nil)
  raise "error must be true or false" unless [true, false].include?(error)
  time_now_s = Time.zone.now.strftime('%Y-%m-%d %H:%M:%S.%6N')

  event = get_data(running_query(time_now_s)).detect do |ev|
    ev["klass"] == klass && ev["subject_id"] == subject_id.to_i &&
      ev["enum_event_operation"] == operation
  end
  raise "event #{klass}/#{subject_id}/#{operation} not found" unless
    event

  ev = Marty::Event.find_by_id(event["id"])
  raise "can't explicitly finish a promise event" if ev.promise_id
  ev.end_dt = Time.zone.now
  ev.error = error
  ev.comment = comment if comment
  ev.save!
end

.get_finished(klass, id) ⇒ Object



308
309
310
# File 'app/models/marty/event.rb', line 308

def self.get_finished(klass, id)
  all_finished[[klass, id]]
end

.last_event(klass, subject_id, operation = nil) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'app/models/marty/event.rb', line 120

def self.last_event(klass, subject_id, operation=nil)
  hash = all_running.select do |pm|
    pm["klass"] == klass && pm["subject_id"] == subject_id.to_i &&
      (operation.nil? || pm["enum_event_operation"] == operation)
  end.last

  return hash if hash

  op_sql = "AND enum_event_operation = '#{operation}'" if operation

  get_data("#{BASE_QUERY}
            WHERE klass = '#{klass}'
            AND subject_id = #{subject_id} #{op_sql}
            AND end_dt IS NOT NULL
            ORDER BY end_dt desc").first
end

.last_event_multi(klass, subject_ids_arg, operation = nil) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'app/models/marty/event.rb', line 137

def self.last_event_multi(klass, subject_ids_arg, operation=nil)
  subject_ids = subject_ids_arg.map(&:to_i)
  events = all_running.select do |pm|
    pm["klass"] == klass && subject_ids.include?(pm["subject_id"]) &&
      (operation.nil? || pm["enum_event_operation"] == operation)
  end.group_by { |ev| ev["subject_id"] }.each_with_object({}) do
    |(id, evs), h|
    h[id] = evs.sort { |a, b| a["start_dt"] <=> b["start_dt"] }.first
  end

  running_ids = events.keys
  check_fin = subject_ids - running_ids

  if check_fin.present?
    op_filt = "AND enum_event_operation = '#{operation}'" if operation
    op_col = ", enum_event_operation" if operation

    fins = get_data("SELECT klass,
                            subject_id,
                            enum_event_operation,
                            comment,
                            start_dt,
                            end_dt,
                            expire_secs,
                            error
                     FROM (SELECT klass,
                                  subject_id,
                                  enum_event_operation,
                                  comment,
                                  start_dt,
                                  end_dt,
                                  expire_secs,
                                  error,
                                  ROW_NUMBER() OVER (PARTITION BY klass,
                                                                  subject_id
                                                                  #{op_col}
                                                     ORDER BY end_dt DESC) rnum
                        FROM marty_events
                        WHERE klass = '#{klass}'
                          AND subject_id IN (#{check_fin.join(',')})
                          #{op_filt}
                          AND end_dt IS NOT NULL) sub
                     WHERE rnum = 1")

    fins.each do |fin|
      events[fin["subject_id"]] = fin
    end
  end
  events
end

.lookup_event(klass, subject_id, operation) ⇒ Object



90
91
92
93
94
95
96
97
98
99
# File 'app/models/marty/event.rb', line 90

def self.lookup_event(klass, subject_id, operation)
  update_start_and_end
  get_data("#{BASE_QUERY}
            WHERE klass = '#{klass}'
            AND subject_id = #{subject_id}
            AND enum_event_operation = '#{operation}'")

  #For now we return a bare hash
  #Marty::Event.find_by_id(hash["id"])
end

.op_is_running?(klass, subject_id, operation) ⇒ Boolean

Returns:

  • (Boolean)


55
56
57
58
59
60
# File 'app/models/marty/event.rb', line 55

def self.op_is_running?(klass, subject_id, operation)
  all_running.detect do |pm|
    pm["klass"] == klass && pm["subject_id"].to_i == subject_id.to_i &&
      pm["enum_event_operation"] == operation
  end
end

.pretty_op(hash) ⇒ Object



210
211
212
213
214
215
# File 'app/models/marty/event.rb', line 210

def self.pretty_op(hash)
  d = hash['enum_event_operation'].downcase.capitalize

  #&& !(hash['comment'] =~ /^ERROR/)
  hash['end_dt'] ? d.sub(/ing/, 'ed') : d
end

.running_query(time_now_s) ⇒ Object



46
47
48
49
50
51
52
53
# File 'app/models/marty/event.rb', line 46

def self.running_query(time_now_s)
  "#{BASE_QUERY}
   WHERE start_dt >= '#{time_now_s}'::timestamp - interval '24 hours'
     AND (end_dt IS NULL or end_dt > '#{time_now_s}'::timestamp)
     AND (expire_secs IS NULL
      OR expire_secs > EXTRACT (EPOCH FROM '#{time_now_s}'::timestamp - start_dt))
    ORDER BY start_dt"
end

.update_comment(hash, comment) ⇒ Object



203
204
205
206
207
208
# File 'app/models/marty/event.rb', line 203

def self.update_comment(hash, comment)
  hid = hash.is_a?(Hash) ? hash['id'] : hash
  e = Marty::Event.find_by_id(hid)
  e.comment = comment
  e.save!
end

.update_start_and_endObject



221
222
223
# File 'app/models/marty/event.rb', line 221

def self.update_start_and_end
  ActiveRecord::Base.connection.execute(UPDATE_SQL).cmd_tuples
end