Class: CI::Queue::Static

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/ci/queue/static.rb

Direct Known Subclasses

File, Grind, Redis::Retry

Constant Summary collapse

TEN_MINUTES =
60 * 10

Constants included from Common

Common::CONNECTION_ERRORS

Instance Attribute Summary collapse

Attributes included from Common

#config

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Common

#flaky?, #release!, #report_failure!, #report_success!, #rescue_connection_errors, #retrying?

Constructor Details

#initialize(tests, config) ⇒ Static

Returns a new instance of Static.



21
22
23
24
25
26
27
# File 'lib/ci/queue/static.rb', line 21

# Sets up a queue backed directly by the given list of tests.
#
# @param tests [#size] the entries to serve; kept as the backing queue
# @param config [Object] configuration object, stored in @config
def initialize(tests, config)
  @queue, @config = tests, config
  @progress = 0 # nothing acknowledged yet
  @total = tests.size
  @shutdown = false
end

Instance Attribute Details

#entry_resolver ⇒ Object

Returns the value of attribute entry_resolver.



19
20
21
# File 'lib/ci/queue/static.rb', line 19

# Reader for the @entry_resolver attribute.
#
# @return [Object] the current value of @entry_resolver
def entry_resolver
  @entry_resolver
end

#progress ⇒ Object (readonly)

Returns the value of attribute progress.



18
19
20
# File 'lib/ci/queue/static.rb', line 18

# Reader for the @progress counter.
#
# @return [Object] the current value of @progress
def progress
  @progress
end

#total ⇒ Object (readonly)

Returns the value of attribute total.



18
19
20
# File 'lib/ci/queue/static.rb', line 18

# Reader for the @total attribute.
#
# @return [Object] the current value of @total
def total
  @total
end

Class Method Details

.from_uri(uri, config) ⇒ Object



10
11
12
13
# File 'lib/ci/queue/static.rb', line 10

# Builds a queue from the opaque part of a URI.
#
# The opaque part is split on ':' and every segment is CGI-unescaped to
# obtain the list of tests.
#
# @param uri [#opaque] URI whose opaque part lists the escaped tests
# @param config [Object] passed through to the constructor
# @return [Object] the result of +new(tests, config)+
def from_uri(uri, config)
  escaped_names = uri.opaque.split(':')
  new(escaped_names.map { |name| CGI.unescape(name) }, config)
end

Instance Method Details

#acknowledge ⇒ Object



147
148
149
150
# File 'lib/ci/queue/static.rb', line 147

# Counts one more acknowledged test. Any arguments are accepted and ignored.
#
# @return [true] always
def acknowledge(...)
  @progress = @progress + 1
  true
end

#boot_heartbeat_process! ⇒ Object



74
# File 'lib/ci/queue/static.rb', line 74

# Intentionally empty: this implementation does nothing and returns nil.
def boot_heartbeat_process!
end

#build ⇒ Object



37
38
39
# File 'lib/ci/queue/static.rb', line 37

# Returns the BuildRecord wrapping this queue, creating it on first use and
# reusing the same instance afterwards.
#
# @return [BuildRecord]
def build
  return @build if @build

  @build = BuildRecord.new(self)
end

#created_at=(timestamp) ⇒ Object



84
85
86
# File 'lib/ci/queue/static.rb', line 84

# Records the creation timestamp. Once a truthy value is stored, later
# assignments leave it untouched.
#
# @param timestamp [Object] value to store if none is set yet
# @return [Object] the stored @created_at
def created_at=(timestamp)
  return @created_at if @created_at

  @created_at = timestamp
end

#distributed? ⇒ Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/ci/queue/static.rb', line 33

# This queue implementation always reports itself as not distributed.
#
# @return [Boolean] always false
def distributed?
  false
end

#ensure_heartbeat_thread_alive! ⇒ Object



72
# File 'lib/ci/queue/static.rb', line 72

# Intentionally empty: this implementation does nothing and returns nil.
def ensure_heartbeat_thread_alive!
end

#exhausted? ⇒ Boolean

Returns:

  • (Boolean)


143
144
145
# File 'lib/ci/queue/static.rb', line 143

# Tells whether the backing queue has no entries left.
#
# @return [Boolean] the result of @queue.empty?
def exhausted?
  @queue.empty?
end

#expired? ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/ci/queue/static.rb', line 88

