Class: Reliable::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/reliable/queue.rb

Constant Summary collapse

# Error type defined under this class; a direct subclass of StandardError.
FatalError =
Class.new(StandardError)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Queue

Returns a new instance of Queue.



15
16
17
18
19
20
21
# File 'lib/reliable/queue.rb', line 15

# Sets up a queue identified by +name+.
#
# Every Redis key used by the queue hangs off a common prefix,
# "reliable:queues:<name>"; the pending and failed list keys are that prefix
# plus a ":pending" / ":failed" suffix.
#
# @param name [#to_s] queue name, interpolated into the key prefix
def initialize(name)
  @name = name
  @base_key = "reliable:queues:#{name}"
  @redis = Redis.new
  @pending_key = "#{@base_key}:pending"
  @failed_key = "#{@base_key}:failed"
end

Instance Attribute Details

#base_key ⇒ Object

Returns the value of attribute base_key.



13
14
15
# File 'lib/reliable/queue.rb', line 13

# Reader for @base_key, which the constructor sets to "reliable:queues:<name>".
#
# @return [String] the key prefix built in #initialize
def base_key
  @base_key
end

#uuid ⇒ Object

Returns the value of attribute uuid.



13
14
15
# File 'lib/reliable/queue.rb', line 13

# Reader for @uuid.
#
# NOTE(review): the constructor shown for this class does not assign @uuid,
# so this presumably returns nil unless something else sets it; confirm.
#
# @return [Object, nil] the current value of @uuid
def uuid
  @uuid
end

Instance Method Details

#create_worker(&work) ⇒ Object



42
43
44
# File 'lib/reliable/queue.rb', line 42

# Builds a Worker bound to this queue, forwarding the given block to it.
#
# @yield the block handed on to Worker.new (presumably the per-item work;
#   confirm against Worker)
# @return [Worker] a new worker constructed with this queue and the block
def create_worker(&work)
  Worker.new(self, &work)
end

#current_time ⇒ Object



23
24
25
# File 'lib/reliable/queue.rb', line 23

# Asks the Redis client for the time by delegating to @redis.time.
#
# @return [Object] whatever @redis.time returns (presumably the Redis
#   server's clock; TODO confirm against the client wrapper)
def current_time
  @redis.time
end

#each(&block) ⇒ Object



61
62
63
64
65
66
67
68
69
# File 'lib/reliable/queue.rb', line 61

# Iterates over the queue.
#
# Without a block this simply returns the enumerator from #to_enum. With a
# block, the block is handed to #to_enum as the *work* to run per item, so
# the iteration that follows only has to pass each result through.
#
# @yield work to run for each item, forwarded to #to_enum
# @return [Enumerator] when no block is given
# @return [Object] the result of exhausting the enumerator when a block is given
def each(&block)
  return to_enum unless block_given?

  # The work already happened inside the enumerator; the consuming block is
  # just an identity that drives the iteration.
  to_enum(&block).each { |item| item }
end

#failed ⇒ Object



38
39
40
# File 'lib/reliable/queue.rb', line 38

# Memoized List built from the failed key (@failed_key) and the Redis client.
#
# @return [List] the same List instance on every call
def failed
  return @failed if @failed

  @failed = List.new(@failed_key, @redis)
end

#logger ⇒ Object



100
101
102
# File 'lib/reliable/queue.rb', line 100

# Logger used by this queue; delegates to the module-level Reliable.logger.
#
# @return [Object] whatever Reliable.logger returns
def logger
  Reliable.logger
end

#notify(e, other = {}) ⇒ Object



104
105
106
107
108
109
# File 'lib/reliable/queue.rb', line 104

# Reports an error by writing three info-level log entries, in order: the
# error's #inspect, its backtrace, and the #inspect of the extra context.
#
# @param e [Exception] the error to report
# @param other [Hash] extra context logged alongside the error
# @return [Object] the result of the last logger.info call
def notify(e, other = {})
  # TODO: make configurable
  logger.info(e.inspect)
  logger.info(e.backtrace)
  logger.info(other.inspect)
