Class: Webhookdb::Jobs::DurableSleeper

Inherits:
Object
  • Object
show all
Includes:
Amigo::DurableJob, Sidekiq::Job
Defined in:
lib/webhookdb/jobs/amigo_test_jobs.rb

Overview

Use this to verify the behavior of durable jobs:

  • Ensure DISABLE_DURABLE_JOBS_POLL env var is set.

  • ‘make run` and then `Webhookdb::Async.open_web` to go to Sidekiq web UI.

  • ‘make run-workers`, and copy the PID.

  • From pry: ‘Webhookdb::Jobs::DurableSleeper.setup_test_run(30)`

  • Jobs are put into the queue and the workers will be ‘running’ (sleeping).

  • Wait for some jobs to be done sleeping.

  • ‘pkill -9 <pid>`, kills Sidekiq without cleanup.

  • ‘Webhookdb::Jobs::DurableSleeper.print_status` will print the queue size. It would be ’10’ if you started with 30 jobs, 10 processed, and the worker was killed while 10 more were processing (leaving 10 unprocessed jobs in the queue). It will also print dead jobs, which will be 0. It will also print processed jobs, will be be 10.

  • Restart workers with ‘make run-workers`.

  • The next 10 workers will run (queue). ‘print_status` returns 0 for queue and dead size, and 20 for jobs processed.

  • Run ‘Amigo::DurableJob.poll_jobs`.

  • Go to the web UI’s Dead jobs. Observe 10 jobs are there. ‘print_status` also shows 10 jobs as dead.

  • Retry those jobs.

  • ‘print_status` shows 0 dead and 30 processed jobs.

Constant Summary collapse

MUX =
Mutex.new
COUNTER_FILE =
".durable-sleeper-counter"

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Amigo::DurableJob

enabled?, ensure_jobs_tables, heartbeat, heartbeat!, included, insert_job, lock_job, poll_jobs, reconnect, replace_database_settings, set_database_setting, storage_datasets, unlock_job

Class Method Details

.heartbeat_extensionObject



38
39
40
# File 'lib/webhookdb/jobs/amigo_test_jobs.rb', line 38

def self.heartbeat_extension
  return 20.seconds
end


57
58
59
60
61
62
# File 'lib/webhookdb/jobs/amigo_test_jobs.rb', line 57

def self.print_status
  done = File.read(COUNTER_FILE).to_i
  puts "Queue Size: #{Sidekiq::Queue.new.size}"
  puts "Processed:  #{done}"
  puts "Dead Set:   #{Sidekiq::DeadSet.new.size}"
end

.setup_test_run(count = 30) ⇒ Object



52
53
54
55
# File 'lib/webhookdb/jobs/amigo_test_jobs.rb', line 52

def self.setup_test_run(count=30)
  File.write(COUNTER_FILE, "0")
  count.times { Webhookdb::Jobs::DurableSleeper.perform_async }
end

Instance Method Details

#perform(duration = 5) ⇒ Object



42
43
44
45
46
47
48
49
50
# File 'lib/webhookdb/jobs/amigo_test_jobs.rb', line 42

def perform(duration=5)
  self.logger.info("sleeping")
  sleep(duration)
  MUX.synchronize do
    done = File.read(COUNTER_FILE).to_i
    done += 1
    File.write(COUNTER_FILE, done.to_s)
  end
end