Class: ParallelMinion::Minion
- Inherits:
-
Object
- Object
- ParallelMinion::Minion
- Includes:
- SemanticLogger::Loggable
- Defined in:
- lib/parallel_minion/minion.rb
Constant Summary collapse
- INFINITE =
Give an infinite amount of time to wait for a Minion to complete a task
-1
Instance Attribute Summary collapse
-
#description ⇒ Object
readonly
Returns [String] the description supplied on the initializer.
-
#exception ⇒ Object
readonly
Returns [Exception] the exception that was raised otherwise nil.
-
#timeout ⇒ Object
readonly
Returns [Integer] the maximum duration in milli-seconds that the Minion may take to complete the task.
Class Method Summary collapse
-
.current_scopes ⇒ Object
Returns the current scopes for each of the models for which scopes will be copied to the Minions.
-
.scoped_classes ⇒ Object
The list of classes for which the current scope must be copied into the new Minion (Thread).
-
.synchronous=(synchronous) ⇒ Object
Sets whether to run in Synchronous mode.
-
.synchronous? ⇒ Boolean
Returns whether running in Synchronous mode.
Instance Method Summary collapse
-
#completed? ⇒ Boolean
Returns [Boolean] whether the minion has completed working on the task.
-
#failed? ⇒ Boolean
Returns [Boolean] whether the minion failed while performing the assigned task.
-
#initialize(*args, &block) ⇒ Minion
constructor
Create a new thread and Log the time for the thread to complete processing The exception without stack trace is logged whenever an exception is thrown in the thread Re-raises any unhandled exception in the calling thread when it call #result Copy the logging tags and specified ActiveRecord scopes to the new thread.
-
#result ⇒ Object
Returns the result when the thread completes Returns nil if the thread has not yet completed Raises any unhandled exception in the thread, if any.
-
#synchronous? ⇒ Boolean
Returns [Boolean] whether synchronous mode has been enabled for this minion instance.
-
#time_left ⇒ Object
Returns the amount of time left in milli-seconds that this Minion has to finish its task Returns 0 if no time is left Returns nil if their is no time limit.
-
#working? ⇒ Boolean
Returns [Boolean] whether the minion is still working on the assigned task.
Constructor Details
#initialize(*args, &block) ⇒ Minion
Create a new thread and
Log the time for the thread to complete processing
The exception without stack trace is logged whenever an exception is
thrown in the thread
Re-raises any unhandled exception in the calling thread when it call #result
Copy the logging and specified ActiveRecord scopes to the new thread
Parameters
:description [String]
Description for this task that the Minion is performing
Put in the log file along with the time take to complete the task
:timeout [Integer]
Maximum amount of time in milli-seconds that the task may take to complete
before #result times out
Set to Minion::INFINITE to give the thread an infinite amount of time to complete
Default: Minion::INFINITE
Notes:
- :timeout does not affect what happens to the Minion running the
the task, it only affects how long #result will take to return.
- The Minion will continue to run even after the timeout has been exceeded
- If :synchronous is true, or ParallelMinion::Minion.synchronous is
set to true, then :timeout is ignored and assumed to be Minion::INFINITE
since the code is run in the calling thread when the Minion is created
:synchronous [Boolean]
Whether the Minion should run in the current thread
Not recommended in Production, but is useful for debugging purposes
Default: false
*args
Any number of arguments can be supplied that are passed into the block
in the order they are listed
It is recommended to duplicate and/or freeze objects passed as arguments
so that they are not modified at the same time by multiple threads
Proc / lambda
A block of code must be supplied that the Minion will execute
NOTE: This block will be executed within the scope of the created minion
instance and _not_ within the scope of where the Proc/lambda was
originally created.
This is done to force all parameters to be passed in explicitly
and should be read-only or duplicates of the original data
The overhead for moving the task to a Minion (separate thread) vs running it sequentially is about 0.3 ms if performing other tasks in-between starting the task and requesting its result.
The following call adds 0.5 ms to total processing time vs running the code in-line:
ParallelMinion::Minion.new(description: 'Count', timeout: 5) { 1 }.result
NOTE:
On JRuby it is very important to add the following setting to .jrubyrc
thread.pool.enabled=true
Example:
ParallelMinion::Minion.new(10.days.ago, description: 'Doing something else in parallel', timeout: 1000) do |date|
MyTable.where('created_at <= ?', date).count
end
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/parallel_minion/minion.rb', line 108 def initialize(*args, &block) raise "Missing mandatory block that Minion must perform" unless block @start_time = Time.now @exception = nil = self.class.(args).dup @timeout = (.delete(:timeout) || Minion::INFINITE).to_f @description = (.delete(:description) || 'Minion').to_s @log_exception = .delete(:log_exception) @synchronous = .delete(:synchronous) || self.class.synchronous? # Warn about any unknown options. .each_pair { |key,val| logger.warn "Ignoring unknown option: #{key.inspect} => #{val.inspect}" } # Run the supplied block of code in the current thread for testing or # debugging purposes if @synchronous == true begin logger.info("Started synchronously #{@description}") logger.benchmark_info("Completed synchronously #{@description}", log_exception: @log_exception) do @result = instance_exec(*args, &block) end rescue Exception => exc @exception = exc end return end = (logger. || []).dup # Copy current scopes for new thread. Only applicable for AR models scopes = self.class.current_scopes.dup if defined?(ActiveRecord::Base) @thread = Thread.new(*args) do # Copy logging tags from parent thread logger.tagged(*) do # Set the current thread name to the description for this Minion # so that all log entries in this thread use this thread name Thread.current.name = "#{@description}-#{Thread.current.object_id}" logger.info("Started #{@description}") begin logger.benchmark_info("Completed #{@description}", log_exception: @log_exception) do # Use the current scope for the duration of the task execution if scopes.nil? || (scopes.size == 0) @result = instance_exec(*args, &block) else # Each Class to scope requires passing a block to .scoping proc = Proc.new { instance_exec(*args, &block) } first = scopes.shift scopes.each {|scope| proc = Proc.new { scope.scoping(&proc) } } @result = first.scoping(&proc) end end rescue Exception => exc @exception = exc nil ensure # Return any database connections used by this thread back to the pool ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base) end end end end |
Instance Attribute Details
#description ⇒ Object (readonly)
Returns [String] the description supplied on the initializer
7 8 9 |
# File 'lib/parallel_minion/minion.rb', line 7 def description @description end |
#exception ⇒ Object (readonly)
Returns [Exception] the exception that was raised otherwise nil
10 11 12 |
# File 'lib/parallel_minion/minion.rb', line 10 def exception @exception end |
#timeout ⇒ Object (readonly)
Returns [Integer] the maximum duration in milli-seconds that the Minion may take to complete the task
13 14 15 |
# File 'lib/parallel_minion/minion.rb', line 13 def timeout @timeout end |
Class Method Details
.current_scopes ⇒ Object
Returns the current scopes for each of the models for which scopes will be copied to the Minions
221 222 223 224 |
# File 'lib/parallel_minion/minion.rb', line 221 def self.current_scopes # Apparently #scoped is deprecated, but its replacement #all does not behave the same @@scoped_classes.collect {|klass| klass.scoped.dup} end |
.scoped_classes ⇒ Object
The list of classes for which the current scope must be copied into the new Minion (Thread)
Example:
...
43 44 45 |
# File 'lib/parallel_minion/minion.rb', line 43 def self.scoped_classes @@scoped_classes end |
.synchronous=(synchronous) ⇒ Object
Sets whether to run in Synchronous mode
By Setting synchronous to true all Minions that have not yet been started will run in the thread from which they are started and not in their own threads
This is useful:
-
to run tests under the Capybara gem
-
when debugging code so that all code is run sequentially in the current thread
Note: Do not set this setting to true in Production
29 30 31 |
# File 'lib/parallel_minion/minion.rb', line 29 def self.synchronous=(synchronous) @@synchronous = synchronous end |
.synchronous? ⇒ Boolean
Returns whether running in Synchronous mode
34 35 36 |
# File 'lib/parallel_minion/minion.rb', line 34 def self.synchronous? @@synchronous end |
Instance Method Details
#completed? ⇒ Boolean
Returns [Boolean] whether the minion has completed working on the task
196 197 198 |
# File 'lib/parallel_minion/minion.rb', line 196 def completed? synchronous? ? true : @thread.stop? end |
#failed? ⇒ Boolean
Returns [Boolean] whether the minion failed while performing the assigned task
201 202 203 |
# File 'lib/parallel_minion/minion.rb', line 201 def failed? !exception.nil? end |
#result ⇒ Object
Returns the result when the thread completes Returns nil if the thread has not yet completed Raises any unhandled exception in the thread, if any
Note: The result of any thread cannot be nil
179 180 181 182 183 184 185 186 187 188 |
# File 'lib/parallel_minion/minion.rb', line 179 def result # Return nil if Minion is still working and has time left to finish if working? ms = time_left return if @thread.join(ms.nil? ? nil: ms / 1000).nil? end # Return the exception, if any, otherwise the task result exception.nil? ? @result : Kernel.raise(exception) end |
#synchronous? ⇒ Boolean
Returns [Boolean] whether synchronous mode has been enabled for this minion instance
215 216 217 |
# File 'lib/parallel_minion/minion.rb', line 215 def synchronous? @synchronous end |
#time_left ⇒ Object
Returns the amount of time left in milli-seconds that this Minion has to finish its task Returns 0 if no time is left Returns nil if their is no time limit. I.e. :timeout was set to Minion::INFINITE (infinite time left)
208 209 210 211 212 |
# File 'lib/parallel_minion/minion.rb', line 208 def time_left return nil if @timeout == INFINITE duration = @timeout - (Time.now - @start_time) * 1000 duration <= 0 ? 0 : duration end |
#working? ⇒ Boolean
Returns [Boolean] whether the minion is still working on the assigned task
191 192 193 |
# File 'lib/parallel_minion/minion.rb', line 191 def working? synchronous? ? false : @thread.alive? end |