Class: CI::Queue::Redis::Base
- Inherits:
-
Object
- Object
- CI::Queue::Redis::Base
show all
- Includes:
- Common
- Defined in:
- lib/ci/queue/redis/base.rb
Defined Under Namespace
Modules: RedisInstrumentation
Classes: HeartbeatProcess, State
Constant Summary
collapse
- TEN_MINUTES =
60 * 10
- CONNECTION_ERRORS =
[
::Redis::BaseConnectionError,
::SocketError, ].freeze
- DEFAULT_TIMEOUT =
2
Instance Attribute Summary
Attributes included from Common
#config
Instance Method Summary
collapse
Methods included from Common
#distributed?, #flaky?, #release!, #report_failure!, #report_success!, #rescue_connection_errors, #retrying?
Constructor Details
#initialize(redis_url, config) ⇒ Base
Returns a new instance of Base.
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/ci/queue/redis/base.rb', line 34
# Stores the connection settings and builds the Redis client.
#
# @param redis_url [String] URL passed to `::Redis.new` as `url:`
# @param config [Object] configuration object, kept in `@config`
def initialize(redis_url, config)
  @redis_url = redis_url
  @config = config
  # Compare versions semantically. A plain String comparison is lexicographic,
  # so "10.0.0" > "5.0.0" would be false and a newer client would silently get
  # the legacy options below.
  if Gem::Version.new(::Redis::VERSION) > Gem::Version.new("5.0.0")
    @redis = ::Redis.new(
      url: redis_url,
      reconnect_attempts: reconnect_attempts,
      middlewares: custom_middlewares,
      # NOTE(review): this disables TLS certificate verification — confirm it is intended.
      ssl_params: { verify_mode: OpenSSL::SSL::VERIFY_NONE },
      custom: custom_config,
      timeout: DEFAULT_TIMEOUT,
    )
  else
    @redis = ::Redis.new(url: redis_url, timeout: DEFAULT_TIMEOUT)
  end
end
|
Instance Method Details
#boot_heartbeat_process! ⇒ Object
81
82
83
84
85
|
# File 'lib/ci/queue/redis/base.rb', line 81
# Boots the heartbeat process, but only when heartbeats are enabled.
#
# @return [Object, nil] the result of `heartbeat_process.boot!`, or nil when disabled
def boot_heartbeat_process!
  heartbeat_process.boot! if heartbeat_enabled?
end
|
#created_at=(timestamp) ⇒ Object
120
121
122
|
# File 'lib/ci/queue/redis/base.rb', line 120
# Records the creation timestamp under the 'created-at' key using SETNX,
# so an already-present value is left untouched by Redis.
#
# @param timestamp [Object] value to store
def created_at=(timestamp)
  created_at_key = key('created-at')
  redis.setnx(created_at_key, timestamp)
end
|
#custom_config ⇒ Object
94
95
96
97
98
99
|
# File 'lib/ci/queue/redis/base.rb', line 94
# Builds the `custom:` options for the Redis client.
#
# @return [Hash, nil] `{ debug_log: Logger }` when `config.debug_log` is set, nil otherwise
def custom_config
  log_target = config.debug_log
  return unless log_target

  # Loaded lazily: the logger is only needed when debug logging is on.
  require 'logger'
  { debug_log: Logger.new(log_target) }
end
|
#custom_middlewares ⇒ Object
101
102
103
104
105
|
# File 'lib/ci/queue/redis/base.rb', line 101
# Lists the middlewares to install on the Redis client.
#
# @return [Array<Module>, nil] `[RedisInstrumentation]` when `config.debug_log` is set, nil otherwise
def custom_middlewares
  [RedisInstrumentation] if config.debug_log
end
|
#ensure_heartbeat_thread_alive! ⇒ Object
74
75
76
77
78
79
|
# File 'lib/ci/queue/redis/base.rb', line 74
# Starts the thread running `heartbeat` unless heartbeats are disabled or a
# previously started thread is still alive.
#
# @return [Thread, nil] the newly started thread, or nil when nothing was started
def ensure_heartbeat_thread_alive!
  if heartbeat_enabled? && !@heartbeat_thread&.alive?
    @heartbeat_thread = Thread.start { heartbeat }
  end
