Class: CI::Queue::Redis::Base

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/ci/queue/redis/base.rb

Direct Known Subclasses

Supervisor, Worker

Constant Summary collapse

CONNECTION_ERRORS =
[
  ::Redis::BaseConnectionError,
  ::SocketError, # https://github.com/redis/redis-rb/pull/631
].freeze

Instance Attribute Summary

Attributes included from Common

#config

Instance Method Summary collapse

Methods included from Common

#flaky?, #release!, #report_failure!, #report_success!, #rescue_connection_errors, #retrying?

Constructor Details

#initialize(redis_url, config) ⇒ Base

Returns a new instance of Base.



13
14
15
16
17
# File 'lib/ci/queue/redis/base.rb', line 13

def initialize(redis_url, config)
  @redis_url = redis_url
  @redis = ::Redis.new(url: redis_url)
  @config = config
end

Instance Method Details

#exhausted?Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/ci/queue/redis/base.rb', line 19

def exhausted?
  queue_initialized? && size == 0
end

#increment_test_failedObject



64
65
66
# File 'lib/ci/queue/redis/base.rb', line 64

def increment_test_failed
  redis.incr(key('test_failed_count'))
end

#max_test_failed?Boolean

Returns:

  • (Boolean)


72
73
74
75
76
# File 'lib/ci/queue/redis/base.rb', line 72

def max_test_failed?
  return false if config.max_test_failed.nil?

  test_failed >= config.max_test_failed
end

#progressObject



37
38
39
# File 'lib/ci/queue/redis/base.rb', line 37

def progress
  total - size
end

#queue_initialized?Boolean

Returns:

  • (Boolean)


57
58
59
60
61
62
# File 'lib/ci/queue/redis/base.rb', line 57

def queue_initialized?
  @queue_initialized ||= begin
    status = master_status
    status == 'ready' || status == 'finished'
  end
end

#sizeObject



23
24
25
26
27
28
# File 'lib/ci/queue/redis/base.rb', line 23

def size
  redis.multi do |transaction|
    transaction.llen(key('queue'))
    transaction.zcard(key('running'))
  end.inject(:+)
end

#test_failedObject



68
69
70
# File 'lib/ci/queue/redis/base.rb', line 68

def test_failed
  redis.get(key('test_failed_count')).to_i
end

#to_aObject



30
31
32
33
34
35
# File 'lib/ci/queue/redis/base.rb', line 30

def to_a
  redis.multi do |transaction|
    transaction.lrange(key('queue'), 0, -1)
    transaction.zrange(key('running'), 0, -1)
  end.flatten.reverse.map { |k| index.fetch(k) }
end

#wait_for_master(timeout: 30) ⇒ Object

Raises:



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/ci/queue/redis/base.rb', line 41

def wait_for_master(timeout: 30)
  return true if master?
  (timeout * 10 + 1).to_i.times do
    if queue_initialized?
      return true
    else
      sleep 0.1
    end
  end
  raise LostMaster, "The master worker is still `#{master_status}` after #{timeout} seconds waiting."
end

#workers_countObject



53
54
55
# File 'lib/ci/queue/redis/base.rb', line 53

def workers_count
  redis.scard(key('workers'))
end