Class: Sourced::Worker
- Inherits: Object (Object → Sourced::Worker)
- Defined in: lib/sourced/worker.rb
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
Class Method Summary
- .drain ⇒ Object
- .tick ⇒ Object
Instance Method Summary
- #dispatch_next_command ⇒ Object
- #drain ⇒ Object
  Drain all reactors and all scheduled commands.
- #initialize(logger: Sourced.config.logger, name: SecureRandom.hex(4), poll_interval: 0.01) ⇒ Worker (constructor)
  A new instance of Worker.
- #next_reactor ⇒ Object
- #poll ⇒ Object
- #stop ⇒ Object
- #tick(reactor = next_reactor) ⇒ Object
Constructor Details
#initialize(logger: Sourced.config.logger, name: SecureRandom.hex(4), poll_interval: 0.01) ⇒ Worker
Returns a new instance of Worker.
# File 'lib/sourced/worker.rb', line 18
def initialize(
  logger: Sourced.config.logger,
  name: SecureRandom.hex(4),
  poll_interval: 0.01
)
  @logger = logger
  @running = false
  @name = [Process.pid, name].join('-')
  @poll_interval = poll_interval
  # TODO: If reactors have a :weight, we can use that
  # to populate this array according to the weight
  # so that some reactors are picked more often than others
  @reactors = Router.async_reactors.filter do |r|
    r.handled_events.any?
  end.to_a.shuffle
  @reactor_index = 0
end
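The TODO in the constructor mentions weighting reactors. The following is only a sketch of one way that idea could work, not something the library implements: the `Reactor` struct, its `weight` field, and the `weighted_rotation` helper are all hypothetical names introduced for illustration.

```ruby
# Hypothetical reactor record; real Sourced reactors may not expose a weight.
Reactor = Struct.new(:label, :weight)

# Repeat each reactor `weight` times (at least once), then shuffle, so that
# a plain round-robin walk over the result visits heavier reactors more often.
def weighted_rotation(reactors, random: Random.new)
  reactors.flat_map { |r| [r] * [r.weight.to_i, 1].max }.shuffle(random: random)
end

reactors = [Reactor.new(:orders, 3), Reactor.new(:emails, 1)]
rotation = weighted_rotation(reactors)
counts = rotation.map(&:label).tally
```

With these inputs `rotation` holds four entries, three of them for `:orders`, so a round-robin index over it would pick `:orders` three times per cycle.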
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/sourced/worker.rb', line 16
def name
  @name
end
Class Method Details
.drain ⇒ Object
# File 'lib/sourced/worker.rb', line 8
def self.drain
  new.drain
end
.tick ⇒ Object
# File 'lib/sourced/worker.rb', line 12
def self.tick
  new.tick
end
Instance Method Details
#dispatch_next_command ⇒ Object
# File 'lib/sourced/worker.rb', line 78
def dispatch_next_command
  Router.dispatch_next_command
end
#drain ⇒ Object
Drain all reactors and all scheduled commands.
# File 'lib/sourced/worker.rb', line 60
def drain
  @reactors.each do |reactor|
    loop do
      event = tick(reactor)
      break unless event
    end
  end

  loop do
    cmd = dispatch_next_command
    break unless cmd
  end
end
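A minimal sketch of the drain order, runnable without the library. It assumes, as the `break unless event` and `break unless cmd` checks suggest, that `tick` and `dispatch_next_command` return a falsy value once nothing is pending. `DrainSketch` and its in-memory arrays are stand-ins for `Router` and the backend, not part of Sourced.

```ruby
# Stand-in worker: each "reactor" is just an array of pending events,
# and `commands` is an array of scheduled commands.
class DrainSketch
  attr_reader :handled

  def initialize(reactors:, commands:)
    @reactors = reactors
    @commands = commands
    @handled = []
  end

  # Same shape as Worker#drain: exhaust each reactor in turn,
  # then exhaust the scheduled commands.
  def drain
    @reactors.each do |queue|
      loop do
        event = tick(queue)
        break unless event
      end
    end

    loop do
      cmd = dispatch_next_command
      break unless cmd
    end
  end

  private

  # Returns the handled event, or nil when the queue is empty.
  def tick(queue)
    event = queue.shift
    @handled << event if event
    event
  end

  # Returns the dispatched command, or nil when none remain.
  def dispatch_next_command
    cmd = @commands.shift
    @handled << cmd if cmd
    cmd
  end
end

sketch = DrainSketch.new(reactors: [%w[e1 e2], %w[e3]], commands: %w[c1])
sketch.drain
handled = sketch.handled
```

In this sketch every event is processed before any command. A command that produced new events during the second loop would leave those events unprocessed until the next drain.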
#next_reactor ⇒ Object
# File 'lib/sourced/worker.rb', line 82
def next_reactor
  @reactor_index = 0 if @reactor_index >= @reactors.size
  reactor = @reactors[@reactor_index]
  @reactor_index += 1
  reactor
end
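The method above is a round-robin cursor over the shuffled reactor list. The same index logic, isolated in a hypothetical `RoundRobin` class (not part of Sourced) so the wrap-around can be observed:

```ruby
# Stand-in for the rotation logic only.
class RoundRobin
  def initialize(items)
    @items = items
    @index = 0
  end

  # Wrap to the start once the index runs past the end,
  # then return the current item and advance the cursor.
  def next_item
    @index = 0 if @index >= @items.size
    item = @items[@index]
    @index += 1
    item
  end
end

rotation = RoundRobin.new(%i[a b c])
picks = Array.new(5) { rotation.next_item }
# picks is [:a, :b, :c, :a, :b]
```

With an empty list the method returns nil, which is why `#poll` checks `@reactors.empty?` before entering its loop.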
#poll ⇒ Object
# File 'lib/sourced/worker.rb', line 40
def poll
  if @reactors.empty?
    logger.warn "Worker #{name}: No reactors to poll"
    return false
  end

  @running = true
  while @running
    tick
    # This sleep seems to be necessary or workers in different processes will not be able to get the lock
    sleep @poll_interval
    dispatch_next_command.tap do |c|
      sleep @poll_interval if c
    end
  end
  logger.info "Worker #{name}: Polling stopped"
end
#stop ⇒ Object
# File 'lib/sourced/worker.rb', line 36
def stop
  @running = false
end
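`#poll` blocks until `#stop` flips the `@running` flag, so `#stop` has to be called from somewhere else, for example another thread. The sketch below shows that interaction with a hypothetical `PollSketch` class; its loop body is a counter standing in for `tick` and `dispatch_next_command`, and it sleeps once per iteration instead of reproducing the real method's two sleeps.

```ruby
# Stand-in for the poll/stop handshake only; not the real Sourced::Worker.
class PollSketch
  attr_reader :ticks

  def initialize(poll_interval: 0.01)
    @poll_interval = poll_interval
    @running = false
    @ticks = 0
  end

  def stop
    @running = false
  end

  # Loops until #stop is called, pausing between iterations.
  def poll
    @running = true
    while @running
      @ticks += 1 # stands in for tick + dispatch_next_command
      sleep @poll_interval
    end
    :stopped
  end
end

worker = PollSketch.new
thread = Thread.new { worker.poll }
sleep 0.001 until worker.ticks > 0 # wait for the loop to start
worker.stop
result = thread.value # joins the thread and returns poll's result
```

Because the flag is only checked at the top of each iteration, the loop finishes its current pass (including the sleep) before exiting.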
#tick(reactor = next_reactor) ⇒ Object
# File 'lib/sourced/worker.rb', line 74
def tick(reactor = next_reactor)
  Router.handle_next_event_for_reactor(reactor, name)
end
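Ruby evaluates a default argument expression only when the caller omits the argument. So `tick` with no argument advances the rotation, while `tick(reactor)`, as used by `#drain`, leaves it untouched. A small sketch with a hypothetical `TickSketch` class that returns the chosen reactor instead of calling `Router`:

```ruby
# Stand-in showing only how the default argument interacts with the rotation.
class TickSketch
  attr_reader :index

  def initialize(reactors)
    @reactors = reactors
    @index = 0
  end

  def next_reactor
    @index = 0 if @index >= @reactors.size
    reactor = @reactors[@index]
    @index += 1
    reactor
  end

  # The default expression runs only when no argument is passed.
  def tick(reactor = next_reactor)
    reactor
  end
end

sketch = TickSketch.new(%i[a b])
first = sketch.tick        # :a, rotation advances
explicit = sketch.tick(:b) # :b, rotation untouched
second = sketch.tick       # :b, rotation advances again
```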