Class: Realm::SNS::Gateway::Worker

Inherits:
Object
Defined in:
lib/realm/sns/gateway/worker.rb

Instance Method Summary

Constructor Details

#initialize(queue_map, event_factory:, event_processing_attempts: 3, logger: nil) ⇒ Worker

Returns a new instance of Worker.



7
8
9
10
11
12
13
# File 'lib/realm/sns/gateway/worker.rb', line 7

# Builds a worker that has not started any poller threads yet.
#
# All arguments are only stored here; how they are consumed is up to the
# rest of the class.
#
# @param queue_map [#each_pair] pairs of (queue, listener); iterated by #start
# @param event_factory [Object] kept in @event_factory
# @param event_processing_attempts [Integer] kept in @event_processing_attempts (defaults to 3)
# @param logger [#info, nil] logger to use; when nil (or false) a Logger
#   writing to $stdout is created
def initialize(queue_map, event_factory:, event_processing_attempts: 3, logger: nil)
  # No threads exist until #start is called.
  @threads = []
  @logger = logger || Logger.new($stdout)
  @event_processing_attempts = event_processing_attempts
  @event_factory = event_factory
  @queue_map = queue_map
end

Instance Method Details

#join(timeout = nil) ⇒ Object



31
32
33
# File 'lib/realm/sns/gateway/worker.rb', line 31

# Waits for the poller threads held in @threads to finish.
#
# When a timeout is given it is treated as a single budget shared by all
# threads: the deadline is computed once and each thread only gets whatever
# time is left. (Passing the full timeout to every thread in turn would let
# the total wait grow to threads.size * timeout.)
#
# As with Thread#join, an exception that terminated a joined thread is
# re-raised in the caller.
#
# @param timeout [Numeric, nil] maximum total number of seconds to wait;
#   nil waits indefinitely
# @return [Array<Thread>] the @threads array
def join(timeout = nil)
  return @threads.each(&:join) if timeout.nil?

  # Monotonic clock so wall-clock adjustments cannot stretch or shrink the wait.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  @threads.each do |thread|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # Once the budget is spent, join(0) just polls without blocking.
    thread.join([remaining, 0].max)
  end
end

#start(poller_options: {}) ⇒ Object



15
16
17
18
19
20
21
# File 'lib/realm/sns/gateway/worker.rb', line 15

# Spawns one thread per (queue, listener) pair in @queue_map.
#
# A fresh signaler hash ({ exiting: false }) is stored in @signaler on every
# call, and each thread runs run_poller with its queue, its listener, that
# signaler and the given options. The new threads are appended to @threads.
#
# @param poller_options [Hash] passed through unchanged to run_poller
# @return [self]
def start(poller_options: {})
  @signaler = { exiting: false }
  @queue_map.each_pair do |queue, listener|
    poller = Thread.new do
      run_poller(queue, listener, @signaler, poller_options)
    end
    @threads.push(poller)
  end
  self
end

#stop(timeout: 30) ⇒ Object



23
24
25
26
27
28
29
# File 'lib/realm/sns/gateway/worker.rb', line 23

# Asks the poller threads to exit and waits for them.
#
# Sets the :exiting flag on the signaler hash shared with the pollers, waits
# via #join with the given timeout, then empties @threads. Threads that are
# still running when #join returns are no longer tracked, but they are not
# killed here.
#
# Safe to call before #start: with no signaler yet there is nothing to flag,
# and the call just logs and returns.
#
# @param timeout [Numeric] seconds handed to #join
# @return [self]
def stop(timeout: 30)
  # The message is logged from a short-lived thread because this method may
  # be invoked from a signal trap handler, where logging directly is not
  # possible.
  Thread.new { @logger.info("Stopping worker (timeout: #{timeout}s)") }.join
  @signaler[:exiting] = true if @signaler
  join(timeout)
  @threads.clear
  self
end