Class: ZSpec::Queue

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/zspec/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#dedupe_key, #results_key, #retry_key, #stdout_key, #timeout_key

Constructor Details

#initialize(sink:, build_prefix:, retries:, timeout:) ⇒ Queue

Returns a new instance of Queue.



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/zspec/queue.rb', line 10

# Stores the sink, coerces the numeric options, and derives every key name
# used by the queue from +build_prefix+.
#
# @param sink the backing store client; stored as-is (presumably a Redis
#   client, judging by the calls made on it elsewhere — confirm with caller)
# @param build_prefix value each ":suffix" literal is appended to with +
# @param retries coerced with #to_i; compared against the retry count in #resolve
# @param timeout coerced with #to_i
def initialize(sink:, build_prefix:, retries:, timeout:)
  # Every key name is the prefix followed by a fixed ":suffix" literal.
  key_for = ->(suffix) { build_prefix + suffix }

  @sink    = sink
  @retries = retries.to_i
  @timeout = timeout.to_i

  @counter_name           = key_for.call(":count")
  @pending_queue_name     = key_for.call(":pending")
  @processing_queue_name  = key_for.call(":processing")
  @done_queue_name        = key_for.call(":done")
  @metadata_hash_name     = key_for.call(":metadata")
  @workers_ready_key_name = key_for.call(":ready")
end

Instance Attribute Details

#counter_name ⇒ Object (readonly)

Returns the value of attribute counter_name.



5
6
7
# File 'lib/zspec/queue.rb', line 5

# Reader for the counter key name.
#
# @return the value stored in @counter_name, which #initialize sets to
#   build_prefix + ":count"
def counter_name
  @counter_name
end

#done_queue_name ⇒ Object (readonly)

Returns the value of attribute done_queue_name.



5
6
7
# File 'lib/zspec/queue.rb', line 5

# Reader for the done-queue key name.
#
# @return the value stored in @done_queue_name, which #initialize sets to
#   build_prefix + ":done"
def done_queue_name
  @done_queue_name
end

#metadata_hash_name ⇒ Object (readonly)

Returns the value of attribute metadata_hash_name.



5
6
7
# File 'lib/zspec/queue.rb', line 5

# Reader for the metadata hash key name.
#
# @return the value stored in @metadata_hash_name, which #initialize sets to
#   build_prefix + ":metadata"
def metadata_hash_name
  @metadata_hash_name
end

#pending_queue_name ⇒ Object (readonly)

Returns the value of attribute pending_queue_name.



5
6
7
# File 'lib/zspec/queue.rb', line 5

# Reader for the pending-queue key name.
#
# @return the value stored in @pending_queue_name, which #initialize sets to
#   build_prefix + ":pending"
def pending_queue_name
  @pending_queue_name
end

#processing_queue_name ⇒ Object (readonly)

Returns the value of attribute processing_queue_name.



5
6
7
# File 'lib/zspec/queue.rb', line 5

# Reader for the processing-queue key name.
#
# @return the value stored in @processing_queue_name, which #initialize sets
#   to build_prefix + ":processing"
def processing_queue_name
  @processing_queue_name
end

#workers_ready_key_name ⇒ Object (readonly)

Returns the value of attribute workers_ready_key_name.



5
6
7
# File 'lib/zspec/queue.rb', line 5

# Reader for the workers-ready key name.
#
# @return the value stored in @workers_ready_key_name, which #initialize sets
#   to build_prefix + ":ready"
def workers_ready_key_name
  @workers_ready_key_name
end

Instance Method Details

#cleanup(expire_seconds = EXPIRE_SECONDS) ⇒ Object



22
23
24
25
26
27
28
29
# File 'lib/zspec/queue.rb', line 22

# Calls +expire+ on the sink for each key name this queue owns, in a fixed
# order: counter, pending, processing, done, metadata, workers-ready.
#
# @param expire_seconds value passed as the second argument to every
#   +@sink.expire+ call; defaults to EXPIRE_SECONDS
# @return whatever the final +@sink.expire+ call (workers-ready key) returns
def cleanup(expire_seconds = EXPIRE_SECONDS)
  key_names = [
    @counter_name,
    @pending_queue_name,
    @processing_queue_name,
    @done_queue_name,
    @metadata_hash_name,
    @workers_ready_key_name
  ]

  # map + last keeps the return value equal to the last expire call's result.
  key_names.map { |key_name| @sink.expire(key_name, expire_seconds) }.last
