Class: Fairway::Queue
- Inherits:
-
Object
- Object
- Fairway::Queue
- Defined in:
- lib/fairway/queue.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#queue_names ⇒ Object
readonly
Returns the value of attribute queue_names.
Instance Method Summary collapse
- #==(other) ⇒ Object
- #active_facets ⇒ Object
- #destroy ⇒ Object
- #inflight_limit=(limit) ⇒ Object
-
#initialize(connection, *queue_names) ⇒ Queue
constructor
A new instance of Queue.
- #length ⇒ Object
- #peek ⇒ Object
- #pull ⇒ Object
- #queue_key ⇒ Object
- #redis ⇒ Object
- #scripts ⇒ Object
- #unique_queues ⇒ Object
Constructor Details
#initialize(connection, *queue_names) ⇒ Queue
Returns a new instance of Queue.
5 6 7 8 |
# File 'lib/fairway/queue.rb', line 5 def initialize(connection, *queue_names) @connection = connection @queue_names = parse_queue_names(queue_names) end |
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
3 4 5 |
# File 'lib/fairway/queue.rb', line 3 def connection @connection end |
#queue_names ⇒ Object (readonly)
Returns the value of attribute queue_names.
3 4 5 |
# File 'lib/fairway/queue.rb', line 3 def queue_names @queue_names end |
Instance Method Details
#==(other) ⇒ Object
52 53 54 55 56 57 |
# File 'lib/fairway/queue.rb', line 52 def ==(other) other.respond_to?(:connection) && other.respond_to?(:queue_names) && connection == other.connection && queue_names == other.queue_names end |
#active_facets ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/fairway/queue.rb', line 10 def active_facets facet_names = [] redis.with_each do |conn| facet_names += unique_queues.map do |queue| conn.smembers("#{queue}:active_facets") end.flatten end facet_names.uniq.map do |name| Facet.new(self, name) end end |
#destroy ⇒ Object
67 68 69 |
# File 'lib/fairway/queue.rb', line 67 def destroy scripts.fairway_destroy(unique_queues) end |
#inflight_limit=(limit) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/fairway/queue.rb', line 32 def inflight_limit=(limit) redis.with_each do |conn| unique_queues.each do |queue| if limit < 1 conn.del("#{queue}:limit") else conn.set("#{queue}:limit", limit) end end end end |
#length ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/fairway/queue.rb', line 24 def length redis.pools.map do |pool| pool.with do |conn| conn.mget(unique_queues.map{|q| "#{q}:length" }).map(&:to_i).sum end end.sum end |
#peek ⇒ Object
44 45 46 |
# File 'lib/fairway/queue.rb', line 44 def peek scripts.fairway_peek(@queue_names.shuffle.uniq) end |
#pull ⇒ Object
48 49 50 |
# File 'lib/fairway/queue.rb', line 48 def pull scripts.fairway_pull(Time.now.to_i, -1, @queue_names.shuffle.uniq) end |
#queue_key ⇒ Object
63 64 65 |
# File 'lib/fairway/queue.rb', line 63 def queue_key queue end |
#redis ⇒ Object
71 72 73 |
# File 'lib/fairway/queue.rb', line 71 def redis @connection.redis end |
#scripts ⇒ Object
75 76 77 |
# File 'lib/fairway/queue.rb', line 75 def scripts @connection.scripts end |
#unique_queues ⇒ Object
59 60 61 |
# File 'lib/fairway/queue.rb', line 59 def unique_queues @queue_names.uniq end |