Class: Upperkut::Strategy
- Inherits:
-
Object
- Object
- Upperkut::Strategy
show all
- Includes:
- Util
- Defined in:
- lib/upperkut/strategy.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Util
#decode_json_items, #encode_json_items, #to_underscore
Constructor Details
#initialize(worker, options = {}) ⇒ Strategy
Returns a new instance of Strategy.
10
11
12
13
14
15
|
# File 'lib/upperkut/strategy.rb', line 10
def initialize(worker, options = {})
@options = options
@redis_options = options.fetch(:redis, {})
@redis_pool = setup_redis_pool
@worker = worker
end
|
Instance Attribute Details
#options ⇒ Object
Returns the value of attribute options.
8
9
10
|
# File 'lib/upperkut/strategy.rb', line 8
def options
@options
end
|
Instance Method Details
#clear ⇒ Object
51
52
53
|
# File 'lib/upperkut/strategy.rb', line 51
def clear
redis { |conn| conn.del(key) }
end
|
#fetch_items(batch_size = 1000) ⇒ Object
25
26
27
28
29
30
31
32
33
34
35
|
# File 'lib/upperkut/strategy.rb', line 25
def fetch_items(batch_size = 1000)
stop = [batch_size, size].min
items = redis do |conn|
conn.multi do
stop.times { conn.lpop(key) }
end
end
decode_json_items(items)
end
|
#latency ⇒ Object
43
44
45
46
47
48
49
|
# File 'lib/upperkut/strategy.rb', line 43
def latency
item = redis { |conn| conn.lrange(key, 0, 0) }
item = decode_json_items(item).first
return 0 unless item
now = Time.now.to_f
now - item.fetch('enqueued_at', Time.now).to_f
end
|
#push_items(items = []) ⇒ Object
17
18
19
20
21
22
23
|
# File 'lib/upperkut/strategy.rb', line 17
def push_items(items = [])
items = [items] if items.is_a?(Hash)
return false if items.empty?
redis do |conn|
conn.rpush(key, encode_json_items(items))
end
end
|
#size ⇒ Object
37
38
39
40
41
|
# File 'lib/upperkut/strategy.rb', line 37
def size
redis do |conn|
conn.llen(key)
end
end
|