Module: Deferrer::Runner

Included in:
Deferrer
Defined in:
lib/deferrer/runner.rb

Instance Method Summary collapse

Instance Method Details

#defer_at(timestamp, identifier, *args) ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/deferrer/runner.rb', line 50

def defer_at(timestamp, identifier, *args)
  key  = item_key(identifier)
  item = { 'args' => args }

  push_item(key, item, timestamp)

  process_item(next_item(key)) if @inline
end

#defer_in(number_of_seconds_from_now, identifier, *args) ⇒ Object



45
46
47
48
# File 'lib/deferrer/runner.rb', line 45

def defer_in(number_of_seconds_from_now, identifier, *args)
  timestamp = Time.now + number_of_seconds_from_now
  defer_at(timestamp, identifier, *args)
end

#next_item(key = next_key) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/deferrer/runner.rb', line 28

def next_item(key = next_key)
  return nil unless key

  item         = nil
  decoded_item = nil

  item = redis.rpop(key)
  if item
    decoded_item = decode(item)
    decoded_item['key'] = key
  end

  remove(key)

  decoded_item
end

#run(options = {}) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/deferrer/runner.rb', line 3

def run(options = {})
  loop_frequency = options.fetch(:loop_frequency, 0.1)
  single_run     = options.fetch(:single_run, false)
  @ignore_time   = options.fetch(:ignore_time, false)
  @inline        = options.fetch(:inline, false)

  raise WorkerNotConfigured unless worker

  loop do
    begin
      while item = next_item
        process_item(item)
      end
    rescue StandardError => e
      log(:error, "Error: #{e.class}: #{e.message}")
    rescue Exception => e
      log(:error, "Error: #{e.class}: #{e.message}")
      raise
    end

    break if single_run
    sleep loop_frequency
  end
end