Class: ParallelMinion::Minion
- Inherits:
-
Object
- Object
- ParallelMinion::Minion
- Includes:
- SemanticLogger::Loggable
- Defined in:
- lib/parallel_minion/minion.rb
Constant Summary
- INFINITE = 0
Give an infinite amount of time to wait for a Minion to complete a task.
Class Attribute Summary
-
.completed_log_level ⇒ Object
Returns the value of attribute completed_log_level.
-
.enabled ⇒ Object
writeonly
Sets the attribute enabled.
-
.scoped_classes ⇒ Object
Returns the value of attribute scoped_classes.
-
.started_log_level ⇒ Object
Returns the value of attribute started_log_level.
Instance Attribute Summary
-
#arguments ⇒ Object
readonly
Returns [Array<Object>] list of arguments in the order they were passed into the initializer.
-
#description ⇒ Object
readonly
Returns [String] the description supplied on the initializer.
-
#duration ⇒ Object
readonly
Returns [Float] the number of milli-seconds the minion took to complete. Returns nil if the minion is still running.
-
#exception ⇒ Object
readonly
Returns [Exception] the exception that was raised, otherwise nil.
-
#log_exception ⇒ Object
readonly
Returns the value of attribute log_exception.
-
#metric ⇒ Object
readonly
Returns [String] the metric name supplied on the initializer.
-
#on_exception_level ⇒ Object
readonly
Returns the value of attribute on_exception_level.
-
#on_timeout ⇒ Object
readonly
Returns the value of attribute on_timeout.
-
#start_time ⇒ Object
readonly
Returns the value of attribute start_time.
-
#timeout ⇒ Object
readonly
Returns [Integer] the maximum duration in milli-seconds that the Minion may take to complete the task.
-
#wait_metric ⇒ Object
readonly
Returns [String] the name of the metric used to measure time spent waiting for the minion.
Class Method Summary
- .current_scopes ⇒ Object
-
.enabled? ⇒ Boolean
Returns whether minions are enabled to run in their own threads.
Instance Method Summary
-
#completed? ⇒ Boolean
Returns [Boolean] whether the minion has completed working on the task.
-
#enabled? ⇒ Boolean
Returns [Boolean] whether this minion is enabled to run in a separate thread.
-
#failed? ⇒ Boolean
Returns [Boolean] whether the minion failed while performing the assigned task.
-
#initialize(*arguments, description: 'Minion', metric: nil, log_exception: nil, on_exception_level: self.class.completed_log_level, enabled: self.class.enabled?, timeout: INFINITE, on_timeout: nil, wait_metric: nil, &block) ⇒ Minion
constructor
Create a new Minion.
-
#result ⇒ Object
Returns the result when the thread completes. Returns nil if the thread has not yet completed. Raises any unhandled exception in the thread, if any.
-
#time_left ⇒ Object
Returns the amount of time left in milli-seconds that this Minion has to finish its task. Returns 0 if no time is left. Returns nil if there is no time limit.
-
#working? ⇒ Boolean
Returns [Boolean] whether the minion is still working on the assigned task.
Constructor Details
#initialize(*arguments, description: 'Minion', metric: nil, log_exception: nil, on_exception_level: self.class.completed_log_level, enabled: self.class.enabled?, timeout: INFINITE, on_timeout: nil, wait_metric: nil, &block) ⇒ Minion
Create a new Minion
Creates a new thread and logs the time for the supplied block to complete processing.
The exception without stack trace is logged whenever an exception is thrown in the thread.
Re-raises any unhandled exception in the calling thread when `#result` is called.
Copies the logging tags and specified ActiveRecord scopes to the new thread.
Parameters
*arguments
Any number of arguments can be supplied that are passed into the block
in the order they are listed.
Note:
All arguments must be supplied by copy and not by reference.
For example, use `#dup` to create copies of passed data.
Pass by copy is critical to prevent concurrency issues when multiple threads
attempt to update the same object at the same time.
Proc / lambda
A block of code must be supplied that the Minion will execute.
Note:
This block will be executed within the scope of the created minion instance
and _not_ within the scope of where the Proc/lambda was originally created.
This is done to force all parameters to be passed in explicitly
and should be read-only or duplicates of the original data.
:description [String]
Description for this task that the Minion is performing.
Written to the log file along with the time taken to complete the task.
:timeout [Integer]
Maximum amount of time in milli-seconds that the task may take to complete
before #result times out.
Set to 0 to give the thread an infinite amount of time to complete.
Default: 0 ( Wait forever )
Notes:
- :timeout does not affect what happens to the Minion running the
task; it only affects how long #result will take to return.
- The Minion will continue to run even after the timeout has been exceeded.
- If :enabled is false, or ParallelMinion::Minion.enabled is false,
then :timeout is ignored and assumed to be Minion::INFINITE
since the code is run in the calling thread when the Minion is created.
:on_timeout [Exception]
The class to raise on the minion when the minion times out.
By raising the exception on the running thread it ensures that the thread
ends due to the exception, rather than continuing to execute.
The exception is only raised on the running minion when #result is called.
The current call to #result will complete with a result of nil, future
calls to #result will raise the supplied exception on the current thread
since the thread will have terminated with that exception.
Note: :on_timeout has no effect if not #enabled?
:metric [String]
Name of the metric to forward to Semantic Logger when measuring the minion execution time.
Example: inquiry/address_cleansing
When a metric is supplied the following metrics will also be generated:
- wait
Duration waiting for a minion to complete.
The additional metrics are added to the supplied metric name. For example:
- inquiry/address_cleansing/wait
:log_exception [Symbol]
Control whether or how an exception thrown in the block is
reported by Semantic Logger. Values:
:full
Log the exception class, message, and backtrace
:partial
Log the exception class and message. The backtrace will not be logged
:off
Any unhandled exception raised in the block will not be logged
Default: :partial
:on_exception_level [:trace | :debug | :info | :warn | :error | :fatal]
Override the log level only when an exception occurs.
Default: ParallelMinion::Minion.completed_log_level
:enabled [Boolean]
Override the global setting: `ParallelMinion::Minion.enabled?` for this minion instance.
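The note on the Proc / lambda parameter says the block runs within the scope of the minion instance. The following is a minimal sketch of that behaviour in plain Ruby, assuming a mechanism such as `instance_exec`. The class `ScopedRunner` is hypothetical and is not part of ParallelMinion. Note that `instance_exec` changes `self` (so method calls and instance variables resolve against the instance), while local variables from the defining scope remain visible through the closure.

```ruby
# Hypothetical stand-in for a Minion-like object (not the library's code).
class ScopedRunner
  attr_reader :description, :result

  def initialize(*arguments, description: 'Runner', &block)
    @description = description
    # instance_exec evaluates the block with self set to this object and
    # passes the arguments through as block parameters.
    @result = instance_exec(*arguments, &block)
  end
end

# Inside the block, `description` is a method call on the runner instance,
# not on the object that created the block.
runner = ScopedRunner.new(2, 3, description: 'Add') { |a, b| "#{description}: #{a + b}" }
puts runner.result
```

Passing every input as an explicit argument, as recommended above, keeps the block independent of whatever `self` happens to be at the call site.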
The overhead for moving the task to a Minion (separate thread) vs running it sequentially is about 0.3 ms if performing other tasks in-between starting the task and requesting its result.
The following call adds 0.5 ms to total processing time vs running the code in-line:
ParallelMinion::Minion.new(description: 'Count', timeout: 5) { 1 }.result
Note:
On JRuby it is recommended to add the following setting to .jrubyrc
thread.pool.enabled=true
Example:
ParallelMinion::Minion.new(
10.days.ago,
description: 'Doing something else in parallel',
timeout: 1000
) do |date|
MyTable.where('created_at <= ?', date).count
end
Example, when the result is being ignored, log full exception as an error:
ParallelMinion::Minion.new(
customer,
description: "We don't care about the result",
log_exception: :full,
on_exception_level: :error
) do |customer|
customer.save!
end
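The note on *arguments recommends passing copies rather than references. The following sketch illustrates the idea with a plain Ruby `Thread` rather than a Minion, so it runs without the gem. The variable names are illustrative only.

```ruby
original = %w[alpha beta]

# Hand the worker a copy so that it cannot mutate the caller's array.
worker = Thread.new(original.dup) do |names|
  names << 'gamma'
  names.size
end

count = worker.value # waits for the thread and returns the block's value

puts count
puts original.inspect # the caller's array is unchanged
```

`#dup` makes a shallow copy: the array itself is copied, but the strings inside it are still shared. Nested data that a thread may modify needs a deeper copy.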
# File 'lib/parallel_minion/minion.rb', line 217
def initialize(*arguments, description: 'Minion', metric: nil, log_exception: nil, on_exception_level: self.class.completed_log_level, enabled: self.class.enabled?, timeout: INFINITE, on_timeout: nil, wait_metric: nil, &block)
  raise 'Missing mandatory block that Minion must perform' unless block
  @start_time = Time.now
  @exception = nil
  @arguments = arguments
  @timeout = timeout.to_f
  @description = description.to_s
  @metric = metric
  @log_exception = log_exception
  @on_exception_level = on_exception_level
  @enabled = enabled
  @on_timeout = on_timeout
  @wait_metric = (wait_metric || "#{metric}/wait") if @metric
  # When minion is disabled it is obvious in the logs since the name will now be 'Inline' instead of 'Minion'
  unless @enabled
    l = self.class.logger.dup
    l.name = 'Inline'
    self.logger = l
  end
  @enabled ? run(&block) : run_inline(&block)
end
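The wait metric name is derived from :metric unless :wait_metric is supplied, and is only set when a metric is present. The following sketch restates that one assignment as a standalone function. The helper name `default_wait_metric` is hypothetical and does not exist in the library.

```ruby
# Hypothetical helper mirroring the @wait_metric assignment in the initializer.
def default_wait_metric(metric, wait_metric = nil)
  # No wait metric is set at all when no metric was supplied.
  return nil unless metric

  wait_metric || "#{metric}/wait"
end

puts default_wait_metric('inquiry/address_cleansing')
puts default_wait_metric('inquiry/address_cleansing', 'inquiry/custom_wait')
puts default_wait_metric(nil, 'inquiry/custom_wait').inspect
```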
Class Attribute Details
.completed_log_level ⇒ Object
Returns the value of attribute completed_log_level.
# File 'lib/parallel_minion/minion.rb', line 92
def completed_log_level
  @completed_log_level
end
.enabled=(value) ⇒ Object (writeonly)
Sets the attribute enabled
# File 'lib/parallel_minion/minion.rb', line 44
def enabled=(value)
  @enabled = value
end
.scoped_classes ⇒ Object
Returns the value of attribute scoped_classes.
# File 'lib/parallel_minion/minion.rb', line 58
def scoped_classes
  @scoped_classes
end
.started_log_level ⇒ Object
Returns the value of attribute started_log_level.
# File 'lib/parallel_minion/minion.rb', line 77
def started_log_level
  @started_log_level
end
Instance Attribute Details
#arguments ⇒ Object (readonly)
Returns [Array<Object>] list of arguments in the order they were passed into the initializer
# File 'lib/parallel_minion/minion.rb', line 16
def arguments
  @arguments
end
#description ⇒ Object (readonly)
Returns [String] the description supplied on the initializer
# File 'lib/parallel_minion/minion.rb', line 7
def description
  @description
end
#duration ⇒ Object (readonly)
Returns [Float] the number of milli-seconds the minion took to complete. Returns nil if the minion is still running.
# File 'lib/parallel_minion/minion.rb', line 20
def duration
  @duration
end
#exception ⇒ Object (readonly)
Returns [Exception] the exception that was raised, otherwise nil.
# File 'lib/parallel_minion/minion.rb', line 10
def exception
  @exception
end
#log_exception ⇒ Object (readonly)
Returns the value of attribute log_exception.
# File 'lib/parallel_minion/minion.rb', line 25
def log_exception
  @log_exception
end
#metric ⇒ Object (readonly)
Returns [String] the metric name supplied on the initializer.
# File 'lib/parallel_minion/minion.rb', line 23
def metric
  @metric
end
#on_exception_level ⇒ Object (readonly)
Returns the value of attribute on_exception_level.
# File 'lib/parallel_minion/minion.rb', line 25
def on_exception_level
  @on_exception_level
end
#on_timeout ⇒ Object (readonly)
Returns the value of attribute on_timeout.
# File 'lib/parallel_minion/minion.rb', line 25
def on_timeout
  @on_timeout
end
#start_time ⇒ Object (readonly)
Returns the value of attribute start_time.
# File 'lib/parallel_minion/minion.rb', line 25
def start_time
  @start_time
end
#timeout ⇒ Object (readonly)
Returns [Integer] the maximum duration in milli-seconds that the Minion may take to complete the task
# File 'lib/parallel_minion/minion.rb', line 13
def timeout
  @timeout
end
#wait_metric ⇒ Object (readonly)
Returns [String] the name of the metric used to measure time spent waiting for the minion.
# File 'lib/parallel_minion/minion.rb', line 23
def wait_metric
  @wait_metric
end
Class Method Details
.current_scopes ⇒ Object
# File 'lib/parallel_minion/minion.rb', line 311
def self.current_scopes
  scoped_classes.collect(&:all)
end
.enabled? ⇒ Boolean
Returns whether minions are enabled to run in their own threads
# File 'lib/parallel_minion/minion.rb', line 48
def self.enabled?
  @enabled
end
Instance Method Details
#completed? ⇒ Boolean
Returns [Boolean] whether the minion has completed working on the task
# File 'lib/parallel_minion/minion.rb', line 284
def completed?
  enabled? ? @thread.stop? : true
end
#enabled? ⇒ Boolean
Returns [Boolean] whether this minion is enabled to run in a separate thread
# File 'lib/parallel_minion/minion.rb', line 303
def enabled?
  @enabled
end
#failed? ⇒ Boolean
Returns [Boolean] whether the minion failed while performing the assigned task
# File 'lib/parallel_minion/minion.rb', line 289
def failed?
  !exception.nil?
end
#result ⇒ Object
Returns the result when the thread completes. Returns nil if the thread has not yet completed. Raises any unhandled exception in the thread, if any.
Note: The result of any thread cannot be nil
# File 'lib/parallel_minion/minion.rb', line 256
def result
  # Return nil if Minion is still working and has time left to finish
  if working?
    ms = time_left
    logger.measure(
      self.class.completed_log_level,
      "Waited for Minion to complete: #{description}",
      min_duration: 0.01,
      metric: wait_metric
    ) do
      if @thread.join(ms.nil? ? nil : ms / 1000).nil?
        @thread.raise(@on_timeout.new("Minion: #{description} timed out")) if @on_timeout
        logger.warn("Timed out waiting for: #{description}")
        return
      end
    end
  end
  # Return the exception, if any, otherwise the task result
  exception.nil? ? @result : Kernel.raise(exception)
end
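The timeout handling in #result rests on two standard `Thread` calls: `join(limit)`, which returns nil when the limit elapses, and `raise`, which raises an exception inside the target thread. The following sketch shows the same pattern with a plain thread and no logging. `SketchTimeout` and the variable names are assumptions made for illustration, not names from the library.

```ruby
# Hypothetical exception class standing in for the :on_timeout class.
class SketchTimeout < StandardError; end

worker = Thread.new do
  sleep 5 # simulate a slow task
  :finished
end
worker.report_on_exception = false # keep the sketch's output quiet

timeout_ms = 50.0

# Thread#join takes seconds, so convert from milli-seconds.
joined = worker.join(timeout_ms / 1000)

# A nil return means the limit elapsed while the thread was still running.
worker.raise(SketchTimeout, 'Sketch timed out') if joined.nil?

# A later wait on the thread re-raises the exception in the calling thread.
outcome =
  begin
    worker.value
  rescue SketchTimeout => e
    e.message
  end

puts outcome
```

Without the `worker.raise` call the thread would simply keep running after the caller gave up waiting, which matches the note that :timeout alone only limits how long #result blocks.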
#time_left ⇒ Object
Returns the amount of time left in milli-seconds that this Minion has to finish its task. Returns 0 if no time is left. Returns nil if there is no time limit, i.e. :timeout was set to Minion::INFINITE (infinite time left).
# File 'lib/parallel_minion/minion.rb', line 296
def time_left
  return nil if timeout.zero? || (timeout == -1)
  duration = timeout - (Time.now - start_time) * 1000
  duration <= 0 ? 0 : duration
end
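The arithmetic in #time_left can be checked in isolation. The following sketch restates it as a standalone function that takes the current time as a parameter, so the three documented outcomes are reproducible. The function name `remaining_ms` and its parameters are hypothetical.

```ruby
# Hypothetical standalone version of the #time_left calculation.
# timeout_ms is in milli-seconds; start_time and now are Time objects.
def remaining_ms(timeout_ms, start_time, now = Time.now)
  # 0 (and -1) mean there is no time limit.
  return nil if timeout_ms.zero? || timeout_ms == -1

  # Time subtraction yields seconds, so scale to milli-seconds.
  left = timeout_ms - (now - start_time) * 1000
  left <= 0 ? 0 : left
end

started = Time.at(100)

puts remaining_ms(1000.0, started, Time.at(100.25))      # a quarter of a second has elapsed
puts remaining_ms(1000.0, started, Time.at(102))         # the limit has already passed
puts remaining_ms(0.0, started, Time.at(102)).inspect    # no limit
```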
#working? ⇒ Boolean
Returns [Boolean] whether the minion is still working on the assigned task
# File 'lib/parallel_minion/minion.rb', line 279
def working?
  enabled? ? @thread.alive? : false
end
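#working? is based on `Thread#alive?`, while #completed? is based on `Thread#stop?`. Ruby documents `stop?` as true when a thread is dead or sleeping, so the two predicates are not strict opposites. The following sketch shows the difference with a plain thread. It makes no claim about how often a Minion thread is observed asleep in practice.

```ruby
# A thread that sleeps until it is killed.
sleeper = Thread.new { sleep }

# Wait until the thread has actually gone to sleep.
sleep 0.01 until sleeper.status == 'sleep'

alive_while_sleeping   = sleeper.alive? # still running, so alive
stopped_while_sleeping = sleeper.stop?  # sleeping counts as stopped

sleeper.kill
sleeper.join

alive_after_kill   = sleeper.alive?
stopped_after_kill = sleeper.stop?

puts [alive_while_sleeping, stopped_while_sleeping].inspect
puts [alive_after_kill, stopped_after_kill].inspect
```

When a definite answer is needed, waiting on #result is the more reliable way to establish that the task has finished.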