Class: CI::Queue::Redis::Base

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/ci/queue/redis/base.rb

Direct Known Subclasses

Supervisor, Worker

Defined Under Namespace

Modules: RedisInstrumentation Classes: HeartbeatProcess, State

Constant Summary collapse

TEN_MINUTES =
60 * 10
CONNECTION_ERRORS =
[
  ::Redis::BaseConnectionError,
  ::SocketError, # https://github.com/redis/redis-rb/pull/631
].freeze
DEFAULT_TIMEOUT =
2

Instance Attribute Summary

Attributes included from Common

#config

Instance Method Summary collapse

Methods included from Common

#distributed?, #flaky?, #release!, #report_failure!, #report_success!, #rescue_connection_errors, #retrying?

Constructor Details

#initialize(redis_url, config) ⇒ Base

Returns a new instance of Base.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/ci/queue/redis/base.rb', line 34

def initialize(redis_url, config)
  @redis_url = redis_url
  @config = config
  if ::Redis::VERSION > "5.0.0"
    @redis = ::Redis.new(
      url: redis_url,
      # Booting a CI worker is costly, so in case of a Redis blip,
      # it makes sense to retry for a while before giving up.
      reconnect_attempts: reconnect_attempts,
      middlewares: custom_middlewares,
      # Hosted Redis servers use self signed certificates
      # (because they do not own the domain they're running on such as compute-1.amazonaws.com)
      # therefore a full SSL connection verification will fail.
      # ci-queue should not contain any sensitive data, so we can just disable the verification.
      ssl_params: { verify_mode: OpenSSL::SSL::VERIFY_NONE },
      custom: custom_config,
      timeout: DEFAULT_TIMEOUT,
    )
  else
    @redis = ::Redis.new(url: redis_url, timeout: DEFAULT_TIMEOUT)
  end
end

Instance Method Details

#boot_heartbeat_process!Object



81
82
83
84
85
# File 'lib/ci/queue/redis/base.rb', line 81

def boot_heartbeat_process!
  return unless heartbeat_enabled?

  heartbeat_process.boot!
end

#created_at=(timestamp) ⇒ Object



120
121
122
# File 'lib/ci/queue/redis/base.rb', line 120

def created_at=(timestamp)
  redis.setnx(key('created-at'), timestamp)
end

#custom_configObject



94
95
96
97
98
99
# File 'lib/ci/queue/redis/base.rb', line 94

def custom_config
  return unless config.debug_log

  require 'logger'
  { debug_log: Logger.new(config.debug_log) }
end

#custom_middlewaresObject



101
102
103
104
105
# File 'lib/ci/queue/redis/base.rb', line 101

def custom_middlewares
  return unless config.debug_log

  [RedisInstrumentation]
end

#ensure_heartbeat_thread_alive!Object



74
75
76
77
78
79
# File 'lib/ci/queue/redis/base.rb', line 74

def ensure_heartbeat_thread_alive!
  return unless heartbeat_enabled?
  return if @heartbeat_thread&.alive?

  @heartbeat_thread = Thread.start { heartbeat }
end

#exhausted?Boolean

Returns:

  • (Boolean)


107
108
109
# File 'lib/ci/queue/redis/base.rb', line 107

def exhausted?
  queue_initialized? && size == 0
end

#expired?Boolean

Returns:

  • (Boolean)


111
112
113
114
115
116
117
118
# File 'lib/ci/queue/redis/base.rb', line 111

def expired?
  if (created_at = redis.get(key('created-at')))
    (created_at.to_f + config.redis_ttl + TEN_MINUTES) < CI::Queue.time_now.to_f
  else
    # if there is no created at set anymore we assume queue is expired
    true
  end
end

#increment_test_failed(pipeline: redis) ⇒ Object



184
185
186
# File 'lib/ci/queue/redis/base.rb', line 184

def increment_test_failed(pipeline: redis)
  pipeline.incr(key('test_failed_count'))
end

#max_test_failed?Boolean

Returns:

  • (Boolean)


192
193
194
195
196
# File 'lib/ci/queue/redis/base.rb', line 192

def max_test_failed?
  return false if config.max_test_failed.nil?

  test_failed >= config.max_test_failed
end

#progressObject



150
151
152
# File 'lib/ci/queue/redis/base.rb', line 150

def progress
  total - size
end

#queue_initialized?Boolean

Returns:

  • (Boolean)


173
174
175
176
177
178
# File 'lib/ci/queue/redis/base.rb', line 173

def queue_initialized?
  @queue_initialized ||= begin
    status = master_status
    status == 'ready' || status == 'finished'
  end
end

#queue_initializing?Boolean

Returns:

  • (Boolean)


180
181
182
# File 'lib/ci/queue/redis/base.rb', line 180

def queue_initializing?
  master_status == 'setup'
end

#reconnect_attemptsObject



57
58
59
60
61
# File 'lib/ci/queue/redis/base.rb', line 57

def reconnect_attempts
  return [] if ENV["CI_QUEUE_DISABLE_RECONNECT_ATTEMPTS"]

  [0, 0, 0.1, 0.5, 1, 3, 5]
end

#remainingObject



131
132
133
# File 'lib/ci/queue/redis/base.rb', line 131

def remaining
  redis.llen(key('queue'))
end

#runningObject



135
136
137
# File 'lib/ci/queue/redis/base.rb', line 135

def running
  redis.zcard(key('running'))
end

#sizeObject



124
125
126
127
128
129
# File 'lib/ci/queue/redis/base.rb', line 124

def size
  redis.multi do |transaction|
    transaction.llen(key('queue'))
    transaction.zcard(key('running'))
  end.inject(:+)
end

#stop_heartbeat!Object



87
88
89
90
91
92
# File 'lib/ci/queue/redis/base.rb', line 87

def stop_heartbeat!
  return unless heartbeat_enabled?

  heartbeat_state.set(:stop)
  heartbeat_process.shutdown!
end

#test_failedObject



188
189
190
# File 'lib/ci/queue/redis/base.rb', line 188

def test_failed
  redis.get(key('test_failed_count')).to_i
end

#test_idsObject



139
140
141
142
143
144
# File 'lib/ci/queue/redis/base.rb', line 139

def test_ids
  redis.multi do |transaction|
    transaction.lrange(key('queue'), 0, -1)
    transaction.zrange(key('running'), 0, -1)
  end.flatten
end

#to_aObject



146
147
148
# File 'lib/ci/queue/redis/base.rb', line 146

def to_a
  test_ids.reverse.map { |k| index.fetch(k) }
end

#wait_for_master(timeout: 30) ⇒ Object

Raises:



154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/ci/queue/redis/base.rb', line 154

def wait_for_master(timeout: 30)
  return true if master?
  return true if queue_initialized?

  (timeout * 10 + 1).to_i.times do
    if queue_initialized?
      return true
    else
      sleep 0.1
    end
  end

  raise LostMaster, "The master worker is still `#{master_status}` after #{timeout} seconds waiting."
end

#with_heartbeat(id) ⇒ Object



63
64
65
66
67
68
69
70
71
72
# File 'lib/ci/queue/redis/base.rb', line 63

def with_heartbeat(id)
  if heartbeat_enabled?
    ensure_heartbeat_thread_alive!
    heartbeat_state.set(:tick, id)
  end

  yield
ensure
  heartbeat_state.set(:reset) if heartbeat_enabled?
end

#workers_countObject



169
170
171
# File 'lib/ci/queue/redis/base.rb', line 169

def workers_count
  redis.scard(key('workers'))
end