Class: Sidekiq::Tasks::Storage

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/tasks/storage.rb

Constant Summary collapse

JID_PREFIX =
"task".freeze
HISTORY_LIMIT =
10
ERROR_MESSAGE_MAX_LENGTH =
255

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(task_name) ⇒ Storage

Returns a new instance of Storage.



10
11
12
# File 'lib/sidekiq/tasks/storage.rb', line 10

# Builds a storage object bound to a single task.
#
# @param task_name [Object] name of the task; it is interpolated into the
#   key built by #jid_key (presumably a String — confirm against callers)
def initialize(task_name)
  @task_name = task_name
end

Instance Attribute Details

#task_name ⇒ Object (readonly)

Returns the value of attribute task_name.



8
9
10
# File 'lib/sidekiq/tasks/storage.rb', line 8

# @return [Object] the task name this storage was initialized with
def task_name
  @task_name
end

Instance Method Details

#history ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/sidekiq/tasks/storage.rb', line 26

# Reads every entry of this task's history list from Redis and decodes it.
#
# Each raw entry is parsed with Sidekiq.load_json. Whichever of the
# "enqueued_at", "executed_at" and "finished_at" values are present are
# converted with Time.at; absent ones are left untouched.
#
# @return [Array<Hash>] decoded entries in the order LRANGE returned them,
#   or an empty array when the Redis call yields a falsy value
def history
  payloads = Sidekiq.redis { |redis| redis.lrange(history_key, 0, -1) }
  return [] unless payloads

  payloads.map do |payload|
    Sidekiq.load_json(payload).tap do |record|
      %w[enqueued_at executed_at finished_at].each do |field|
        stamp = record[field]
        record[field] = Time.at(stamp) if stamp
      end
    end
  end
end

#history_key ⇒ Object



18
19
20
# File 'lib/sidekiq/tasks/storage.rb', line 18

# Redis key of the list holding this task's history entries.
#
# @return [String] the value of #jid_key followed by ":history"
def history_key
  [jid_key, "history"].join(":")
end

#jid_key ⇒ Object



14
15
16
# File 'lib/sidekiq/tasks/storage.rb', line 14

# Base Redis key for this task: JID_PREFIX and the task name joined by ":".
#
# @return [String]
def jid_key
  format("%s:%s", JID_PREFIX, task_name)
end

#last_enqueue_at ⇒ Object



22
23
24
# File 'lib/sidekiq/tasks/storage.rb', line 22

# Looks up the value recorded under the "last_enqueue_at" name.
#
# @return [Object] whatever stored_time returns for "last_enqueue_at"
#   (presumably a Time or nil — stored_time is defined outside this view,
#   TODO confirm)
def last_enqueue_at
  stored_time("last_enqueue_at")
end

#store_enqueue(jid, args) ⇒ Object



48
49
50
51
52
# File 'lib/sidekiq/tasks/storage.rb', line 48

# Records an enqueue of this task.
#
# A single timestamp (Time.now as a Float) is taken once and handed both to
# store_time under the "last_enqueue_at" name and to #store_history, so the
# two records agree.
#
# @param jid [Object] job id passed through to #store_history
# @param args [Object] task arguments passed through to #store_history
# @return [Object] the result of #store_history
def store_enqueue(jid, args)
  enqueued_at = Time.now.to_f
  store_time(enqueued_at, "last_enqueue_at")
  store_history(jid, args, enqueued_at)
end

#store_execution(jid, time_key) ⇒ Object



54
55
56
57
58
# File 'lib/sidekiq/tasks/storage.rb', line 54

# Stamps the history entry for +jid+ with the current time.
#
# The block handed to update_history_entry returns a copy of the yielded
# entry with +time_key+ set to Time.now as a Float; the timestamp is taken
# when the block runs.
#
# @param jid [Object] job id identifying the history entry
# @param time_key [Object] key to set on the entry (presumably
#   "executed_at" or "finished_at" — confirm against callers)
# @return [Object] whatever update_history_entry returns
def store_execution(jid, time_key)
  update_history_entry(jid) { |record| record.merge(time_key => Time.now.to_f) }
end

#store_execution_error(jid, error) ⇒ Object



60
61
62
63
64
65
# File 'lib/sidekiq/tasks/storage.rb', line 60

# Attaches an error summary to the history entry for +jid+.
#
# The summary is "<error class>: <error message>", passed through
# truncate_message with ERROR_MESSAGE_MAX_LENGTH, and stored under the
# "error" key of a copy of the yielded entry.
#
# @param jid [Object] job id identifying the history entry
# @param error [#message] the raised error
# @return [Object] whatever update_history_entry returns
def store_execution_error(jid, error)
  update_history_entry(jid) do |record|
    # Built inside the block so nothing is evaluated unless an entry is yielded.
    summary = "#{error.class}: #{error.message}"
    record.merge("error" => truncate_message(summary, ERROR_MESSAGE_MAX_LENGTH))
  end
end

#store_history(jid, task_args, time) ⇒ Object



40
41
42
43
44
45
46
# File 'lib/sidekiq/tasks/storage.rb', line 40

# Pushes a new entry onto the head of this task's history list and trims
# the list so that at most HISTORY_LIMIT entries remain.
#
# The entry is a JSON object with the keys jid, name, args and enqueued_at
# (in that order), serialized with Sidekiq.dump_json.
#
# @param jid [Object] job id of the enqueued job
# @param task_args [Object] arguments the task was enqueued with
# @param time [#to_f] enqueue time; stored as a Float
# @return [Object] the result of the LTRIM call, as returned by Sidekiq.redis
def store_history(jid, task_args, time)
  Sidekiq.redis do |redis|
    payload = Sidekiq.dump_json({jid: jid, name: task_name, args: task_args, enqueued_at: time.to_f})
    list = history_key
    redis.lpush(list, payload)
    # Keep indexes 0..HISTORY_LIMIT-1, i.e. the most recently pushed entries.
    redis.ltrim(list, 0, HISTORY_LIMIT - 1)
  end
end