end

#done_queue ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/zspec/queue.rb', line 39

# Builds an Enumerator over finished work. The loop stops once
# +workers_ready? && complete?+ holds.
#
# Each pass calls +expire_processing+, pops one message from the done queue
# (+brpop+ with +timeout: 1+), and yields exactly one pair:
# * +[results, stdout]+ when the message is new and has stored results; the
#   dedupe flag is then set and the counter decremented before yielding.
# * +[nil, nil]+ when nothing was popped, when the message's dedupe flag is
#   already set, or when no results are stored for it.
#
# @return [Enumerator]
def done_queue
  Enumerator.new do |yielder|
    until workers_ready? && complete?
      expire_processing

      _source, message = @sink.brpop(@done_queue_name, timeout: 1)
      outcome = [nil, nil]

      # Only look up results for a message that arrived and was not already counted.
      unless message.nil? || @sink.hget(@metadata_hash_name, dedupe_key(message))
        results = @sink.hget(@metadata_hash_name, results_key(message))

        unless results.nil?
          stdout = @sink.hget(@metadata_hash_name, stdout_key(message))

          # Flag the message as handled before decrementing the counter.
          @sink.hset(@metadata_hash_name, dedupe_key(message), true)
          @sink.decr(@counter_name)

          outcome = [results, stdout]
        end
      end

      yielder << outcome
    end
  end
end

#enqueue(messages) ⇒ Object



31
32
33
34
35
36
37
# File 'lib/zspec/queue.rb', line 31

# Pushes every message onto the pending queue, incrementing the counter once
# per message, and only afterwards sets the workers-ready key to +true+.
#
# @param messages a collection responding to #each
# @return whatever +@sink.set+ returns for the workers-ready key
def enqueue(messages)
  messages.each do |payload|
    @sink.lpush(@pending_queue_name, payload)
    @sink.incr(@counter_name)
  end

  # The ready key is written last, after all messages have been pushed.
  @sink.set(@workers_ready_key_name, true)
end

#pending_queue ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/zspec/queue.rb', line 71

# Builds an Enumerator over work to be done. The loop stops once
# +workers_ready? && complete?+ holds.
#
# Each pass moves one message from the pending queue to the processing queue
# (+brpoplpush+ with +timeout: 1+) and yields it. When a message was moved,
# the first element of +@sink.time+ is stored in the metadata hash under
# +timeout_key(message)+ before yielding. When nothing was moved, +nil+ is
# yielded.
#
# @return [Enumerator]
def pending_queue
  Enumerator.new do |yielder|
    until workers_ready? && complete?
      message = @sink.brpoplpush(@pending_queue_name, @processing_queue_name, timeout: 1)

      unless message.nil?
        # Record when the message was claimed, using the sink's own clock.
        field = timeout_key(message)
        claimed_at = @sink.time.first
        @sink.hset(@metadata_hash_name, field, claimed_at)
      end

      yielder << message
    end
  end
end

#resolve(failed, message, results, stdout) ⇒ Object



85
86
87
88
89
90
91
# File 'lib/zspec/queue.rb', line 85

# Decides whether a finished message is retried or resolved.
#
# When +failed+ is truthy, the message's retry count is looked up; if a count
# exists and is below +@retries+, the message is retried. In every other case
# the message is resolved with the given results and stdout.
#
# @param failed truthy when the message's run failed
# @param message the message being finished
# @param results passed through to +resolve_message+
# @param stdout passed through to +resolve_message+
# @return the return value of +retry_message+ or +resolve_message+
def resolve(failed, message, results, stdout)
  if failed
    # The retry count is only looked up for failed messages.
    attempts = retry_count(message)
    return retry_message(message, attempts) if attempts && attempts < @retries
  end

  resolve_message(message, results, stdout)
end