end

#peach(opts = {}, &block) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/reliable/queue.rb', line 71

# Processes the queue with several threads at once ("parallel each"): every
# thread runs #each with the same block, and this method blocks until all of
# them have finished.
#
# @param opts [Hash] options; :concurrency is required and gives the number
#   of threads to start
# @yield work to perform, forwarded to #each in every thread
# @return [Array<Thread>] the joined threads
# @raise [RuntimeError] when no block is given
# @raise [KeyError] when opts has no :concurrency entry
def peach(opts = {}, &block)
  raise "must supply a block" unless block_given?

  concurrency = opts.fetch(:concurrency)

  threads = concurrency.times.map do
    Thread.new do
      # Set the flag from inside the thread, before any work runs. Setting it
      # from the parent after all threads were started left a window in which
      # a worker could fail before the flag had been applied.
      Thread.current.abort_on_exception = true
      each(&block)
    end
  end

  # Thread#join returns the thread itself, so this yields the same array of
  # threads that mapping over join would.
  threads.each(&:join)
end

#pending ⇒ Object



34
35
36
# File 'lib/reliable/queue.rb', line 34

# Memoized List built from the pending key (@pending_key) and the Redis client.
#
# @return [List] the same List instance on every call
def pending
  return @pending if @pending

  @pending = List.new(@pending_key, @redis)
end

#push(value) ⇒ Object Also known as: <<



27
28
29
30
31
# File 'lib/reliable/queue.rb', line 27

# Enqueues +value+ (also available as #<<).
#
# A UUID is created from the current time; inside the block that UUID.new is
# given, the value is handed to @redis.set_and_lpush together with the
# pending list's key and the UUID's string form.
#
# @param value [Object] the payload to enqueue
# @return [Object] whatever UUID.new returns
def push(value)
  now = current_time
  UUID.new(now) { |id| @redis.set_and_lpush(pending.key, id.to_s, value) }
end

#take(number, &block) ⇒ Object



57
58
59
# File 'lib/reliable/queue.rb', line 57

# Pulls at most +number+ results off the queue's enumerator.
#
# @param number [Integer] how many results to take
# @yield optional work to run per item, forwarded to #to_enum
# @return [Array] the first +number+ results
def take(number, &block)
  results = to_enum(&block)
  results.take(number)
end

#to_enum(&work) ⇒ Object



46
47
48
49
50
51
52
53
54
55
# File 'lib/reliable/queue.rb', line 46

# Wraps a worker for this queue in an Enumerator.
#
# Each step of the enumerator asks the worker for its next result and yields
# it. When no block is supplied, an identity lambda is used as the work.
#
# @yield optional work to run per item, handed to #create_worker
# @return [Enumerator] yields one worker result per step; it stops only if
#   something inside the loop raises StopIteration
def to_enum(&work)
  worker = create_worker(&(work || ->(item) { item }))

  Enumerator.new do |yielder|
    # Do one unit of work, then hand control back to the consumer.
    loop { yielder.yield(worker.next) }
  end
end

#total_items ⇒ Object



96
97
98
# File 'lib/reliable/queue.rb', line 96

# Counts the entries returned by scanning for "reliable:items:*".
#
# The pattern is not derived from this queue's base key, so the count is not
# restricted to this queue.
#
# @return [Integer] length of the scan result
def total_items
  item_keys = @redis.scan("reliable:items:*")
  item_keys.length
end

#total_processing ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
# File 'lib/reliable/queue.rb', line 84

# Sums the lengths of every list matching
# "reliable:queues:*:workers:*:processing".
#
# The pattern uses wildcards for both the queue and the worker, so the total
# is not restricted to this queue. The LLEN calls are issued inside a single
# @redis.pipeline block.
#
# @return [Integer] combined length of all matching lists (0 when none match)
def total_processing
  processing_keys = @redis.scan("reliable:queues:*:workers:*:processing")

  counts = @redis.pipeline do |pipe|
    processing_keys.each { |key| pipe.llen(key) }
  end

  counts.inject(0) { |total, count| total + count.to_i }
end