Class: Flatware::Sink::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/flatware/sink.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(jobs:, formatter:, sink:, worker_count: 0, **) ⇒ Server



24
25
26
27
28
29
30
31
32
# File 'lib/flatware/sink.rb', line 24

# Sets up the state for one run.
#
# @param jobs passed, together with worker_count, to group_jobs; the result
#   is stored frozen as the job list
# @param formatter stored and exposed through #formatter
# @param sink stored and exposed through #sink
# @param worker_count [Integer] the integers 0...worker_count seed the
#   worker set
#
# Any further keyword arguments are accepted and ignored.
def initialize(jobs:, formatter:, sink:, worker_count: 0, **)
  @checkpoints = []
  @completed_jobs = []
  @formatter = formatter

  grouped = group_jobs(jobs, worker_count)
  @jobs = grouped.freeze
  # Working copy that can be consumed: dup does not carry the frozen state.
  @queue = grouped.dup

  @sink = sink
  @workers = Set.new(worker_count.times)
end

Dynamic Method Handling

This class handles dynamic methods through method_missing: any method the formatter responds to is logged and forwarded to the formatter.

#method_missing(name, *args) ⇒ Object



64
65
66
67
68
# File 'lib/flatware/sink.rb', line 64

# Forwards calls this object does not define to the formatter.
#
# When the formatter responds to +name+, the call is logged through
# Flatware.log and then sent to the formatter with the same positional
# arguments. Otherwise the call is handed to super (Object's default
# method_missing raises NoMethodError).
#
# @param name [Symbol] the method that was called
# @param args positional arguments of the call
# @return whatever the formatter method returns
def method_missing(name, *args)
  if formatter.respond_to?(name)
    Flatware.log(name, *args)
    formatter.send(name, *args)
  else
    super
  end
end

Instance Attribute Details

#checkpoints ⇒ Object (readonly)

Returns the value of attribute checkpoints.



22
23
24
# File 'lib/flatware/sink.rb', line 22

# Checkpoints recorded so far: the collection the constructor creates empty
# and #checkpoint appends to.
def checkpoints
  @checkpoints
end

#completed_jobs ⇒ Object (readonly)

Returns the value of attribute completed_jobs.



22
23
24
# File 'lib/flatware/sink.rb', line 22

# Jobs passed to #finished so far, in call order. Created empty by the
# constructor.
def completed_jobs
  @completed_jobs
end

#formatter ⇒ Object (readonly)

Returns the value of attribute formatter.



22
23
24
# File 'lib/flatware/sink.rb', line 22

# The formatter given to the constructor. It receives the #jobs and #finished
# notifications and any call forwarded by method_missing.
def formatter
  @formatter
end

#jobs ⇒ Object (readonly)

Returns the value of attribute jobs.



22
23
24
# File 'lib/flatware/sink.rb', line 22

# The frozen result of group_jobs(jobs, worker_count) computed in the
# constructor.
def jobs
  @jobs
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



22
23
24
# File 'lib/flatware/sink.rb', line 22

# Unfrozen copy of #jobs made in the constructor; #ready shifts the next job
# off its front.
def queue
  @queue
end

#sink ⇒ Object (readonly)

Returns the value of attribute sink.



22
23
24
# File 'lib/flatware/sink.rb', line 22

# The value given to the constructor as sink:. #start passes it to
# DRb.start_service as the first argument.
def sink
  @sink
end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



22
23
24
# File 'lib/flatware/sink.rb', line 22

# Set of workers. The constructor seeds it with the integers
# 0...worker_count; #ready adds a worker when it hands out a job and deletes
# the worker otherwise.
def workers
  @workers
end

Instance Method Details

#checkpoint(checkpoint) ⇒ Object



54
55
56
# File 'lib/flatware/sink.rb', line 54

# Records a checkpoint.
#
# @param checkpoint the object to store; it is appended unchanged
# @return the checkpoints collection, including the new entry
def checkpoint(checkpoint)
  checkpoints.push(checkpoint)
end

#finished(job) ⇒ Object



58
59
60
61
62
# File 'lib/flatware/sink.rb', line 58

# Marks a job as done.
#
# The job is appended to the completed jobs, the formatter is told about it,
# and check_finished! is then invoked.
#
# @param job the job that has finished
# @return whatever check_finished! returns
def finished(job)
  completed_jobs.push(job)
  formatter.finished(job)
  check_finished!
end

#ready(worker) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/flatware/sink.rb', line 42

# Called when a worker asks for work.
#
# The next job is always shifted off the queue. It is handed out only when
# there was one, remaining_work is not empty, and interruped? is false; the
# worker is then added to the worker set. In every other case the worker is
# removed from the set, check_finished! runs, and Job.sentinel is returned.
#
# @param worker identifier of the worker asking for work
# @return the next job, or Job.sentinel when none is handed out
def ready(worker)
  next_job = queue.shift
  # Same short-circuit order as a nested "not (empty or interrupted)" check.
  dispatchable = next_job && !remaining_work.empty? && !interruped?

  unless dispatchable
    workers.delete(worker)
    check_finished!
    return Job.sentinel
  end

  workers << worker
  next_job
end

#respond_to_missing?(name, include_all) ⇒ Boolean



70
71
72
# File 'lib/flatware/sink.rb', line 70

def respond_to_missing?(name, include_all)
  formatter.respond_to?(name, include_all)
end

#start ⇒ Object



34
35
36
37
38
39
40
# File 'lib/flatware/sink.rb', line 34

# Runs the server until its DRb service thread ends.
#
# Steps, in order: register summarize_remaining with Signal.listen (the
# result is kept in @signal), pass the job list to the formatter, start a DRb
# service at +sink+ with this object as the front object, then wait on the
# DRb thread.
#
# @return [Boolean] true when failures? is falsy after the service has ended
def start
  on_signal = method(:summarize_remaining)
  @signal = Signal.listen(&on_signal)

  formatter.jobs(jobs)

  DRb.start_service(sink, self, verbose: Flatware.verbose?)
  DRb.thread.join

  !failures?
end