Class: SimpleWorker::Runner

Inherits:
Object
  • Object
show all
Includes:
Observable, RedisSupport
Defined in:
lib/simpleworker/runner.rb

Overview

Runner.run(redis, tasks, opts)

where tasks is an Array of strings and ‘opts’ is a Hash of options:

:namespace    => String prefix to keys in redis used by SimpleWorker (default: simpleworker)
:timeout      => Fixnum max allowed time between events (default: 30 seconds)
:task_timeout => Fixnum max time allowed for a task to take (default: 10 seconds)
:interval     => Fixnum interval at which SimpleWorker checks the status of all tasks (default: 5 seconds)
:notify       =>        Array[AbstractListener] objects implementing the AbstractListener API
:max_retries  => Fixnum number of times expired tasks will be retried (default: 0)

Constant Summary collapse

DEFAULT_OPTIONS =
{
:timeout      => 30,
:task_timeout => 10,
:interval     => 5,
:namespace    => 'simpleworker',
:log          => true,
:max_retries  => 0}

Instance Attribute Summary

Attributes included from RedisSupport

#jobid, #namespace

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis, tasks, opts = {}) ⇒ Runner

Returns a new instance of Runner.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/simpleworker/runner.rb', line 28

def initialize(redis, tasks, opts = {})
  opts = DEFAULT_OPTIONS.dup.merge(opts)

  @redis      = redis
  @jobid      = SecureRandom.hex(6)
  @namespace  = opts[:namespace]
  @timeout    = opts[:timeout]
  @interval   = opts[:interval]
  max_retries = opts[:max_retries]
  listeners   = Array(opts[:notify])

  STDERR.puts 'WARNING: to prevent a race condition :timeout should be > :task_timeout' if @timeout < opts[:task_timeout]
  load_lua_scripts
  @redis.set(config_key, {'task_timeout' => opts[:task_timeout]}.to_json)
  @redis.rpush(tasks_key, tasks)

  if opts[:log]
    listeners << LoggingListener.new
  end

  @event_server = EventServer.new(redis, namespace, jobid)
  @event_monitor = EventMonitor.new
  listeners << @event_monitor

  @retry_listener = RetryListener.new(redis, max_retries, namespace, jobid)
  listeners << @retry_listener

  listeners.each do |listener|
    add_observer listener
    @event_server.add_observer listener
  end
end

Class Method Details

.run(redis, tasks, opts = {}) ⇒ Object



61
62
63
# File 'lib/simpleworker/runner.rb', line 61

def self.run(redis, tasks, opts = {})
  new(redis, tasks, opts).run
end

Instance Method Details

#runObject



65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/simpleworker/runner.rb', line 65

def run
  start
  process
  stop
rescue Interrupt
  fire 'on_interrupted'
  stop
rescue StandardError => e
  fire 'on_interrupted'
  stop
  raise e
end