Class: ParallelMinion::Minion
- Inherits:
-
Object
- Object
- ParallelMinion::Minion
- Includes:
- SemanticLogger::Loggable
- Defined in:
- lib/parallel_minion/minion.rb
Constant Summary collapse
- INFINITE =
Give an infinite amount of time to wait for a Minion to complete a task
-1
Instance Attribute Summary collapse
-
#description ⇒ Object
readonly
Returns [String] the description supplied on the initializer.
-
#exception ⇒ Object
readonly
Returns [Exception] the exception that was raised otherwise nil.
-
#timeout ⇒ Object
readonly
Returns [Integer] the maximum duration in milli-seconds that the Minion may take to complete the task.
Class Method Summary collapse
- .current_scopes ⇒ Object
-
.enabled=(enabled) ⇒ Object
Sets whether minions are enabled to run in their own threads.
-
.enabled? ⇒ Boolean
Returns whether minions are enabled to run in their own threads.
-
.scoped_classes ⇒ Object
The list of classes for which the current scope must be copied into the new Minion (Thread).
Instance Method Summary collapse
-
#completed? ⇒ Boolean
Returns [Boolean] whether the minion has completed working on the task.
-
#enabled? ⇒ Boolean
Returns [Boolean] whether this minion is enabled to run in a separate thread.
-
#failed? ⇒ Boolean
Returns [Boolean] whether the minion failed while performing the assigned task.
-
#initialize(*args, &block) ⇒ Minion
constructor
Create a new thread and Log the time for the thread to complete processing The exception without stack trace is logged whenever an exception is thrown in the thread Re-raises any unhandled exception in the calling thread when it call #result Copy the logging tags and specified ActiveRecord scopes to the new thread.
-
#result ⇒ Object
Returns the result when the thread completes Returns nil if the thread has not yet completed Raises any unhandled exception in the thread, if any.
-
#time_left ⇒ Object
Returns the amount of time left in milli-seconds that this Minion has to finish its task Returns 0 if no time is left Returns nil if their is no time limit.
-
#working? ⇒ Boolean
Returns [Boolean] whether the minion is still working on the assigned task.
Constructor Details
#initialize(*args, &block) ⇒ Minion
Create a new thread and
Log the time for the thread to complete processing
The exception without stack trace is logged whenever an exception is
thrown in the thread
Re-raises any unhandled exception in the calling thread when it call #result
Copy the logging and specified ActiveRecord scopes to the new thread
Parameters
:description [String]
Description for this task that the Minion is performing
Put in the log file along with the time take to complete the task
:timeout [Integer]
Maximum amount of time in milli-seconds that the task may take to complete
before #result times out
Set to Minion::INFINITE to give the thread an infinite amount of time to complete
Default: Minion::INFINITE
Notes:
- :timeout does not affect what happens to the Minion running the
the task, it only affects how long #result will take to return.
- The Minion will continue to run even after the timeout has been exceeded
- If :enabled is false, or ParallelMinion::Minion.enabled is false,
then :timeout is ignored and assumed to be Minion::INFINITE
since the code is run in the calling thread when the Minion is created
:enabled [Boolean]
Whether the minion should run in a separate thread
Not recommended in Production, but is useful for debugging purposes
Default: ParallelMinion::Minion.enabled?
*args
Any number of arguments can be supplied that are passed into the block
in the order they are listed
It is recommended to duplicate and/or freeze objects passed as arguments
so that they are not modified at the same time by multiple threads
Proc / lambda
A block of code must be supplied that the Minion will execute
NOTE: This block will be executed within the scope of the created minion
instance and _not_ within the scope of where the Proc/lambda was
originally created.
This is done to force all parameters to be passed in explicitly
and should be read-only or duplicates of the original data
The overhead for moving the task to a Minion (separate thread) vs running it sequentially is about 0.3 ms if performing other tasks in-between starting the task and requesting its result.
The following call adds 0.5 ms to total processing time vs running the code in-line:
ParallelMinion::Minion.new(description: 'Count', timeout: 5) { 1 }.result
NOTE:
On JRuby it is very important to add the following setting to .jrubyrc
thread.pool.enabled=true
Example:
ParallelMinion::Minion.new(10.days.ago, description: 'Doing something else in parallel', timeout: 1000) do |date|
MyTable.where('created_at <= ?', date).count
end
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/parallel_minion/minion.rb', line 107 def initialize(*args, &block) raise "Missing mandatory block that Minion must perform" unless block @start_time = Time.now @exception = nil = self.class.(args).dup @timeout = (.delete(:timeout) || Minion::INFINITE).to_f @description = (.delete(:description) || 'Minion').to_s @metric = .delete(:metric) @log_exception = .delete(:log_exception) @enabled = .delete(:enabled) @enabled = self.class.enabled? if @enabled.nil? # Warn about any unknown options. .each_pair do |key,val| logger.warn "Ignoring unknown option: #{key.inspect} => #{val.inspect}" warn "ParallelMinion::Minion Ignoring unknown option: #{key.inspect} => #{val.inspect}" end # Run the supplied block of code in the current thread for testing or # debugging purposes if @enabled == false begin logger.info("Started in the current thread: #{@description}") logger.benchmark_info("Completed in the current thread: #{@description}", log_exception: @log_exception, metric: @metric) do @result = instance_exec(*args, &block) end rescue Exception => exc @exception = exc end return end = (logger. || []).dup # Copy current scopes for new thread. Only applicable for AR models scopes = self.class.current_scopes if defined?(ActiveRecord::Base) @thread = Thread.new(*args) do # Copy logging tags from parent thread logger.tagged(*) do # Set the current thread name to the description for this Minion # so that all log entries in this thread use this thread name Thread.current.name = "#{@description}-#{Thread.current.object_id}" logger.info("Started #{@description}") begin logger.benchmark_info("Completed #{@description}", log_exception: @log_exception, metric: @metric) do # Use the current scope for the duration of the task execution if scopes.nil? || (scopes.size == 0) @result = instance_exec(*args, &block) else # Each Class to scope requires passing a block to .scoping proc = Proc.new { instance_exec(*args, &block) } first = scopes.shift scopes.each {|scope| proc = Proc.new { scope.scoping(&proc) } } @result = first.scoping(&proc) end end rescue Exception => exc @exception = exc nil ensure # Return any database connections used by this thread back to the pool ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base) end end end end |
Instance Attribute Details
#description ⇒ Object (readonly)
Returns [String] the description supplied on the initializer
7 8 9 |
# File 'lib/parallel_minion/minion.rb', line 7 def description @description end |
#exception ⇒ Object (readonly)
Returns [Exception] the exception that was raised otherwise nil
10 11 12 |
# File 'lib/parallel_minion/minion.rb', line 10 def exception @exception end |
#timeout ⇒ Object (readonly)
Returns [Integer] the maximum duration in milli-seconds that the Minion may take to complete the task
13 14 15 |
# File 'lib/parallel_minion/minion.rb', line 13 def timeout @timeout end |
Class Method Details
.current_scopes ⇒ Object
232 233 234 |
# File 'lib/parallel_minion/minion.rb', line 232 def self.current_scopes @@scoped_classes.collect {|klass| klass.all} end |
.enabled=(enabled) ⇒ Object
Sets whether minions are enabled to run in their own threads
By Setting enabled to false all Minions that have not yet been started will run in the thread from which it is created and not on its own thread
This is useful:
-
to run tests under the Capybara gem
-
when debugging code so that all code is run sequentially in the current thread
Note: Not recommended to set this setting to false in Production
28 29 30 |
# File 'lib/parallel_minion/minion.rb', line 28 def self.enabled=(enabled) @@enabled = enabled end |
.enabled? ⇒ Boolean
Returns whether minions are enabled to run in their own threads
33 34 35 |
# File 'lib/parallel_minion/minion.rb', line 33 def self.enabled? @@enabled end |
.scoped_classes ⇒ Object
The list of classes for which the current scope must be copied into the new Minion (Thread)
Example:
...
42 43 44 |
# File 'lib/parallel_minion/minion.rb', line 42 def self.scoped_classes @@scoped_classes end |
Instance Method Details
#completed? ⇒ Boolean
Returns [Boolean] whether the minion has completed working on the task
205 206 207 |
# File 'lib/parallel_minion/minion.rb', line 205 def completed? enabled? ? @thread.stop? : true end |
#enabled? ⇒ Boolean
Returns [Boolean] whether this minion is enabled to run in a separate thread
224 225 226 |
# File 'lib/parallel_minion/minion.rb', line 224 def enabled? @enabled end |
#failed? ⇒ Boolean
Returns [Boolean] whether the minion failed while performing the assigned task
210 211 212 |
# File 'lib/parallel_minion/minion.rb', line 210 def failed? !exception.nil? end |
#result ⇒ Object
Returns the result when the thread completes Returns nil if the thread has not yet completed Raises any unhandled exception in the thread, if any
Note: The result of any thread cannot be nil
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/parallel_minion/minion.rb', line 183 def result # Return nil if Minion is still working and has time left to finish if working? ms = time_left logger.benchmark_info("Waited for Minion to complete: #{@description}", min_duration: 0.01) do if @thread.join(ms.nil? ? nil: ms / 1000).nil? logger.warn("Timed out waiting for result from Minion: #{@description}") return end end end # Return the exception, if any, otherwise the task result exception.nil? ? @result : Kernel.raise(exception) end |
#time_left ⇒ Object
Returns the amount of time left in milli-seconds that this Minion has to finish its task Returns 0 if no time is left Returns nil if their is no time limit. I.e. :timeout was set to Minion::INFINITE (infinite time left)
217 218 219 220 221 |
# File 'lib/parallel_minion/minion.rb', line 217 def time_left return nil if @timeout == INFINITE duration = @timeout - (Time.now - @start_time) * 1000 duration <= 0 ? 0 : duration end |
#working? ⇒ Boolean
Returns [Boolean] whether the minion is still working on the assigned task
200 201 202 |
# File 'lib/parallel_minion/minion.rb', line 200 def working? enabled? ? @thread.alive? : false end |