Class: Flatware::Sink::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/flatware/sink.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(jobs:, formatter:, sink:, worker_count: 0) ⇒ Server



23
24
25
26
27
28
29
30
31
# File 'lib/flatware/sink.rb', line 23

def initialize(jobs:, formatter:, sink:, worker_count: 0, **)
  @checkpoints = []
  @completed_jobs = []
  @formatter = formatter
  @jobs = group_jobs(jobs, worker_count).freeze
  @queue = @jobs.dup
  @sink = sink
  @workers = Set.new(worker_count.times.to_a)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(name, *args) ⇒ Object



63
64
65
66
67
# File 'lib/flatware/sink.rb', line 63

def method_missing(name, *args)
  super unless formatter.respond_to?(name)
  Flatware.log(name, *args)
  formatter.send(name, *args)
end

Instance Attribute Details

#checkpointsObject (readonly)

Returns the value of attribute checkpoints.



21
22
23
# File 'lib/flatware/sink.rb', line 21

def checkpoints
  @checkpoints
end

#completed_jobsObject (readonly)

Returns the value of attribute completed_jobs.



21
22
23
# File 'lib/flatware/sink.rb', line 21

def completed_jobs
  @completed_jobs
end

#formatterObject (readonly)

Returns the value of attribute formatter.



21
22
23
# File 'lib/flatware/sink.rb', line 21

def formatter
  @formatter
end

#jobsObject (readonly)

Returns the value of attribute jobs.



21
22
23
# File 'lib/flatware/sink.rb', line 21

def jobs
  @jobs
end

#queueObject (readonly)

Returns the value of attribute queue.



21
22
23
# File 'lib/flatware/sink.rb', line 21

def queue
  @queue
end

#sinkObject (readonly)

Returns the value of attribute sink.



21
22
23
# File 'lib/flatware/sink.rb', line 21

def sink
  @sink
end

#workersObject (readonly)

Returns the value of attribute workers.



21
22
23
# File 'lib/flatware/sink.rb', line 21

def workers
  @workers
end

Instance Method Details

#checkpoint(checkpoint) ⇒ Object



53
54
55
# File 'lib/flatware/sink.rb', line 53

def checkpoint(checkpoint)
  checkpoints << checkpoint
end

#finished(job) ⇒ Object



57
58
59
60
61
# File 'lib/flatware/sink.rb', line 57

def finished(job)
  completed_jobs << job
  formatter.finished(job)
  check_finished!
end

#ready(worker) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/flatware/sink.rb', line 41

def ready(worker)
  job = queue.shift
  if job && !(remaining_work.empty? || interruped?)
    workers << worker
    job
  else
    workers.delete worker
    check_finished!
    Job.sentinel
  end
end

#respond_to_missing?(name, include_all) ⇒ Boolean



69
70
71
# File 'lib/flatware/sink.rb', line 69

def respond_to_missing?(name, include_all)
  formatter.respond_to?(name, include_all)
end

#startObject



33
34
35
36
37
38
39
# File 'lib/flatware/sink.rb', line 33

def start
  trap_interrupt
  formatter.jobs jobs
  DRb.start_service(sink, self, verbose: Flatware.verbose?)
  DRb.thread.join
  !failures?
end