Class: Qless::Worker
- Inherits:
-
Object
- Object
- Qless::Worker
- Defined in:
- lib/qless/worker.rb
Overview
This is heavily inspired by Resque’s excellent worker: github.com/defunkt/resque/blob/v1.20.0/lib/resque/worker.rb
Instance Attribute Summary collapse
-
#output ⇒ Object
An IO-like object that logging output is sent to.
-
#run_as_single_process ⇒ Object
Whether the worker should run in a single prcoess i.e.
-
#verbose ⇒ Object
Whether the worker should log basic info to STDOUT.
-
#very_verbose ⇒ Object
Whether the worker should log lots of info to STDOUT.
Class Method Summary collapse
-
.start ⇒ Object
Starts a worker based on ENV vars.
Instance Method Summary collapse
-
#initialize(client, job_reserver, options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #pause_processing ⇒ Object
- #paused? ⇒ Boolean
- #perform(job) ⇒ Object
- #shutdown ⇒ Object
- #shutdown! ⇒ Object
- #shutdown? ⇒ Boolean
- #unpause_processing ⇒ Object
- #work(interval = 5.0) ⇒ Object
Constructor Details
#initialize(client, job_reserver, options = {}) ⇒ Worker
Returns a new instance of Worker.
10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/qless/worker.rb', line 10 def initialize(client, job_reserver, = {}) @client, @job_reserver = client, job_reserver @shutdown = @paused = false self.very_verbose = [:very_verbose] self.verbose = [:verbose] self.run_as_single_process = [:run_as_single_process] self.output = .fetch(:output, $stdout) output.puts "\n\n\n" if verbose || very_verbose log "Instantiated Worker" end |
Instance Attribute Details
#output ⇒ Object
An IO-like object that logging output is sent to. Defaults to $stdout.
36 37 38 |
# File 'lib/qless/worker.rb', line 36 def output @output end |
#run_as_single_process ⇒ Object
Whether the worker should run in a single prcoess i.e. not fork a child process to do the work This should only be true in a dev/test environment
32 33 34 |
# File 'lib/qless/worker.rb', line 32 def run_as_single_process @run_as_single_process end |
#verbose ⇒ Object
Whether the worker should log basic info to STDOUT
24 25 26 |
# File 'lib/qless/worker.rb', line 24 def verbose @verbose end |
#very_verbose ⇒ Object
Whether the worker should log lots of info to STDOUT
27 28 29 |
# File 'lib/qless/worker.rb', line 27 def very_verbose @very_verbose end |
Class Method Details
.start ⇒ Object
Starts a worker based on ENV vars. Supported ENV vars:
- REDIS_URL=redis://host:port/db-num (the redis gem uses this automatically)
- QUEUES=high,medium,low or QUEUE=blah
- JOB_RESERVER=Ordered or JOB_RESERVER=RoundRobin
- INTERVAL=3.2
- VERBOSE=true (to enable logging)
- VVERBOSE=true (to enable very verbose logging)
- RUN_AS_SINGLE_PROCESS=true (false will fork children to do work, true will keep it single process)
This is designed to be called from a rake task
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/qless/worker.rb', line 47 def self.start client = Qless::Client.new queues = (ENV['QUEUES'] || ENV['QUEUE']).to_s.split(',').map { |q| client.queues[q.strip] } if queues.none? raise "No queues provided. You must pass QUEUE or QUEUES when starting a worker." end reserver = JobReservers.const_get(ENV.fetch('JOB_RESERVER', 'Ordered')).new(queues) interval = Float(ENV.fetch('INTERVAL', 5.0)) = {} [:verbose] = !!ENV['VERBOSE'] [:very_verbose] = !!ENV['VVERBOSE'] [:run_as_single_process] = !!ENV['RUN_AS_SINGLE_PROCESS'] new(client, reserver, ).work(interval) end |
Instance Method Details
#pause_processing ⇒ Object
125 126 127 128 129 |
# File 'lib/qless/worker.rb', line 125 def pause_processing log "USR2 received; pausing job processing" @paused = true procline "Paused -- #{@job_reserver.description}" end |
#paused? ⇒ Boolean
121 122 123 |
# File 'lib/qless/worker.rb', line 121 def paused? @paused end |
#perform(job) ⇒ Object
100 101 102 103 104 105 106 |
# File 'lib/qless/worker.rb', line 100 def perform(job) around_perform(job) rescue Exception => error fail_job(job, error) else job.complete unless job.state_changed? end |
#shutdown ⇒ Object
108 109 110 |
# File 'lib/qless/worker.rb', line 108 def shutdown @shutdown = true end |
#shutdown! ⇒ Object
112 113 114 115 |
# File 'lib/qless/worker.rb', line 112 def shutdown! shutdown kill_child unless run_as_single_process end |
#shutdown? ⇒ Boolean
117 118 119 |
# File 'lib/qless/worker.rb', line 117 def shutdown? @shutdown end |
#unpause_processing ⇒ Object
131 132 133 134 |
# File 'lib/qless/worker.rb', line 131 def unpause_processing log "CONT received; resuming job processing" @paused = false end |
#work(interval = 5.0) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/qless/worker.rb', line 65 def work(interval = 5.0) procline "Starting #{@job_reserver.description}" register_signal_handlers loop do break if shutdown? next if paused? unless job = @job_reserver.reserve break if interval.zero? procline "Waiting for #{@job_reserver.description}" log! "Sleeping for #{interval} seconds" sleep interval next end log "got: #{job.inspect}" if run_as_single_process # We're staying in the same process procline "Single processing #{job.description}" perform(job) elsif @child = fork # We're in the parent process procline "Forked #{@child} for #{job.description}" Process.wait(@child) else # We're in the child process procline "Processing #{job.description}" perform(job) exit! end end end |