Class: Flatware::Sink::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/flatware/sink.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(jobs:, formatter:, sink:, worker_count: 0) ⇒ Server

Returns a new instance of Server.



24
25
26
27
28
29
30
31
32
33
# File 'lib/flatware/sink.rb', line 24

def initialize(jobs:, formatter:, sink:, worker_count: 0, **)
  @checkpoints = []
  @completed_jobs = []
  @formatter = formatter
  @interrupted = false
  @jobs = group_jobs(jobs, worker_count).freeze
  @queue = @jobs.dup
  @sink = sink
  @workers = Set.new(worker_count.times.to_a)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(name, *args) ⇒ Object



65
66
67
68
69
# File 'lib/flatware/sink.rb', line 65

def method_missing(name, *args)
  super unless formatter.respond_to?(name)
  Flatware.log(name, *args)
  formatter.send(name, *args)
end

Instance Attribute Details

#checkpointsObject (readonly)

Returns the value of attribute checkpoints.



22
23
24
# File 'lib/flatware/sink.rb', line 22

def checkpoints
  @checkpoints
end

#completed_jobsObject (readonly)

Returns the value of attribute completed_jobs.



22
23
24
# File 'lib/flatware/sink.rb', line 22

def completed_jobs
  @completed_jobs
end

#formatterObject (readonly)

Returns the value of attribute formatter.



22
23
24
# File 'lib/flatware/sink.rb', line 22

def formatter
  @formatter
end

#jobsObject (readonly)

Returns the value of attribute jobs.



22
23
24
# File 'lib/flatware/sink.rb', line 22

def jobs
  @jobs
end

#queueObject (readonly)

Returns the value of attribute queue.



22
23
24
# File 'lib/flatware/sink.rb', line 22

def queue
  @queue
end

#sinkObject (readonly)

Returns the value of attribute sink.



22
23
24
# File 'lib/flatware/sink.rb', line 22

def sink
  @sink
end

#workersObject (readonly)

Returns the value of attribute workers.



22
23
24
# File 'lib/flatware/sink.rb', line 22

def workers
  @workers
end

Instance Method Details

#checkpoint(checkpoint) ⇒ Object



55
56
57
# File 'lib/flatware/sink.rb', line 55

def checkpoint(checkpoint)
  checkpoints << checkpoint
end

#finished(job) ⇒ Object



59
60
61
62
63
# File 'lib/flatware/sink.rb', line 59

def finished(job)
  completed_jobs << job
  formatter.finished(job)
  check_finished!
end

#ready(worker) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
# File 'lib/flatware/sink.rb', line 43

def ready(worker)
  job = queue.shift
  if job && !(remaining_work.empty? || interrupted?)
    workers << worker
    job
  else
    workers.delete worker
    check_finished!
    Job.sentinel
  end
end

#respond_to_missing?(name, include_all) ⇒ Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/flatware/sink.rb', line 71

def respond_to_missing?(name, include_all)
  formatter.respond_to?(name, include_all)
end

#startObject



35
36
37
38
39
40
41
# File 'lib/flatware/sink.rb', line 35

def start
  Signal.listen(formatter, &method(:on_interrupt))
  formatter.jobs jobs
  DRb.start_service(sink, self, verbose: Flatware.verbose?)
  DRb.thread.join
  !(failures? || interrupted?)
end