Class: CI::Queue::Redis::Base
- Inherits:
-
Object
- Object
- CI::Queue::Redis::Base
show all
- Includes:
- Common
- Defined in:
- lib/ci/queue/redis/base.rb
Constant Summary
collapse
- CONNECTION_ERRORS =
[
::Redis::BaseConnectionError,
::SocketError, ].freeze
Instance Attribute Summary
Attributes included from Common
#config
Instance Method Summary
collapse
Methods included from Common
#flaky?, #release!, #report_failure!, #report_success!, #rescue_connection_errors, #retrying?
Constructor Details
#initialize(redis_url, config) ⇒ Base
Returns a new instance of Base.
13
14
15
16
17
|
# File 'lib/ci/queue/redis/base.rb', line 13
def initialize(redis_url, config)
@redis_url = redis_url
@redis = ::Redis.new(url: redis_url)
@config = config
end
|
Instance Method Details
#exhausted? ⇒ Boolean
19
20
21
|
# File 'lib/ci/queue/redis/base.rb', line 19
def exhausted?
queue_initialized? && size == 0
end
|
#increment_test_failed ⇒ Object
64
65
66
|
# File 'lib/ci/queue/redis/base.rb', line 64
def increment_test_failed
redis.incr(key('test_failed_count'))
end
|
#max_test_failed? ⇒ Boolean
72
73
74
75
76
|
# File 'lib/ci/queue/redis/base.rb', line 72
def max_test_failed?
return false if config.max_test_failed.nil?
test_failed >= config.max_test_failed
end
|
#progress ⇒ Object
37
38
39
|
# File 'lib/ci/queue/redis/base.rb', line 37
def progress
total - size
end
|
#queue_initialized? ⇒ Boolean
57
58
59
60
61
62
|
# File 'lib/ci/queue/redis/base.rb', line 57
def queue_initialized?
@queue_initialized ||= begin
status = master_status
status == 'ready' || status == 'finished'
end
end
|
#size ⇒ Object
23
24
25
26
27
28
|
# File 'lib/ci/queue/redis/base.rb', line 23
def size
redis.multi do |transaction|
transaction.llen(key('queue'))
transaction.zcard(key('running'))
end.inject(:+)
end
|
#test_failed ⇒ Object
68
69
70
|
# File 'lib/ci/queue/redis/base.rb', line 68
def test_failed
redis.get(key('test_failed_count')).to_i
end
|
#to_a ⇒ Object
30
31
32
33
34
35
|
# File 'lib/ci/queue/redis/base.rb', line 30
def to_a
redis.multi do |transaction|
transaction.lrange(key('queue'), 0, -1)
transaction.zrange(key('running'), 0, -1)
end.flatten.reverse.map { |k| index.fetch(k) }
end
|
#wait_for_master(timeout: 30) ⇒ Object
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/ci/queue/redis/base.rb', line 41
def wait_for_master(timeout: 30)
return true if master?
(timeout * 10 + 1).to_i.times do
if queue_initialized?
return true
else
sleep 0.1
end
end
raise LostMaster, "The master worker is still `#{master_status}` after #{timeout} seconds waiting."
end
|
#workers_count ⇒ Object
53
54
55
|
# File 'lib/ci/queue/redis/base.rb', line 53
def workers_count
redis.scard(key('workers'))
end
|