Class: Sourced::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/sourced/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger: Sourced.config.logger, name: SecureRandom.hex(4), poll_interval: 0.01) ⇒ Worker

Returns a new instance of Worker.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/sourced/worker.rb', line 18

def initialize(
  logger: Sourced.config.logger,
  name: SecureRandom.hex(4),
  poll_interval: 0.01
)
  @logger = logger
  @running = false
  @name = [Process.pid, name].join('-')
  @poll_interval = poll_interval
  # TODO: If reactors have a :weight, we can use that
  # to populate this array according to the weight
  # so that some reactors are picked more often than others
  @reactors = Router.async_reactors.filter do |r|
    r.handled_events.any?
  end.to_a.shuffle
  @reactor_index = 0
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



16
17
18
# File 'lib/sourced/worker.rb', line 16

def name
  @name
end

Class Method Details

.drainObject



8
9
10
# File 'lib/sourced/worker.rb', line 8

def self.drain
  new.drain
end

.tickObject



12
13
14
# File 'lib/sourced/worker.rb', line 12

def self.tick
  new.tick
end

Instance Method Details

#dispatch_next_commandObject



78
79
80
# File 'lib/sourced/worker.rb', line 78

def dispatch_next_command
  Router.dispatch_next_command
end

#drainObject

Drain all reactors And all scheduled commands



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/sourced/worker.rb', line 60

def drain
  @reactors.each do |reactor|
    loop do
      event = tick(reactor)
      break unless event
    end
  end

  loop do
    cmd = dispatch_next_command
    break unless cmd
  end
end

#next_reactorObject



82
83
84
85
86
87
# File 'lib/sourced/worker.rb', line 82

def next_reactor
  @reactor_index = 0 if @reactor_index >= @reactors.size
  reactor = @reactors[@reactor_index]
  @reactor_index += 1
  reactor
end

#pollObject



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/sourced/worker.rb', line 40

def poll
  if @reactors.empty?
    logger.warn "Worker #{name}: No reactors to poll"
    return false
  end

  @running = true
  while @running
    tick
    # This sleep seems to be necessary or workers in differet processes will not be able to get the lock
    sleep @poll_interval
    dispatch_next_command.tap do |c|
      sleep @poll_interval if c
    end
  end
  logger.info "Worker #{name}: Polling stopped"
end

#stopObject



36
37
38
# File 'lib/sourced/worker.rb', line 36

def stop
  @running = false
end

#tick(reactor = next_reactor) ⇒ Object



74
75
76
# File 'lib/sourced/worker.rb', line 74

def tick(reactor = next_reactor)
  Router.handle_next_event_for_reactor(reactor, name)
end