Class: Flatware::Sink::Server
- Inherits:
-
Object
- Object
- Flatware::Sink::Server
- Defined in:
- lib/flatware/sink.rb
Instance Attribute Summary collapse
-
#checkpoints ⇒ Object
readonly
Returns the value of attribute checkpoints.
-
#dispatch ⇒ Object
readonly
Returns the value of attribute dispatch.
-
#formatter ⇒ Object
readonly
Returns the value of attribute formatter.
-
#jobs ⇒ Object
readonly
Returns the value of attribute jobs.
-
#poller ⇒ Object
readonly
Returns the value of attribute poller.
-
#sink ⇒ Object
readonly
Returns the value of attribute sink.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Instance Method Summary collapse
-
#initialize(jobs:, formatter:, dispatch:, sink:, worker_count: 0) ⇒ Server
constructor
A new instance of Server.
- #listen ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(jobs:, formatter:, dispatch:, sink:, worker_count: 0) ⇒ Server
Returns a new instance of Server.
13 14 15 16 17 18 19 20 21 |
# File 'lib/flatware/sink.rb', line 13 def initialize(jobs:, formatter:, dispatch:, sink:, worker_count: 0) @formatter = formatter @jobs = group_jobs(jobs, worker_count) @sink = Flatware.socket(ZMQ::PULL, bind: sink) @dispatch = Flatware.socket(ZMQ::REP, bind: dispatch) @poller = Poller.new(@sink, @dispatch) @workers = Set.new(worker_count.times.to_a) @checkpoints = [] end |
Instance Attribute Details
#checkpoints ⇒ Object (readonly)
Returns the value of attribute checkpoints.
11 12 13 |
# File 'lib/flatware/sink.rb', line 11 def checkpoints @checkpoints end |
#dispatch ⇒ Object (readonly)
Returns the value of attribute dispatch.
11 12 13 |
# File 'lib/flatware/sink.rb', line 11 def dispatch @dispatch end |
#formatter ⇒ Object (readonly)
Returns the value of attribute formatter.
11 12 13 |
# File 'lib/flatware/sink.rb', line 11 def formatter @formatter end |
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
11 12 13 |
# File 'lib/flatware/sink.rb', line 11 def jobs @jobs end |
#poller ⇒ Object (readonly)
Returns the value of attribute poller.
11 12 13 |
# File 'lib/flatware/sink.rb', line 11 def poller @poller end |
#sink ⇒ Object (readonly)
Returns the value of attribute sink.
11 12 13 |
# File 'lib/flatware/sink.rb', line 11 def sink @sink end |
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
11 12 13 |
# File 'lib/flatware/sink.rb', line 11 def workers @workers end |
Instance Method Details
#listen ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/flatware/sink.rb', line 40 def listen que = jobs.dup poller.each do |socket| , content = socket.recv case when :ready workers << content job = que.shift if job and not done? dispatch.send job else workers.delete content dispatch.send 'seppuku' end when :checkpoint checkpoints << content when :finished completed_jobs << content formatter.finished content else formatter.send , content end break if workers.empty? and done? end formatter.summarize(checkpoints) !failures? end |
#start ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/flatware/sink.rb', line 23 def start trap 'INT' do puts "Interrupted!" formatter.summarize checkpoints summarize_remaining puts "\n\nCleaning up. Please wait...\n" Flatware.close! Process.waitall puts "thanks." exit 1 end formatter.jobs jobs listen.tap do Flatware.close end end |