# Tells whether more than TEN_MINUTES seconds have elapsed since the
# recorded creation time, measured against CI::Queue.time_now.
#
# An unset @created_at converts to 0.0 via #to_f.
#
# @return [Boolean]
def expired?
  deadline = @created_at.to_f + TEN_MINUTES
  CI::Queue.time_now.to_f > deadline
end

#increment_test_failed ⇒ Object



152
153
154
# File 'lib/ci/queue/static.rb', line 152

# Bumps the failed-test counter by one. Any arguments are accepted and
# ignored.
#
# @return [Integer] the updated count
def increment_test_failed(...)
  previous_failures = test_failed
  @test_failed = previous_failures + 1
end

#lease_for(entry) ⇒ Object



68
69
70
# File 'lib/ci/queue/static.rb', line 68

# This implementation has no lease to report: the argument is ignored.
#
# @param entry [Object] unused
# @return [nil] always
def lease_for(entry)
  nil
end

#max_test_failed? ⇒ Boolean

Returns:

  • (Boolean)


160
161
162
163
164
# File 'lib/ci/queue/static.rb', line 160

# Tells whether the number of failed tests has reached the configured limit.
#
# @return [Boolean] false when config.max_test_failed is nil; otherwise
#   whether test_failed is greater than or equal to that limit
def max_test_failed?
  limit = config.max_test_failed
  !limit.nil? && test_failed >= limit
end

#poll ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/ci/queue/static.rb', line 116

# Hands queue entries to the caller's block, one at a time, from the front
# of the queue.
#
# Before each entry is taken the following are checked, in this order:
# shutdown requested, any circuit breaker open, failure limit reached. The
# loop also ends when the queue yields a falsy value. Reservation
# bookkeeping is cleared after the loop finishes.
#
# @yield [test] the resolver's result when an entry resolver is set, the
#   indexed test when an index is present, otherwise the raw entry
def poll
  until @shutdown
    break unless config.circuit_breakers.none?(&:open?)
    break if max_test_failed?

    reserved_test = @queue.shift
    break unless reserved_test

    reserved_tests << reserved_test

    resolver = entry_resolver
    if resolver
      resolved = resolver.call(reserved_test)
      # Remember the untouched queue entry under the resolved id so requeue
      # can put it back with its complete payload.
      reserved_entries[resolved.id] = reserved_test if resolved.respond_to?(:id)
      yield resolved
      next
    end

    unless defined?(@index) && @index
      yield reserved_test
      next
    end

    # The index is keyed by test id. Extract the id from the entry; an entry
    # that does not parse as JSON is used as the key unchanged.
    lookup_key =
      begin
        CI::Queue::QueueEntry.test_id(reserved_test)
      rescue JSON::ParserError
        reserved_test
      end
    yield index.fetch(lookup_key)
  end
  reserved_tests.clear
  reserved_entries.clear
end

#populate(tests, random: nil) ⇒ Object



49
50
51
52
# File 'lib/ci/queue/static.rb', line 49

# Builds the lookup index mapping each test's id to the test itself.
# The +random+ option is accepted but not used here.
#
# @param tests [Enumerable<#id>] tests to index
# @param random [Object, nil] unused
# @return [self]
def populate(tests, random: nil)
  @index = tests.to_h { |test| [test.id, test] }
  self
end

#populated? ⇒ Boolean

Returns:

  • (Boolean)


92
93
94
# File 'lib/ci/queue/static.rb', line 92

# Tells whether the queue has something to work with: true once @index has
# been assigned (whatever its value), otherwise the result of @queue.any?.
#
# @return [Boolean]
def populated?
  return true if defined?(@index)

  @queue.any?
end

#queue_initialized? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/ci/queue/static.rb', line 80

# This queue implementation always reports itself as initialized.
#
# @return [Boolean] always true
def queue_initialized?
  true
end

#remaining ⇒ Object



108
109
110
# File 'lib/ci/queue/static.rb', line 108

# Number of entries still waiting in the backing queue.
#
# @return [Integer] the result of @queue.size
def remaining
  @queue.size
end

#report_worker_error(error) ⇒ Object



78
# File 'lib/ci/queue/static.rb', line 78

# Intentionally empty: the error is ignored and nil is returned.
#
# @param error [Object] unused
def report_worker_error(error)
end

#requeue(entry) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
# File 'lib/ci/queue/static.rb', line 166

