Module: Qless::WorkerHelpers

Defined in:
lib/qless/test_helpers/worker_helpers.rb

Instance Method Summary collapse

Instance Method Details

#drain_worker_queues(worker) ⇒ Object

Runs the worker until it has no more jobs to process, effectively drainig its queues.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/qless/test_helpers/worker_helpers.rb', line 38

def drain_worker_queues(worker)
  worker.extend Module.new {
    # For the child: stop as soon as it can't pop more jobs.
    def no_job_available
      shutdown
    end

    # For the parent: when the child stops,
    # don't try to restart it; shutdown instead.
    def spawn_replacement_child(*)
      shutdown
    end
  }

  worker.run
end

#run_jobs(worker, count) ⇒ Object

Run only the given number of jobs, then stop



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/qless/test_helpers/worker_helpers.rb', line 19

def run_jobs(worker, count)
  worker.extend Module.new {
    define_method(:jobs) do
      base_enum = super()
      Enumerator.new do |enum|
        count.times { enum << base_enum.next }
      end
    end
  }

  thread = Thread.start { yield } if block_given?
  thread.abort_on_exception if thread
  worker.run
ensure
  thread.join(0.1) if thread
end

#run_worker_concurrently_with(worker, &block) ⇒ Object

Yield with a worker running, and then clean the worker up afterwards



4
5
6
7
8
9
10
# File 'lib/qless/test_helpers/worker_helpers.rb', line 4

def run_worker_concurrently_with(worker, &block)
  thread = Thread.start { stop_worker_after(worker, &block) }
  thread.abort_on_exception = true
  worker.run
ensure
  thread.join(0.1)
end

#stop_worker_after(worker, &block) ⇒ Object



12
13
14
15
16
# File 'lib/qless/test_helpers/worker_helpers.rb', line 12

def stop_worker_after(worker, &block)
  yield
ensure
  worker.stop!
end