end
|
#exhausted? ⇒ Boolean
107
108
109
|
# File 'lib/ci/queue/redis/base.rb', line 107
# True once the queue is initialized and `size` is zero.
#
# @return [Boolean]
def exhausted?
  initialized = queue_initialized?
  initialized && size == 0
end
|
#expired? ⇒ Boolean
111
112
113
114
115
116
117
118
|
# File 'lib/ci/queue/redis/base.rb', line 111
# Reports whether the stored 'created-at' timestamp, plus `config.redis_ttl`
# and a TEN_MINUTES margin, lies before the current time.
# A missing 'created-at' value counts as expired.
#
# @return [Boolean]
def expired?
  created_at = redis.get(key('created-at'))
  return true unless created_at

  deadline = created_at.to_f + config.redis_ttl + TEN_MINUTES
  deadline < CI::Queue.time_now.to_f
end
|
#increment_test_failed(pipeline: redis) ⇒ Object
184
185
186
|
# File 'lib/ci/queue/redis/base.rb', line 184
# Increments the 'test_failed_count' counter with INCR.
#
# @param pipeline [Object] object receiving `incr`; defaults to `redis`
# @return [Object] whatever `pipeline.incr` returns
def increment_test_failed(pipeline: redis)
  counter_key = key('test_failed_count')
  pipeline.incr(counter_key)
end
|
#max_test_failed? ⇒ Boolean
192
193
194
195
196
|
# File 'lib/ci/queue/redis/base.rb', line 192
# Reports whether the failure count has reached `config.max_test_failed`.
# Always false when no limit is configured.
#
# @return [Boolean]
def max_test_failed?
  limit = config.max_test_failed
  !limit.nil? && test_failed >= limit
end
|
#progress ⇒ Object
150
151
152
|
# File 'lib/ci/queue/redis/base.rb', line 150
# Difference between `total` and the current `size`.
#
# @return [Numeric]
def progress
  total_count = total
  total_count - size
end
|
#queue_initialized? ⇒ Boolean
173
174
175
176
177
178
|
# File 'lib/ci/queue/redis/base.rb', line 173
# True when the master status is 'ready' or 'finished'.
# A true result is memoized; a false result is not, so the status is read
# again on the next call.
#
# @return [Boolean]
def queue_initialized?
  return @queue_initialized if @queue_initialized

  @queue_initialized = %w[ready finished].include?(master_status)
end
|
#queue_initializing? ⇒ Boolean
180
181
182
|
# File 'lib/ci/queue/redis/base.rb', line 180
# True while the master status is 'setup'.
#
# @return [Boolean]
def queue_initializing?
  status = master_status
  status == 'setup'
end
|
#reconnect_attempts ⇒ Object
57
58
59
60
61
|
# File 'lib/ci/queue/redis/base.rb', line 57
# Value passed to the Redis client as `reconnect_attempts:`.
# Setting the CI_QUEUE_DISABLE_RECONNECT_ATTEMPTS environment variable
# (to any value) yields an empty list.
#
# @return [Array<Numeric>]
def reconnect_attempts
  if ENV["CI_QUEUE_DISABLE_RECONNECT_ATTEMPTS"]
    []
  else
    [0, 0, 0.1, 0.5, 1, 3, 5]
  end
end
|
#remaining ⇒ Object
131
132
133
|
# File 'lib/ci/queue/redis/base.rb', line 131
# Length (LLEN) of the list stored under the 'queue' key.
#
# @return [Integer]
def remaining
  queue_key = key('queue')
  redis.llen(queue_key)
end
|
#running ⇒ Object
135
136
137
|
# File 'lib/ci/queue/redis/base.rb', line 135
# Cardinality (ZCARD) of the sorted set stored under the 'running' key.
#
# @return [Integer]
def running
  running_key = key('running')
  redis.zcard(running_key)
