Class: SBM::Coordinator

Inherits:
Object
  • Object
show all
Defined in:
lib/sbm/coordinator.rb

Defined Under Namespace

Classes: Batch, Worker

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Coordinator

Returns a new instance of Coordinator.



17
18
19
20
# File 'lib/sbm/coordinator.rb', line 17

def initialize(name)
  @name = name.to_s
  @redis = Redis.current
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



15
16
17
# File 'lib/sbm/coordinator.rb', line 15

def name
  @name
end

#redisObject (readonly)

Returns the value of attribute redis.



15
16
17
# File 'lib/sbm/coordinator.rb', line 15

def redis
  @redis
end

Class Method Details

.defaultsObject



9
10
11
12
13
# File 'lib/sbm/coordinator.rb', line 9

def self.defaults
  worker_name      = (ENV['SBM_WORKER'] or raise "Please ensure SBM_WORKER is set")
  coordinator_name = (ENV['SBM_COORDINATOR'] || "worker-coordinator")
  return new(coordinator_name), Worker.new(worker_name)
end

Instance Method Details

#batchesObject



32
33
34
# File 'lib/sbm/coordinator.rb', line 32

def batches
  redis.smembers(key(:batches)).map { |w| Batch.new(w) }
end

#clear(batch) ⇒ Object



67
68
69
70
71
# File 'lib/sbm/coordinator.rb', line 67

def clear(batch)
  redis.srem key(:batches), batch.to_s
  redis.del key(:batches, batch, :completed)
  redis.del key(:batches, batch, :started)
end

#clear_batchesObject



73
74
75
76
# File 'lib/sbm/coordinator.rb', line 73

def clear_batches
  batches.each { |b| clear b }
  redis.del key(:batches)
end

#clear_workersObject



78
79
80
# File 'lib/sbm/coordinator.rb', line 78

def clear_workers
  redis.del key(:workers)
end

#complete(batch, worker) ⇒ Object



54
55
56
57
# File 'lib/sbm/coordinator.rb', line 54

def complete(batch, worker)
  prepare worker, batch
  redis.sadd key(:batches, batch, :completed), worker.to_s
end

#completed_workers_for_batch(batch) ⇒ Object



44
45
46
# File 'lib/sbm/coordinator.rb', line 44

def completed_workers_for_batch(batch)
  redis.smembers(key(:batches, batch, :completed)).map { |w| Worker.new(w) }
end

#start(batch, worker) ⇒ Object



48
49
50
51
52
# File 'lib/sbm/coordinator.rb', line 48

def start(batch, worker)
  prepare worker, batch
  redis.sadd key(:batches, batch, :started),   worker.to_s
  redis.srem key(:batches, batch, :completed), worker.to_s
end

#started_workers_for_batch(batch) ⇒ Object



40
41
42
# File 'lib/sbm/coordinator.rb', line 40

def started_workers_for_batch(batch)
  redis.smembers(key(:batches, batch, :started)).map { |w| Worker.new(w) }
end

#wait_for(batch, worker_count, wait_time = 15) ⇒ Object

Waits on batch to reach a count, waiting for 15 seconds at a time.



60
61
62
63
64
65
# File 'lib/sbm/coordinator.rb', line 60

def wait_for(batch, worker_count, wait_time = 15)
  while redis.scard(key(:batches, batch, :completed)) < worker_count
    sleep wait_time
    yield if block_given?
  end
end

#workersObject



36
37
38
# File 'lib/sbm/coordinator.rb', line 36

def workers
  redis.smembers(key(:workers)).map { |w| Worker.new(w) }
end