Class: ZSpec::Queue
Instance Attribute Summary collapse
-
#counter_name ⇒ Object
readonly
Returns the value of attribute counter_name.
-
#done_queue_name ⇒ Object
readonly
Returns the value of attribute done_queue_name.
-
#metadata_hash_name ⇒ Object
readonly
Returns the value of attribute metadata_hash_name.
-
#pending_queue_name ⇒ Object
readonly
Returns the value of attribute pending_queue_name.
-
#processing_queue_name ⇒ Object
readonly
Returns the value of attribute processing_queue_name.
-
#workers_ready_key_name ⇒ Object
readonly
Returns the value of attribute workers_ready_key_name.
Instance Method Summary collapse
- #cleanup(expire_seconds = EXPIRE_SECONDS) ⇒ Object
- #done_queue ⇒ Object
- #enqueue(messages) ⇒ Object
-
#initialize(sink:, build_prefix:, retries:, timeout:) ⇒ Queue
constructor
A new instance of Queue.
- #pending_queue ⇒ Object
- #resolve(failed, message, results, stdout) ⇒ Object
Methods included from Util
#dedupe_key, #results_key, #retry_key, #stdout_key, #timeout_key
Constructor Details
#initialize(sink:, build_prefix:, retries:, timeout:) ⇒ Queue
Returns a new instance of Queue.
10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/zspec/queue.rb', line 10
#
# Stores the sink and the numeric limits, and derives every key name this
# queue uses by appending a fixed suffix to +build_prefix+.
#
# @param sink [Object] store client the other methods send commands to
#   (presumably a Redis client, judging by the command names; confirm)
# @param build_prefix [String] prefix each key name is concatenated onto
# @param retries [#to_i] retry limit; coerced with +to_i+
# @param timeout [#to_i] timeout value; coerced with +to_i+
def initialize(sink:, build_prefix:, retries:, timeout:)
  @sink = sink
  @retries = retries.to_i
  @timeout = timeout.to_i
  @counter_name = build_prefix + ":count"
  @pending_queue_name = build_prefix + ":pending"
  @processing_queue_name = build_prefix + ":processing"
  @done_queue_name = build_prefix + ":done"
  @metadata_hash_name = build_prefix + ":metadata"
  @workers_ready_key_name = build_prefix + ":ready"
end
Instance Attribute Details
#counter_name ⇒ Object (readonly)
Returns the value of attribute counter_name.
5 6 7 |
# File 'lib/zspec/queue.rb', line 5
#
# @return [Object] the value of the +@counter_name+ instance variable
def counter_name
  @counter_name
end
#done_queue_name ⇒ Object (readonly)
Returns the value of attribute done_queue_name.
5 6 7 |
# File 'lib/zspec/queue.rb', line 5
#
# @return [Object] the value of the +@done_queue_name+ instance variable
def done_queue_name
  @done_queue_name
end
#metadata_hash_name ⇒ Object (readonly)
Returns the value of attribute metadata_hash_name.
5 6 7 |
# File 'lib/zspec/queue.rb', line 5
#
# @return [Object] the value of the +@metadata_hash_name+ instance variable
def metadata_hash_name
  @metadata_hash_name
end
#pending_queue_name ⇒ Object (readonly)
Returns the value of attribute pending_queue_name.
5 6 7 |
# File 'lib/zspec/queue.rb', line 5
#
# @return [Object] the value of the +@pending_queue_name+ instance variable
def pending_queue_name
  @pending_queue_name
end
#processing_queue_name ⇒ Object (readonly)
Returns the value of attribute processing_queue_name.
5 6 7 |
# File 'lib/zspec/queue.rb', line 5
#
# @return [Object] the value of the +@processing_queue_name+ instance variable
def processing_queue_name
  @processing_queue_name
end
#workers_ready_key_name ⇒ Object (readonly)
Returns the value of attribute workers_ready_key_name.
5 6 7 |
# File 'lib/zspec/queue.rb', line 5
#
# @return [Object] the value of the +@workers_ready_key_name+ instance variable
def workers_ready_key_name
  @workers_ready_key_name
end
Instance Method Details
#cleanup(expire_seconds = EXPIRE_SECONDS) ⇒ Object
22 23 24 25 26 27 28 29 |
# File 'lib/zspec/queue.rb', line 22
#
# Calls +expire+ on the sink for each of the six keys this queue owns, in
# the order counter, pending, processing, done, metadata, workers-ready.
#
# @param expire_seconds [Object] value passed as the second argument of every
#   +expire+ call; defaults to the EXPIRE_SECONDS constant
# @return [Object] whatever the final +expire+ call returns
def cleanup(expire_seconds = EXPIRE_SECONDS)
  @sink.expire(@counter_name, expire_seconds)
  @sink.expire(@pending_queue_name, expire_seconds)
  @sink.expire(@processing_queue_name, expire_seconds)
  @sink.expire(@done_queue_name, expire_seconds)
  @sink.expire(@metadata_hash_name, expire_seconds)
  @sink.expire(@workers_ready_key_name, expire_seconds)