end
|
#size ⇒ Object
124
125
126
127
128
129
|
# File 'lib/ci/queue/redis/base.rb', line 124
# Sum of the 'queue' list length and the 'running' sorted-set cardinality,
# both read inside a single MULTI transaction.
#
# @return [Integer]
def size
  counts = redis.multi do |transaction|
    transaction.llen(key('queue'))
    transaction.zcard(key('running'))
  end
  counts.reduce(:+)
end
|
#stop_heartbeat! ⇒ Object
87
88
89
90
91
92
|
# File 'lib/ci/queue/redis/base.rb', line 87
# When heartbeats are enabled, sets the heartbeat state to :stop and then
# shuts the heartbeat process down. Does nothing otherwise.
#
# @return [Object, nil] the result of `heartbeat_process.shutdown!`, or nil when disabled
def stop_heartbeat!
  if heartbeat_enabled?
    heartbeat_state.set(:stop)
    heartbeat_process.shutdown!
  end
end
|
#test_failed ⇒ Object
188
189
190
|
# File 'lib/ci/queue/redis/base.rb', line 188
# Reads the 'test_failed_count' value and converts it with `to_i`
# (so a missing value yields 0).
#
# @return [Integer]
def test_failed
  raw_count = redis.get(key('test_failed_count'))
  raw_count.to_i
end
|
#test_ids ⇒ Object
139
140
141
142
143
144
|
# File 'lib/ci/queue/redis/base.rb', line 139
# Entries of the 'queue' list followed by members of the 'running' sorted
# set, fetched in one MULTI transaction and flattened into a single array.
#
# @return [Array]
def test_ids
  replies = redis.multi do |transaction|
    transaction.lrange(key('queue'), 0, -1)
    transaction.zrange(key('running'), 0, -1)
  end
  replies.flatten
end
|
#to_a ⇒ Object
146
147
148
|
# File 'lib/ci/queue/redis/base.rb', line 146
# Looks up every id from `test_ids`, in reverse order, in `index`.
#
# @return [Array] the `index` entries for the reversed ids
# @raise [KeyError] presumably, when an id is absent from `index` (if `index` is a Hash — confirm)
def to_a
  test_ids.reverse_each.map { |test_id| index.fetch(test_id) }
end
|
#wait_for_master(timeout: 30) ⇒ Object
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
# File 'lib/ci/queue/redis/base.rb', line 154
# Blocks until the queue has been initialized, polling every 0.1 seconds.
#
# @param timeout [Numeric] approximate number of seconds to keep polling
# @return [true] when this worker is the master or the queue is initialized
# @raise [LostMaster] when the queue is still not initialized after polling
def wait_for_master(timeout: 30)
  return true if master?
  return true if queue_initialized?

  (timeout * 10 + 1).to_i.times do
    return true if queue_initialized?

    sleep 0.1
  end

  # The loop ends on a sleep, so check once more: a master that became ready
  # during that last sleep must not be reported as lost.
  return true if queue_initialized?

  raise LostMaster, "The master worker is still `#{master_status}` after #{timeout} seconds waiting."
end
|
#with_heartbeat(id) ⇒ Object
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/ci/queue/redis/base.rb', line 63
# Runs the given block. When heartbeats are enabled, the heartbeat thread is
# started if needed and the state is set to (:tick, id) beforehand; the state
# is always set to :reset afterwards, even if the block raises.
#
# @param id [Object] identifier passed along with the :tick state
# @yield the work to run
# @return [Object] the block's return value
def with_heartbeat(id)
  if heartbeat_enabled?
    ensure_heartbeat_thread_alive!
    heartbeat_state.set(:tick, id)
  end

  yield
ensure
  if heartbeat_enabled?
    heartbeat_state.set(:reset)
  end
end
|
#workers_count ⇒ Object
169
170
171
|
# File 'lib/ci/queue/redis/base.rb', line 169
# Cardinality (SCARD) of the set stored under the 'workers' key.
#
# @return [Integer]
def workers_count
  workers_key = key('workers')
  redis.scard(workers_key)
end
|