Class: SBM::Coordinator
- Inherits:
-
Object
- Object
- SBM::Coordinator
- Defined in:
- lib/sbm/coordinator.rb
Defined Under Namespace
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
Class Method Summary collapse
Instance Method Summary collapse
- #batches ⇒ Object
- #clear(batch) ⇒ Object
- #clear_batches ⇒ Object
- #clear_workers ⇒ Object
- #complete(batch, worker) ⇒ Object
- #completed_workers_for_batch(batch) ⇒ Object
-
#initialize(name) ⇒ Coordinator
constructor
A new instance of Coordinator.
- #start(batch, worker) ⇒ Object
- #started_workers_for_batch(batch) ⇒ Object
-
#wait_for(batch, worker_count, wait_time = 15) ⇒ Object
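Waits until the number of completed workers for batch reaches worker_count, sleeping wait_time seconds (15 by default) between checks and yielding to the given block, if any, after each sleep.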
- #workers ⇒ Object
Constructor Details
#initialize(name) ⇒ Coordinator
Returns a new instance of Coordinator.
17 18 19 20 |
# File 'lib/sbm/coordinator.rb', line 17
# Builds a coordinator identified by +name+.
#
# The name is stored in its string form and the Redis handle is taken from
# the process-wide Redis.current.
#
# NOTE(review): Redis.current is reported as deprecated in newer redis-rb
# releases -- confirm against the gem version this project pins.
#
# @param name [#to_s] identifier for this coordinator
def initialize(name)
  # Right-hand side is evaluated left to right: name.to_s, then Redis.current.
  @name, @redis = name.to_s, Redis.current
end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
15 16 17 |
# File 'lib/sbm/coordinator.rb', line 15
# Read-only accessor for the coordinator's name.
#
# @return [Object] the current value of the @name instance variable
def name
  @name
end
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
15 16 17 |
# File 'lib/sbm/coordinator.rb', line 15
# Read-only accessor for the Redis handle used by this coordinator.
#
# @return [Object] the current value of the @redis instance variable
def redis
  @redis
end
Class Method Details
Instance Method Details
#batches ⇒ Object
32 33 34 |
# File 'lib/sbm/coordinator.rb', line 32
# Lists the batches recorded in the :batches set.
#
# @return [Array<Batch>] one Batch per member of the set
def batches
  batch_names = redis.smembers(key(:batches))
  batch_names.map { |batch_name| Batch.new(batch_name) }
end
#clear(batch) ⇒ Object
67 68 69 70 71 |
# File 'lib/sbm/coordinator.rb', line 67
# Forgets a single batch: removes it from the :batches set, then deletes
# its :completed and :started keys (in that order).
#
# @param batch [#to_s] the batch to clear
# @return [Object] whatever the final redis.del call (for :started) returns
def clear(batch)
  redis.srem(key(:batches), batch.to_s)
  # map keeps the per-state results so the last del's result is still returned.
  %i[completed started].map { |state| redis.del(key(:batches, batch, state)) }.last
end
#clear_batches ⇒ Object
73 74 75 76 |
# File 'lib/sbm/coordinator.rb', line 73
# Clears every known batch individually, then deletes the :batches key
# itself.
#
# @return [Object] the result of deleting the :batches key
def clear_batches
  batches.each do |batch|
    clear(batch)
  end
  redis.del(key(:batches))
end
#clear_workers ⇒ Object
78 79 80 |
# File 'lib/sbm/coordinator.rb', line 78
# Deletes the :workers key.
#
# @return [Object] the result of the redis.del call
def clear_workers
  workers_key = key(:workers)
  redis.del(workers_key)
end
#complete(batch, worker) ⇒ Object
54 55 56 57 |
# File 'lib/sbm/coordinator.rb', line 54
# Marks +worker+ as having completed +batch+ by adding it to the batch's
# :completed set.
#
# @param batch [Object] the batch being worked on
# @param worker [#to_s] the worker that finished
# @return [Object] the result of the redis.sadd call
def complete(batch, worker)
  # prepare is defined elsewhere in the class; presumably it registers the
  # worker and batch -- verify against its definition.
  prepare(worker, batch)
  completed_key = key(:batches, batch, :completed)
  redis.sadd(completed_key, worker.to_s)
end
#completed_workers_for_batch(batch) ⇒ Object
44 45 46 |
# File 'lib/sbm/coordinator.rb', line 44
# Lists the workers recorded in the :completed set of +batch+.
#
# @param batch [Object] the batch to inspect
# @return [Array<Worker>] one Worker per member of the set
def completed_workers_for_batch(batch)
  worker_names = redis.smembers(key(:batches, batch, :completed))
  worker_names.map { |worker_name| Worker.new(worker_name) }
end
#start(batch, worker) ⇒ Object
48 49 50 51 52 |
# File 'lib/sbm/coordinator.rb', line 48
# Marks +worker+ as having started +batch+: adds it to the batch's :started
# set and removes it from the batch's :completed set.
#
# @param batch [Object] the batch being started
# @param worker [#to_s] the worker starting the batch
# @return [Object] the result of the redis.srem call
def start(batch, worker)
  # prepare is defined elsewhere in the class; presumably it registers the
  # worker and batch -- verify against its definition.
  prepare(worker, batch)

  started_key = key(:batches, batch, :started)
  redis.sadd(started_key, worker.to_s)

  # Removing the worker from :completed means a restarted worker no longer
  # counts as completed for this batch.
  completed_key = key(:batches, batch, :completed)
  redis.srem(completed_key, worker.to_s)
end
#started_workers_for_batch(batch) ⇒ Object
40 41 42 |
# File 'lib/sbm/coordinator.rb', line 40
# Lists the workers recorded in the :started set of +batch+.
#
# @param batch [Object] the batch to inspect
# @return [Array<Worker>] one Worker per member of the set
def started_workers_for_batch(batch)
  worker_names = redis.smembers(key(:batches, batch, :started))
  worker_names.map { |worker_name| Worker.new(worker_name) }
end
#wait_for(batch, worker_count, wait_time = 15) ⇒ Object
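Waits until the number of completed workers for batch reaches worker_count, sleeping wait_time seconds (15 by default) between checks and yielding to the given block, if any, after each sleep.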
60 61 62 63 64 65 |
# File 'lib/sbm/coordinator.rb', line 60
# Blocks until the :completed set of +batch+ holds at least +worker_count+
# members. The set size is re-read before every iteration; between checks
# the method sleeps +wait_time+ seconds and then yields to the caller's
# block, if one was given.
#
# @param batch [Object] the batch to wait on
# @param worker_count [Integer] number of completed workers to wait for
# @param wait_time [Numeric] seconds to sleep between checks (default 15)
# @yield called with no arguments after each sleep
# @return [nil]
def wait_for(batch, worker_count, wait_time = 15)
  until redis.scard(key(:batches, batch, :completed)) >= worker_count
    sleep(wait_time)
    yield if block_given?
  end
end