Class: RocketJob::Worker
- Inherits:
-
Object
- Object
- RocketJob::Worker
- Includes:
- ActiveSupport::Callbacks, SemanticLogger::Loggable
- Defined in:
- lib/rocket_job/worker.rb
Overview
Worker
A worker runs on a single operating system thread Is usually started under a Rocket Job server process.
Defined Under Namespace
Classes: Shutdown
Instance Attribute Summary collapse
-
#current_filter ⇒ Object
Returns the value of attribute current_filter.
-
#id ⇒ Object
Returns the value of attribute id.
-
#inline ⇒ Object
readonly
Returns the value of attribute inline.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#server_name ⇒ Object
readonly
Returns the value of attribute server_name.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
Class Method Summary collapse
- .after_running(*filters, &blk) ⇒ Object
- .around_running(*filters, &blk) ⇒ Object
- .before_running(*filters, &blk) ⇒ Object
Instance Method Summary collapse
-
#add_to_current_filter(filter) ⇒ Object
Add the supplied filter to the current filter.
- #alive? ⇒ Boolean
- #backtrace ⇒ Object
- #find_and_assign_job ⇒ Object
-
#initialize(id: 0, server_name: "inline:0", inline: false) ⇒ Worker
constructor
A new instance of Worker.
- #join(*args) ⇒ Object
-
#kill ⇒ Object
Send each active worker the RocketJob::ShutdownException so that stops processing immediately.
-
#next_available_job ⇒ Object
Returns [RocketJob::Job] the next job available for processing.
-
#random_wait_interval ⇒ Object
Returns [Float] a randomized poll interval in seconds up to the maximum configured poll interval.
-
#reset_filter_if_expired ⇒ Object
Resets the current job filter if the relevant time interval has passed.
-
#run ⇒ Object
Process jobs until it shuts down.
- #shutdown! ⇒ Object
- #shutdown? ⇒ Boolean
-
#throttled_job?(job) ⇒ Boolean
Whether the supplied job has been throttled and should be ignored.
-
#wait_for_shutdown?(timeout = nil) ⇒ Boolean
Returns [true|false] whether the shutdown indicator was set.
Constructor Details
#initialize(id: 0, server_name: "inline:0", inline: false) ⇒ Worker
Returns a new instance of Worker.
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/rocket_job/worker.rb', line 36 def initialize(id: 0, server_name: "inline:0", inline: false) @id = id @server_name = server_name @shutdown = Concurrent::Event.new @name = "#{server_name}:#{id}" @re_check_start = Time.now @current_filter = Config.filter || {} @thread = Thread.new { run } unless inline @inline = inline end |
Instance Attribute Details
#current_filter ⇒ Object
Returns the value of attribute current_filter.
14 15 16 |
# File 'lib/rocket_job/worker.rb', line 14 def current_filter @current_filter end |
#id ⇒ Object
Returns the value of attribute id.
14 15 16 |
# File 'lib/rocket_job/worker.rb', line 14 def id @id end |
#inline ⇒ Object (readonly)
Returns the value of attribute inline.
15 16 17 |
# File 'lib/rocket_job/worker.rb', line 15 def inline @inline end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
15 16 17 |
# File 'lib/rocket_job/worker.rb', line 15 def name @name end |
#server_name ⇒ Object (readonly)
Returns the value of attribute server_name.
15 16 17 |
# File 'lib/rocket_job/worker.rb', line 15 def server_name @server_name end |
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
15 16 17 |
# File 'lib/rocket_job/worker.rb', line 15 def thread @thread end |
Class Method Details
.after_running(*filters, &blk) ⇒ Object
28 29 30 |
# File 'lib/rocket_job/worker.rb', line 28 def self.after_running(*filters, &blk) set_callback(:running, :after, *filters, &blk) end |
.around_running(*filters, &blk) ⇒ Object
32 33 34 |
# File 'lib/rocket_job/worker.rb', line 32 def self.around_running(*filters, &blk) set_callback(:running, :around, *filters, &blk) end |
.before_running(*filters, &blk) ⇒ Object
24 25 26 |
# File 'lib/rocket_job/worker.rb', line 24 def self.before_running(*filters, &blk) set_callback(:running, :before, *filters, &blk) end |
Instance Method Details
#add_to_current_filter(filter) ⇒ Object
Add the supplied filter to the current filter.
199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/rocket_job/worker.rb', line 199 def add_to_current_filter(filter) filter.each_pair do |k, v| current_filter[k] = if (previous = current_filter[k]) v.is_a?(Array) ? previous + v : v else v end end current_filter end |
#alive? ⇒ Boolean
47 48 49 |
# File 'lib/rocket_job/worker.rb', line 47 def alive? inline ? true : @thread.alive? end |
#backtrace ⇒ Object
51 52 53 |
# File 'lib/rocket_job/worker.rb', line 51 def backtrace inline ? Thread.current.backtrace : @thread.backtrace end |
#find_and_assign_job ⇒ Object
175 176 177 178 179 180 181 182 183 184 |
# File 'lib/rocket_job/worker.rb', line 175 def find_and_assign_job SemanticLogger.silence(:info) do scheduled = RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now) working = RocketJob::Job.queued.or(state: :running, sub_state: :processing) query = RocketJob::Job.and(working, scheduled) query = query.and(current_filter) unless current_filter.blank? update = {"$set" => {"worker_name" => name, "state" => "running"}} query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true) end end |
#join(*args) ⇒ Object
55 56 57 |
# File 'lib/rocket_job/worker.rb', line 55 def join(*args) @thread.join(*args) unless inline end |
#kill ⇒ Object
Send each active worker the RocketJob::ShutdownException so that stops processing immediately.
60 61 62 63 64 |
# File 'lib/rocket_job/worker.rb', line 60 def kill return true if inline @thread.raise(Shutdown, "Shutdown due to kill request for worker: #{name}") if @thread.alive? end |
#next_available_job ⇒ Object
Returns [RocketJob::Job] the next job available for processing. Returns [nil] if no job is available for processing.
Notes:
-
Destroys expired jobs
-
Runs job throttles and skips the job if it is throttled.
-
Adding that filter to the current filter to exclude from subsequent polling.
-
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/rocket_job/worker.rb', line 129 def next_available_job until shutdown? job = find_and_assign_job return unless job if job.expired? job.fail_on_exception! do job.worker_name = name job.destroy logger.info("Destroyed expired job.") end next end # Batch Job that is already started? # Batch has its own throttles for slices. return job if job.running? # Should this job be throttled? next if job.fail_on_exception! { throttled_job?(job) } # Start this job! job.fail_on_exception! { job.start!(name) } return job if job.running? end end |
#random_wait_interval ⇒ Object
Returns [Float] a randomized poll interval in seconds up to the maximum configured poll interval.
212 213 214 |
# File 'lib/rocket_job/worker.rb', line 212 def random_wait_interval rand(Config.max_poll_seconds * 1000) / 1000 end |
#reset_filter_if_expired ⇒ Object
Resets the current job filter if the relevant time interval has passed
113 114 115 116 117 118 119 120 |
# File 'lib/rocket_job/worker.rb', line 113 def reset_filter_if_expired # Only clear out the current_filter after every `re_check_seconds` time = Time.now return unless (time - @re_check_start) > Config.re_check_seconds @re_check_start = time self.current_filter = Config.filter || {} end |
#run ⇒ Object
Process jobs until it shuts down
Params
worker_id [Integer]
The number of this worker for logging purposes
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/rocket_job/worker.rb', line 84 def run Thread.current.name = format("rocketjob %03i", id) logger.info "Started" until shutdown? sleep_seconds = Config.max_poll_seconds reset_filter_if_expired job = next_available_job # Returns true when work was completed, but no other work is available if job&.rocket_job_work(self, false) # Return the database connections for this thread back to the connection pool ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base) # Stagger workers so that they don't all poll at the same time. sleep_seconds = random_wait_interval end wait_for_shutdown?(sleep_seconds) end logger.info "Stopping" rescue Exception => e logger.fatal("Unhandled exception in job processing thread", e) ensure ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base) end |
#shutdown! ⇒ Object
70 71 72 |
# File 'lib/rocket_job/worker.rb', line 70 def shutdown! @shutdown.set end |
#shutdown? ⇒ Boolean
66 67 68 |
# File 'lib/rocket_job/worker.rb', line 66 def shutdown? @shutdown.set? end |
#throttled_job?(job) ⇒ Boolean
Whether the supplied job has been throttled and should be ignored.
157 158 159 160 161 162 163 164 165 166 |
# File 'lib/rocket_job/worker.rb', line 157 def throttled_job?(job) # Evaluate job throttles, if any. filter = job.rocket_job_throttles.matching_filter(job) return false unless filter add_to_current_filter(filter) # Restore retrieved job so that other workers can process it later job.set(worker_name: nil, state: :queued) true end |
#wait_for_shutdown?(timeout = nil) ⇒ Boolean
Returns [true|false] whether the shutdown indicator was set
75 76 77 |
# File 'lib/rocket_job/worker.rb', line 75 def wait_for_shutdown?(timeout = nil) @shutdown.wait(timeout) end |