end
#done_queue ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/zspec/queue.rb', line 39
#
# Builds an Enumerator over finished work. Nothing is read from the sink
# until the Enumerator is iterated.
#
# Each iteration runs +expire_processing+, then pops one entry from the done
# queue with +timeout: 1+. It yields +[nil, nil]+ when nothing was popped,
# when the entry's dedupe flag is already set in the metadata hash, or when
# no results are stored for the entry. Otherwise it reads the stored stdout,
# sets the dedupe flag, decrements the counter and yields
# +[results, stdout]+. The loop stops once +workers_ready? && complete?+ is
# truthy.
#
# @return [Enumerator] yields two-element arrays as described above
def done_queue
  Enumerator.new do |yielder|
    until workers_ready? && complete?
      expire_processing

      # Only the second element of the pop result is used.
      _list, message = @sink.brpop(@done_queue_name, timeout: 1)
      if message.nil?
        yielder << [nil, nil]
        next
      end

      # Dedupe flag already set: this entry was yielded before.
      if @sink.hget(@metadata_hash_name, dedupe_key(message))
        yielder << [nil, nil]
        next
      end

      results = @sink.hget(@metadata_hash_name, results_key(message))
      if results.nil?
        yielder << [nil, nil]
        next
      end

      stdout = @sink.hget(@metadata_hash_name, stdout_key(message))
      @sink.hset(@metadata_hash_name, dedupe_key(message), true)
      @sink.decr(@counter_name)
      yielder << [results, stdout]
    end
  end
end
#enqueue(messages) ⇒ Object
31 32 33 34 35 36 37 |
# File 'lib/zspec/queue.rb', line 31
#
# Pushes every message onto the pending queue with +lpush+, incrementing the
# counter once per message. After all messages are pushed it sets the
# workers-ready key to +true+.
#
# @param messages [#each] messages to enqueue
# @return [Object] whatever the sink's +set+ call returns
def enqueue(messages)
  messages.each do |message|
    @sink.lpush(@pending_queue_name, message)
    @sink.incr(@counter_name)
  end
  # The ready key is written even when +messages+ is empty.
  @sink.set(@workers_ready_key_name, true)
end
#pending_queue ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/zspec/queue.rb', line 71
#
# Builds an Enumerator over pending work. Nothing is read from the sink
# until the Enumerator is iterated.
#
# Each iteration calls +brpoplpush+ from the pending queue to the processing
# queue with +timeout: 1+. When nothing is returned it yields +nil+.
# Otherwise it stores the first element of the sink's +time+ under the
# message's timeout key in the metadata hash, then yields the message. The
# loop stops once +workers_ready? && complete?+ is truthy.
#
# @return [Enumerator] yields a message, or +nil+ when the pop returned nothing
def pending_queue
  Enumerator.new do |yielder|
    until workers_ready? && complete?
      message = @sink.brpoplpush(@pending_queue_name, @processing_queue_name, timeout: 1)
      if message.nil?
        yielder << nil
        next
      end

      # Timestamp comes from the sink's clock, not from the local one.
      @sink.hset(@metadata_hash_name, timeout_key(message), @sink.time.first)
      yielder << message
    end
  end
end
#resolve(failed, message, results, stdout) ⇒ Object
85 86 87 88 89 90 91 |
# File 'lib/zspec/queue.rb', line 85
#
# Decides whether a finished message is retried or resolved. The retry
# branch runs only when +failed+ is truthy, +retry_count(message)+ is
# truthy, and that count is below +@retries+; every other case resolves.
#
# NOTE(review): the names of the two helpers called below were lost in this
# listing. +retry_message+ and +resolve_message+ are reconstructions;
# confirm them against the private methods in lib/zspec/queue.rb.
#
# @param failed [Object] truthy when the message's run failed
# @param message [Object] the message being resolved
# @param results [Object] passed through to the resolve helper
# @param stdout [Object] passed through to the resolve helper
# @return [Object] whatever the helper that ran returns
def resolve(failed, message, results, stdout)
  if failed && (count = retry_count(message)) && (count < @retries)
    retry_message(message, count)
  else
    resolve_message(message, results, stdout)
  end
end