Class: ParallelMinion::Minion

Inherits:
Object
Includes:
SemanticLogger::Loggable
Defined in:
lib/parallel_minion/minion.rb

Constant Summary

INFINITE = -1

Give an infinite amount of time to wait for a Minion to complete a task

Constructor Details

#initialize(*args, &block) ⇒ Minion

Create a new thread and:

  • Log the time for the thread to complete processing
  • Log the exception, without its stack trace, whenever an exception is raised in the thread
  • Re-raise any unhandled exception in the calling thread when it calls #result
  • Copy the logging tags and specified ActiveRecord scopes to the new thread

Parameters

:description [String]
  Description for this task that the Minion is performing
  Put in the log file along with the time taken to complete the task

:timeout [Integer]
  Maximum amount of time in milli-seconds that the task may take to complete
  before #result times out
  Set to Minion::INFINITE to give the thread an infinite amount of time to complete
  Default: Minion::INFINITE

  Notes:
  - :timeout does not affect what happens to the Minion running the
    task, it only affects how long #result will take to return.
  - The Minion will continue to run even after the timeout has been exceeded
  - If :enabled is false, or ParallelMinion::Minion.enabled is false,
    then :timeout is ignored and assumed to be Minion::INFINITE
    since the code is run in the calling thread when the Minion is created

:enabled [Boolean]
  Whether the minion should run in a separate thread
  Setting this to false is not recommended in Production, but is useful for debugging purposes
  Default: ParallelMinion::Minion.enabled?

*args
  Any number of arguments can be supplied that are passed into the block
  in the order they are listed
  It is recommended to duplicate and/or freeze objects passed as arguments
  so that they are not modified at the same time by multiple threads

Proc / lambda
  A block of code must be supplied that the Minion will execute
  NOTE: This block will be executed within the scope of the created minion
        instance and _not_ within the scope of where the Proc/lambda was
        originally created.
        This is done to force all parameters to be passed in explicitly;
        they should be read-only or duplicates of the original data
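
The scoping rule for the block can be illustrated with plain Ruby. This is a minimal sketch, not the library's code: `Worker` and `Caller` are hypothetical classes, and `instance_exec` is the standard Ruby method that the constructor source uses to run the block. Inside the block `self` is the worker, so the caller's instance variables are not visible, while explicitly passed arguments are.

```ruby
# Hypothetical stand-in for a Minion: runs the block with self set to the worker.
class Worker
  def run(*args, &block)
    instance_exec(*args, &block)
  end
end

# Hypothetical caller that owns some state of its own.
class Caller
  def initialize
    @secret = 42
  end

  # Reads @secret inside the block: self is the Worker there, so this is nil.
  def implicit
    Worker.new.run { @secret }
  end

  # Passes the value explicitly as an argument instead.
  def explicit
    Worker.new.run(@secret) { |value| value }
  end
end

caller_obj     = Caller.new
implicit_value = caller_obj.implicit   # nil
explicit_value = caller_obj.explicit   # 42
```

Local variables of the enclosing method are still captured by the closure, so the isolation applies to `self`, instance variables and method calls rather than to everything the block can see.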

The overhead for moving the task to a Minion (separate thread) vs running it sequentially is about 0.3 ms if performing other tasks in-between starting the task and requesting its result.

The following call adds 0.5 ms to total processing time vs running the code in-line:

ParallelMinion::Minion.new(description: 'Count', timeout: 5) { 1 }.result

NOTE:

On JRuby it is very important to add the following setting to .jrubyrc
    thread.pool.enabled=true

Example:

ParallelMinion::Minion.new(10.days.ago, description: 'Doing something else in parallel', timeout: 1000) do |date|
  MyTable.where('created_at <= ?', date).count
end


# File 'lib/parallel_minion/minion.rb', line 107

def initialize(*args, &block)
  raise "Missing mandatory block that Minion must perform" unless block
  @start_time = Time.now
  @exception = nil

  options = self.class.extract_options!(args).dup

  @timeout       = (options.delete(:timeout) || Minion::INFINITE).to_f
  @description   = (options.delete(:description) || 'Minion').to_s
  @metric        = options.delete(:metric)
  @log_exception = options.delete(:log_exception)
  @enabled       = options.delete(:enabled)
  @enabled       = self.class.enabled? if @enabled.nil?

  # Warn about any unknown options.
  options.each_pair do |key,val|
    logger.warn "Ignoring unknown option: #{key.inspect} => #{val.inspect}"
    warn "ParallelMinion::Minion Ignoring unknown option: #{key.inspect} => #{val.inspect}"
  end

  # Run the supplied block of code in the current thread for testing or
  # debugging purposes
  if @enabled == false
    begin
      logger.info("Started in the current thread: #{@description}")
      logger.benchmark_info("Completed in the current thread: #{@description}", log_exception: @log_exception, metric: @metric) do
        @result = instance_exec(*args, &block)
      end
    rescue Exception => exc
      @exception = exc
    end
    return
  end

  tags = (logger.tags || []).dup

  # Copy current scopes for new thread. Only applicable for AR models
  scopes = self.class.current_scopes if defined?(ActiveRecord::Base)

  @thread = Thread.new(*args) do
    # Copy logging tags from parent thread
    logger.tagged(*tags) do
      # Set the current thread name to the description for this Minion
      # so that all log entries in this thread use this thread name
      Thread.current.name = "#{@description}-#{Thread.current.object_id}"
      logger.info("Started #{@description}")

      begin
        logger.benchmark_info("Completed #{@description}", log_exception: @log_exception, metric: @metric) do
          # Use the current scope for the duration of the task execution
          if scopes.nil? || (scopes.size == 0)
            @result = instance_exec(*args, &block)
          else
            # Each Class to scope requires passing a block to .scoping
            proc = Proc.new { instance_exec(*args, &block) }
            first = scopes.shift
            scopes.each {|scope| proc = Proc.new { scope.scoping(&proc) } }
            @result = first.scoping(&proc)
          end
        end
      rescue Exception => exc
        @exception = exc
        nil
      ensure
        # Return any database connections used by this thread back to the pool
        ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
      end
    end
  end
end

Instance Attribute Details

#description ⇒ Object (readonly)

Returns [String] the description supplied on the initializer



# File 'lib/parallel_minion/minion.rb', line 7

def description
  @description
end

#exception ⇒ Object (readonly)

Returns [Exception] the exception that was raised, otherwise nil



# File 'lib/parallel_minion/minion.rb', line 10

def exception
  @exception
end

#timeout ⇒ Object (readonly)

Returns [Integer] the maximum duration in milli-seconds that the Minion may take to complete the task



# File 'lib/parallel_minion/minion.rb', line 13

def timeout
  @timeout
end

Class Method Details

.current_scopes ⇒ Object



# File 'lib/parallel_minion/minion.rb', line 232

def self.current_scopes
  @@scoped_classes.collect {|klass| klass.all}
end

.enabled=(enabled) ⇒ Object

Sets whether minions are enabled to run in their own threads

When enabled is set to false, every Minion that has not yet been started will run in the thread from which it is created instead of in its own thread

This is useful:

  • to run tests under the Capybara gem

  • when debugging code so that all code is run sequentially in the current thread

Note: Setting this to false is not recommended in Production



# File 'lib/parallel_minion/minion.rb', line 28

def self.enabled=(enabled)
  @@enabled = enabled
end

.enabled? ⇒ Boolean

Returns whether minions are enabled to run in their own threads

Returns:

  • (Boolean)


# File 'lib/parallel_minion/minion.rb', line 33

def self.enabled?
  @@enabled
end

.scoped_classes ⇒ Object

The list of classes for which the current scope must be copied into the new Minion (Thread)

Example:

...


# File 'lib/parallel_minion/minion.rb', line 42

def self.scoped_classes
  @@scoped_classes
end

Instance Method Details

#completed? ⇒ Boolean

Returns [Boolean] whether the minion has completed working on the task

Returns:

  • (Boolean)


# File 'lib/parallel_minion/minion.rb', line 205

def completed?
  enabled? ? @thread.stop? : true
end
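
#completed? relies on Thread#stop?, which Ruby documents as true when a thread is dead or sleeping. The following sketch uses only the standard library, with no Minion involved, to show that a thread that is merely sleeping already reports stop? as true while alive? is still true. Whether this distinction matters for a given task is something to verify against the actual workload.

```ruby
# A thread that blocks in sleep until it is woken up.
sleeper = Thread.new { sleep }

# Wait until the thread has actually gone to sleep.
Thread.pass until sleeper.status == 'sleep'

sleeping_stop  = sleeper.stop?   # true: sleeping threads count as stopped
sleeping_alive = sleeper.alive?  # true: the thread has not finished

# Wake the thread so it can finish, then wait for it.
sleeper.run
sleeper.join

finished_stop  = sleeper.stop?   # true: dead threads also count as stopped
finished_alive = sleeper.alive?  # false
```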

#enabled? ⇒ Boolean

Returns [Boolean] whether this minion is enabled to run in a separate thread

Returns:

  • (Boolean)


# File 'lib/parallel_minion/minion.rb', line 224

def enabled?
  @enabled
end

#failed? ⇒ Boolean

Returns [Boolean] whether the minion failed while performing the assigned task

Returns:

  • (Boolean)


# File 'lib/parallel_minion/minion.rb', line 210

def failed?
  !exception.nil?
end

#result ⇒ Object

Returns the result when the thread completes.
Returns nil if the thread has not completed before the timeout expires.
Raises any unhandled exception that occurred in the thread.

Note: The result of a task should not be nil, since nil is also what #result returns on a timeout



# File 'lib/parallel_minion/minion.rb', line 183

def result
  # Return nil if Minion is still working and has time left to finish
  if working?
    ms = time_left
    logger.benchmark_info("Waited for Minion to complete: #{@description}", min_duration: 0.01) do
      if @thread.join(ms.nil? ? nil: ms / 1000).nil?
        logger.warn("Timed out waiting for result from Minion: #{@description}")
        return
      end
    end
  end

  # Return the exception, if any, otherwise the task result
  exception.nil? ? @result : Kernel.raise(exception)
end
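
The timeout behaviour of #result rests on Thread#join, which returns nil when its limit (given in seconds) expires and leaves the thread running. A minimal standard-library sketch, independent of Minion, follows; the durations are arbitrary illustration values.

```ruby
# Task that takes longer than the caller is willing to wait.
worker = Thread.new do
  sleep 0.3
  :done
end

# join takes seconds, so a 50 ms limit is passed as 50 / 1000.0.
timed_out = worker.join(50 / 1000.0).nil?

# The thread was not stopped by the timeout; it is still running.
still_alive = worker.alive?

# Waiting without a limit lets it finish and exposes the block's value.
final_value = worker.join.value
```

This matches the note on :timeout: the limit only bounds how long the caller waits, not how long the task runs.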

#time_left ⇒ Object

Returns the amount of time left in milli-seconds that this Minion has to finish its task.
Returns 0 if no time is left.
Returns nil if there is no time limit, i.e. :timeout was set to Minion::INFINITE (infinite time left)



# File 'lib/parallel_minion/minion.rb', line 217

def time_left
  return nil if @timeout == INFINITE
  duration = @timeout - (Time.now - @start_time) * 1000
  duration <= 0 ? 0 : duration
end
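
The arithmetic in #time_left can be checked in isolation. The sketch below restates it as a free-standing method; `time_left_ms` and its `now` parameter are hypothetical names introduced here so that the clock can be fixed, and are not part of the library.

```ruby
INFINITE = -1

# Milliseconds remaining before the timeout, 0 once it has passed,
# or nil when the timeout is INFINITE.
def time_left_ms(timeout_ms, start_time, now)
  return nil if timeout_ms == INFINITE

  remaining = timeout_ms - (now - start_time) * 1000
  remaining <= 0 ? 0 : remaining
end

start     = Time.at(0)
unlimited = time_left_ms(INFINITE, start, Time.at(10))   # nil
partial   = time_left_ms(1000.0, start, Time.at(0.25))   # 750.0
expired   = time_left_ms(1000.0, start, Time.at(2))      # 0
```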

#working? ⇒ Boolean

Returns [Boolean] whether the minion is still working on the assigned task

Returns:

  • (Boolean)


# File 'lib/parallel_minion/minion.rb', line 200

def working?
  enabled? ? @thread.alive? : false
end