Class: SolidFlow::Jobs::TimerSweepJob

Inherits:
ActiveJob::Base
  • Object
show all
Defined in:
app/jobs/solidflow/jobs/timer_sweep_job.rb

Instance Method Summary collapse

Instance Method Details

#perform(batch_size: 100) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'app/jobs/solidflow/jobs/timer_sweep_job.rb', line 10

def perform(batch_size: 100)
  now = SolidFlow.configuration.time_provider.call

  SolidFlow::Timer.transaction do
    SolidFlow::Timer
      .scheduled
      .where("run_at <= ?", now)
      .limit(batch_size)
      .lock("FOR UPDATE SKIP LOCKED")
      .each do |timer|
        SolidFlow.store.mark_timer_fired(timer_id: timer.id)
        SolidFlow.instrument(
          "solidflow.timer.fired",
          execution_id: timer.execution_id,
          timer_id: timer.id,
          step: timer.step
        )
      end
  end
end