# Puts an entry back at the front of the queue when requeueing is allowed.
#
# @param entry [Object] the entry to requeue; its test id is extracted with
#   CI::Queue::QueueEntry.test_id
# @return [Boolean] true when the entry was requeued, false when
#   should_requeue? rejected it
def requeue(entry)
  test_id = CI::Queue::QueueEntry.test_id(entry)

  if should_requeue?(test_id)
    requeues[test_id] += 1
    # Prefer the entry originally reserved under this id (it carries the
    # full payload for the entry resolver); fall back to the bare id.
    @queue.unshift(reserved_entries.delete(test_id) || test_id)
    true
  else
    false
  end
end

#retry_queue ⇒ Object



45
46
47
# File 'lib/ci/queue/static.rb', line 45

# This queue serves as its own retry queue.
#
# @return [self]
def retry_queue
  self
end

#running ⇒ Object



112
113
114
# File 'lib/ci/queue/static.rb', line 112

# Reports how many tests are in flight: 0 when nothing is reserved,
# otherwise 1.
#
# @return [Integer] 0 or 1
def running
  return 0 if reserved_tests.empty?

  1
end

#shutdown! ⇒ Object



29
30
31
# File 'lib/ci/queue/static.rb', line 29

# Raises the shutdown flag; #poll checks it before taking each entry and
# stops once it is set.
#
# @return [true]
def shutdown!
  @shutdown = true
end

#size ⇒ Object



104
105
106
# File 'lib/ci/queue/static.rb', line 104

# Current number of entries in the backing queue.
#
# @return [Integer] the result of @queue.size
def size
  @queue.size
end

#stop_heartbeat! ⇒ Object



76
# File 'lib/ci/queue/static.rb', line 76

# Intentionally empty: this implementation does nothing and returns nil.
def stop_heartbeat!
end

#stream_populate(tests, random: nil, batch_size: nil) ⇒ Object

Support lazy loading mode: accept an enumerator of entries and store them in queue order (no shuffling). This preserves the exact order from the input file for local reproduction.



57
58
59
60
61
62
# File 'lib/ci/queue/static.rb', line 57

# Replaces the queue with the entries produced by +tests+, in the order they
# are yielded, and updates the total accordingly. The +random+ and
# +batch_size+ options are accepted but not used here.
#
# @param tests [#each] source of entries
# @param random [Object, nil] unused
# @param batch_size [Object, nil] unused
# @return [self]
def stream_populate(tests, random: nil, batch_size: nil)
  entries = (@queue = [])
  tests.each do |entry|
    entries.push(entry)
  end
  @total = entries.size
  self
end

#supervisor ⇒ Object

Raises:

  • (NotImplementedError)


41
42
43
# File 'lib/ci/queue/static.rb', line 41

# Supervision is not available for this queue type.
#
# @raise [NotImplementedError] always
def supervisor
  message = "This type of queue can't be supervised"
  raise NotImplementedError, message
end

#test_failed ⇒ Object



156
157
158
# File 'lib/ci/queue/static.rb', line 156

# Number of failed tests recorded so far; starts at 0 on first read.
#
# @return [Integer]
def test_failed
  @test_failed = 0 unless @test_failed
  @test_failed
end

#to_a ⇒ Object



96
97
98
99
100
101
102
# File 'lib/ci/queue/static.rb', line 96

# Lists the entries still waiting in the queue without consuming them.
#
# Without an index this is a shallow copy of the queue. With an index, each
# entry is mapped to its indexed test. The index key is the test id
# extracted with CI::Queue::QueueEntry.test_id, falling back to the raw
# entry when it does not parse as JSON. This is the same lookup #poll
# performs, so JSON-formatted entries resolve here instead of raising
# KeyError.
#
# @return [Array] indexed tests, or a copy of the raw entries
# @raise [KeyError] when an entry's test id is missing from the index
def to_a
  return @queue.dup unless defined?(@index) && @index

  @queue.map do |entry|
    test_id = begin
      CI::Queue::QueueEntry.test_id(entry)
    rescue JSON::ParserError
      # Not JSON: the entry is already a bare test id.
      entry
    end
    index.fetch(test_id)
  end
end

#with_heartbeat(id, lease: nil) ⇒ Object



64
65
66
# File 'lib/ci/queue/static.rb', line 64

# Runs the given block directly; +id+ and +lease+ are accepted but not used
# by this implementation.
#
# @param id [Object] unused
# @param lease [Object, nil] unused
# @return [Object] whatever the block returns
def with_heartbeat(id, lease: nil)
  yield
end