Class: Fairway::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairway/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, *queue_names) ⇒ Queue

Returns a new instance of Queue.



5
6
7
8
# File 'lib/fairway/queue.rb', line 5

def initialize(connection, *queue_names)
  @connection  = connection
  @queue_names = parse_queue_names(queue_names)
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



3
4
5
# File 'lib/fairway/queue.rb', line 3

def connection
  @connection
end

#queue_namesObject (readonly)

Returns the value of attribute queue_names.



3
4
5
# File 'lib/fairway/queue.rb', line 3

def queue_names
  @queue_names
end

Instance Method Details

#==(other) ⇒ Object



52
53
54
55
56
57
# File 'lib/fairway/queue.rb', line 52

def ==(other)
  other.respond_to?(:connection) &&
  other.respond_to?(:queue_names) &&
  connection == other.connection &&
  queue_names == other.queue_names
end

#active_facetsObject



10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/fairway/queue.rb', line 10

def active_facets
  facet_names = []

  redis.with_each do |conn|
    facet_names += unique_queues.map do |queue|
      conn.smembers("#{queue}:active_facets")
    end.flatten
  end

  facet_names.uniq.map do |name|
    Facet.new(self, name)
  end
end

#destroyObject



67
68
69
# File 'lib/fairway/queue.rb', line 67

def destroy
  scripts.fairway_destroy(unique_queues)
end

#inflight_limit=(limit) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/fairway/queue.rb', line 32

def inflight_limit=(limit)
  redis.with_each do |conn|
    unique_queues.each do |queue|
      if limit < 1
        conn.del("#{queue}:limit")
      else
        conn.set("#{queue}:limit", limit)
      end
    end
  end
end

#lengthObject



24
25
26
27
28
29
30
# File 'lib/fairway/queue.rb', line 24

def length
  redis.pools.map do |pool|
    pool.with do |conn|
      conn.mget(unique_queues.map{|q| "#{q}:length" }).map(&:to_i).sum
    end
  end.sum
end

#peekObject



44
45
46
# File 'lib/fairway/queue.rb', line 44

def peek
  scripts.fairway_peek(@queue_names.shuffle.uniq)
end

#pullObject



48
49
50
# File 'lib/fairway/queue.rb', line 48

def pull
  scripts.fairway_pull(Time.now.to_i, -1, @queue_names.shuffle.uniq)
end

#queue_keyObject



63
64
65
# File 'lib/fairway/queue.rb', line 63

def queue_key
  queue
end

#redisObject



71
72
73
# File 'lib/fairway/queue.rb', line 71

def redis
  @connection.redis
end

#scriptsObject



75
76
77
# File 'lib/fairway/queue.rb', line 75

def scripts
  @connection.scripts
end

#unique_queuesObject



59
60
61
# File 'lib/fairway/queue.rb', line 59

def unique_queues
  @queue_names.uniq
end