Class: ParallelMinion::Minion

Inherits:
Object
Includes:
SemanticLogger::Loggable
Defined in:
lib/parallel_minion/minion.rb

Constant Summary

INFINITE = 0
  Give an infinite amount of time to wait for a Minion to complete a task

Constructor Details

#initialize(*arguments, description: 'Minion', metric: nil, log_exception: nil, on_exception_level: self.class.completed_log_level, enabled: self.class.enabled?, timeout: INFINITE, on_timeout: nil, wait_metric: nil, &block) ⇒ Minion

Create a new Minion

Creates a new thread and logs the time for the supplied block to complete processing.
The exception without stack trace is logged whenever an exception is thrown in the thread.

Re-raises any unhandled exception in the calling thread when `#result` is called.
Copies the logging tags and specified ActiveRecord scopes to the new thread.

Parameters

*arguments
  Any number of arguments can be supplied that are passed into the block
  in the order they are listed.

  Note:
    All arguments must be supplied by copy and not by reference.
    For example, use `#dup` to create copies of passed data.
    Pass by copy is critical to prevent concurrency issues when multiple threads
    attempt to update the same object at the same time.
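
    The pass-by-copy advice can be illustrated with a plain Ruby thread, with
    no Minion involved. This is only a sketch: `original` and `worker` are
    illustrative names, and `#dup` makes a shallow copy, so nested objects
    would still be shared.

    ```ruby
    # Plain-Ruby sketch: give a worker thread its own copy of mutable data.
    original = %w[alpha beta]

    # Thread.new hands its arguments to the block, so the block works on the copy.
    worker = Thread.new(original.dup) do |names|
      names << 'gamma' # mutates only the thread's copy
      names.size
    end

    # Thread#value waits for the thread and returns the block's result.
    worker_size = worker.value
    ```

    After the thread finishes, `original` still holds only its two initial
    elements because the thread appended to the duplicate.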

Proc / lambda
  A block of code must be supplied that the Minion will execute.

  Note:
    This block will be executed within the scope of the created minion instance
    and _not_ within the scope of where the Proc/lambda was originally created.
    This is done to force all parameters to be passed in explicitly
    and should be read-only or duplicates of the original data.
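
    One way a block can be run in the scope of another object is Ruby's
    `instance_exec`. The sketch below assumes that mechanism purely for
    illustration; `TinyWorker` is a hypothetical class, not part of
    ParallelMinion, and it runs the block inline rather than in a thread.

    ```ruby
    # Hypothetical stand-in class, used only to show how `self` changes.
    class TinyWorker
      attr_reader :description, :value

      def initialize(*arguments, description: 'Worker', &block)
        @description = description
        # Run the block with `self` set to this instance and pass the
        # arguments in explicitly, in the order they were supplied.
        @value = instance_exec(*arguments, &block)
      end
    end

    # Inside the block, `description` resolves against the TinyWorker instance.
    worker = TinyWorker.new(2, 3, description: 'Adder') do |a, b|
      "#{description}: #{a + b}"
    end
    summary = worker.value
    ```

    Note that `instance_exec` only changes `self`; local variables captured by
    the block remain visible, which is why passing copies explicitly still matters.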

:description [String]
  Description for this task that the Minion is performing.
  Written to the log file along with the time taken to complete the task.

:timeout [Integer]
  Maximum amount of time in milliseconds that the task may take to complete
  before #result times out.
  Set to 0 to give the thread an infinite amount of time to complete.
  Default: 0 ( Wait forever )

  Notes:
  - :timeout does not affect what happens to the Minion running
    the task; it only affects how long #result will take to return.
  - The Minion will continue to run even after the timeout has been exceeded
  - If :enabled is false, or ParallelMinion::Minion.enabled is false,
    then :timeout is ignored and assumed to be Minion::INFINITE
    since the code is run in the calling thread when the Minion is created
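
  The first two notes can be seen with a plain Ruby thread: a bounded wait
  gives up, but the thread itself keeps running. This is a minimal sketch using
  `Thread#join` with a limit in seconds; the durations and variable names are
  illustrative only.

  ```ruby
  # A task that takes longer than the caller is willing to wait.
  worker = Thread.new do
    sleep 0.5
    :done
  end

  # Thread#join(limit) returns nil when the limit expires before the thread ends.
  timed_out     = worker.join(0.05).nil?
  still_running = worker.alive?

  # The thread carried on regardless; an unbounded wait collects its result.
  final_value = worker.value
  ```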

:on_timeout [Exception]
  The class to raise on the minion when the minion times out.
  By raising the exception on the running thread it ensures that the thread
  ends due to the exception, rather than continuing to execute.
  The exception is only raised on the running minion when #result is called.
  The current call to #result will complete with a result of nil. Future
  calls to #result will raise the supplied exception on the current thread,
  since the thread will have terminated with that exception.

  Note: :on_timeout has no effect if not #enabled?
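
  The behavior described above can be approximated with plain Ruby threads.
  In this sketch `SlowTaskTimeout` is a hypothetical exception class, and the
  second wait uses `Thread#join` directly rather than the Minion's own
  exception handling, so treat it as an illustration of the idea only.

  ```ruby
  # Hypothetical exception class standing in for the :on_timeout value.
  class SlowTaskTimeout < StandardError; end

  worker = Thread.new do
    Thread.current.report_on_exception = false # keep stderr quiet for the demo
    sleep 5
    :never_reached
  end

  # First wait: the limit expires, so raise the exception on the running thread.
  first_result = nil
  if worker.join(0.05).nil?
    worker.raise(SlowTaskTimeout.new('Minion: slow task timed out'))
  end

  # Later wait: the thread has ended with that exception, and Thread#join
  # re-raises it in the calling thread.
  second_error =
    begin
      worker.join
      nil
    rescue SlowTaskTimeout => e
      e
    end
  ```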

:metric [String]
  Name of the metric to forward to Semantic Logger when measuring the minion execution time.
  Example: inquiry/address_cleansing

  When a metric is supplied the following metrics will also be generated:
  - wait
      Duration waiting for a minion to complete.

  The additional metrics are added to the supplied metric name. For example:
  - inquiry/address_cleansing/wait
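
  The naming rule mirrors the assignment to `@wait_metric` in the constructor
  source shown further down. As a standalone sketch, with `derive_wait_metric`
  being a hypothetical helper name that does not exist in the library:

  ```ruby
  # Hypothetical helper reproducing how the wait metric name is derived.
  def derive_wait_metric(metric, wait_metric = nil)
    return nil unless metric # no metric supplied, so no wait metric either

    wait_metric || "#{metric}/wait"
  end

  default_name  = derive_wait_metric('inquiry/address_cleansing')
  override_name = derive_wait_metric('inquiry/address_cleansing', 'inquiry/wait')
  no_metric     = derive_wait_metric(nil, 'inquiry/wait')
  ```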

:log_exception [Symbol]
  Control whether or how an exception thrown in the block is
  reported by Semantic Logger. Values:
   :full
     Log the exception class, message, and backtrace
   :partial
     Log the exception class and message. The backtrace will not be logged
   :off
     Any unhandled exception raised in the block will not be logged
   Default: :partial

:on_exception_level [:trace | :debug | :info | :warn | :error | :fatal]
  Override the log level only when an exception occurs.
  Default: ParallelMinion::Minion.completed_log_level

:enabled [Boolean]
  Override the global setting: `ParallelMinion::Minion.enabled?` for this minion instance.

The overhead for moving the task to a Minion (separate thread) vs running it sequentially is about 0.3 ms if performing other tasks in-between starting the task and requesting its result.

The following call adds 0.5 ms to total processing time vs running the code in-line:

ParallelMinion::Minion.new(description: 'Count', timeout: 5) { 1 }.result

Note:

On JRuby it is recommended to add the following setting to .jrubyrc
  thread.pool.enabled=true

Example:

ParallelMinion::Minion.new(
  10.days.ago,
  description: 'Doing something else in parallel',
  timeout:     1000
) do |date|
  MyTable.where('created_at <= ?', date).count
end

Example, when the result is being ignored, log full exception as an error:

ParallelMinion::Minion.new(
  customer,
  description:        "We don't care about the result",
  log_exception:      :full,
  on_exception_level: :error
) do |customer|
  customer.save!
end


# File 'lib/parallel_minion/minion.rb', line 217

def initialize(*arguments,
               description: 'Minion',
               metric: nil,
               log_exception: nil,
               on_exception_level: self.class.completed_log_level,
               enabled: self.class.enabled?,
               timeout: INFINITE,
               on_timeout: nil,
               wait_metric: nil,
               &block)
  raise 'Missing mandatory block that Minion must perform' unless block
  @start_time         = Time.now
  @exception          = nil
  @arguments          = arguments
  @timeout            = timeout.to_f
  @description        = description.to_s
  @metric             = metric
  @log_exception      = log_exception
  @on_exception_level = on_exception_level
  @enabled            = enabled
  @on_timeout         = on_timeout

  @wait_metric        = (wait_metric || "#{metric}/wait") if @metric

  # When minion is disabled it is obvious in the logs since the name will now be 'Inline' instead of 'Minion'
  unless @enabled
    l           = self.class.logger.dup
    l.name      = 'Inline'
    self.logger = l
  end

  @enabled ? run(&block) : run_inline(&block)
end

Class Attribute Details

.completed_log_level ⇒ Object

Returns the value of attribute completed_log_level.

# File 'lib/parallel_minion/minion.rb', line 92

def completed_log_level
  @completed_log_level
end

.enabled=(value) ⇒ Object (writeonly)

Sets the attribute enabled

Parameters:

  • value

    the value to set the attribute enabled to.



# File 'lib/parallel_minion/minion.rb', line 44

def enabled=(value)
  @enabled = value
end

.scoped_classes ⇒ Object

Returns the value of attribute scoped_classes.

# File 'lib/parallel_minion/minion.rb', line 58

def scoped_classes
  @scoped_classes
end

.started_log_level ⇒ Object

Returns the value of attribute started_log_level.

# File 'lib/parallel_minion/minion.rb', line 77

def started_log_level
  @started_log_level
end

Instance Attribute Details

#arguments ⇒ Object (readonly)

Returns [Array<Object>] list of arguments in the order they were passed into the initializer

# File 'lib/parallel_minion/minion.rb', line 16

def arguments
  @arguments
end

#description ⇒ Object (readonly)

Returns [String] the description supplied on the initializer

# File 'lib/parallel_minion/minion.rb', line 7

def description
  @description
end

#duration ⇒ Object (readonly)

Returns [Float] the number of milliseconds the minion took to complete.
Returns nil if the minion is still running.

# File 'lib/parallel_minion/minion.rb', line 20

def duration
  @duration
end

#exception ⇒ Object (readonly)

Returns [Exception] the exception that was raised, otherwise nil

# File 'lib/parallel_minion/minion.rb', line 10

def exception
  @exception
end

#log_exception ⇒ Object (readonly)

Returns the value of attribute log_exception.

# File 'lib/parallel_minion/minion.rb', line 25

def log_exception
  @log_exception
end

#metric ⇒ Object (readonly)

Metrics [String]

# File 'lib/parallel_minion/minion.rb', line 23

def metric
  @metric
end

#on_exception_level ⇒ Object (readonly)

Returns the value of attribute on_exception_level.

# File 'lib/parallel_minion/minion.rb', line 25

def on_exception_level
  @on_exception_level
end

#on_timeout ⇒ Object (readonly)

Returns the value of attribute on_timeout.

# File 'lib/parallel_minion/minion.rb', line 25

def on_timeout
  @on_timeout
end

#start_time ⇒ Object (readonly)

Returns the value of attribute start_time.

# File 'lib/parallel_minion/minion.rb', line 25

def start_time
  @start_time
end

#timeout ⇒ Object (readonly)

Returns [Integer] the maximum duration in milliseconds that the Minion may take to complete the task

# File 'lib/parallel_minion/minion.rb', line 13

def timeout
  @timeout
end

#wait_metric ⇒ Object (readonly)

Metrics [String]

# File 'lib/parallel_minion/minion.rb', line 23

def wait_metric
  @wait_metric
end

Class Method Details

.current_scopes ⇒ Object

# File 'lib/parallel_minion/minion.rb', line 311

def self.current_scopes
  scoped_classes.collect(&:all)
end

.enabled? ⇒ Boolean

Returns whether minions are enabled to run in their own threads

Returns:

  • (Boolean)

# File 'lib/parallel_minion/minion.rb', line 48

def self.enabled?
  @enabled
end

Instance Method Details

#completed? ⇒ Boolean

Returns [Boolean] whether the minion has completed working on the task

Returns:

  • (Boolean)

# File 'lib/parallel_minion/minion.rb', line 284

def completed?
  enabled? ? @thread.stop? : true
end

#enabled? ⇒ Boolean

Returns [Boolean] whether this minion is enabled to run in a separate thread

Returns:

  • (Boolean)

# File 'lib/parallel_minion/minion.rb', line 303

def enabled?
  @enabled
end

#failed? ⇒ Boolean

Returns [Boolean] whether the minion failed while performing the assigned task

Returns:

  • (Boolean)

# File 'lib/parallel_minion/minion.rb', line 289

def failed?
  !exception.nil?
end

#result ⇒ Object

Returns the result when the thread completes.
Returns nil if the thread has not yet completed.
Raises any unhandled exception in the thread, if any.

Note: The result of any thread cannot be nil, since nil is what #result returns when the wait times out.

# File 'lib/parallel_minion/minion.rb', line 256

def result
  # Return nil if Minion is still working and has time left to finish
  if working?
    ms = time_left
    logger.measure(
      self.class.completed_log_level,
      "Waited for Minion to complete: #{description}",
      min_duration: 0.01,
      metric:       wait_metric
    ) do
      if @thread.join(ms.nil? ? nil : ms / 1000).nil?
        @thread.raise(@on_timeout.new("Minion: #{description} timed out")) if @on_timeout
        logger.warn("Timed out waiting for: #{description}")
        return
      end
    end
  end

  # Return the exception, if any, otherwise the task result
  exception.nil? ? @result : Kernel.raise(exception)
end

#time_left ⇒ Object

Returns the amount of time left in milliseconds that this Minion has to finish its task.
Returns 0 if no time is left.
Returns nil if there is no time limit, i.e. :timeout was set to Minion::INFINITE (infinite time left).

# File 'lib/parallel_minion/minion.rb', line 296

def time_left
  return nil if timeout.zero? || (timeout == -1)
  duration = timeout - (Time.now - start_time) * 1000
  duration <= 0 ? 0 : duration
end
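
The arithmetic in `time_left` can be tried outside the class. The sketch below restates it as a standalone method; `time_left_ms` and its explicit `now` parameter are hypothetical, added only so the calculation can be run with fixed times.

```ruby
# Standalone restatement of the time_left calculation (hypothetical helper).
def time_left_ms(timeout_ms, start_time, now = Time.now)
  # A timeout of 0 (INFINITE) or -1 means there is no time limit.
  return nil if timeout_ms.zero? || timeout_ms == -1

  # Time#- yields elapsed seconds, so convert to milliseconds before subtracting.
  remaining = timeout_ms - (now - start_time) * 1000
  remaining <= 0 ? 0 : remaining
end

started = Time.at(1_000)

no_limit  = time_left_ms(0.0, started, started + 1)          # no time limit
remaining = time_left_ms(1000.0, started, Time.at(1_000.25)) # 250 ms elapsed
expired   = time_left_ms(100.0, started, started + 2)        # limit exceeded
```

Here `remaining` is the 1000 ms timeout minus the 250 ms already elapsed, and `expired` is clamped to 0 rather than going negative.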

#working? ⇒ Boolean

Returns [Boolean] whether the minion is still working on the assigned task

Returns:

  • (Boolean)

# File 'lib/parallel_minion/minion.rb', line 279

def working?
  enabled? ? @thread.alive? : false
end