Class: Eventboss::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging, SafeThread
Defined in:
lib/eventboss/worker.rb

Overview

A Worker is one member of a pool of workers and handles the UnitOfWork lifecycle.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from SafeThread

#handle_exception, #safe_thread

Methods included from Logging

#logger

Constructor Details

#initialize(launcher, id, bus, restart_on: [Exception]) ⇒ Worker

Returns a new instance of Worker.



9
10
11
12
13
14
15
# File 'lib/eventboss/worker.rb', line 9

# Sets up a worker with its collaborators; no thread is created here.
#
# @param launcher [Object] kept in @launcher; #run calls +worker_stopped+ on it
# @param id [Object] interpolated into the worker's name, "worker-<id>"
# @param bus [Object] kept in @bus; #run pops units of work from it
# @param restart_on [Array<Class>] exception classes that #run rescues and
#   reports to the launcher with +restart: true+ (defaults to [Exception])
def initialize(launcher, id, bus, restart_on: [Exception])
  @launcher = launcher
  @bus = bus
  @restart_on = restart_on
  @id = "worker-#{id}"
  # Stays nil until #start assigns the thread.
  @thread = nil
end

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



7
8
9
# File 'lib/eventboss/worker.rb', line 7

# Read-only accessor for the worker's identifier.
#
# @return [Object] the value of @id, which #initialize sets to "worker-<id>"
def id
  @id
end

Instance Method Details

#kill(wait = false) ⇒ Object



41
42
43
44
45
46
# File 'lib/eventboss/worker.rb', line 41

# Stops the worker forcefully: calls stop_token, then raises
# Eventboss::Shutdown inside the worker thread, if one has been started.
#
# @param wait [Boolean] when truthy, block on the thread and return its value
# @return [Object, nil] the thread's value when +wait+ is truthy; nil when
#   +wait+ is falsy or no thread exists
def kill(wait = false)
  stop_token
  if @thread
    @thread.raise(Eventboss::Shutdown)
    @thread.value if wait
  end
end

#run ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/eventboss/worker.rb', line 21

# Worker loop: pops units of work off the bus and runs each one until the bus
# yields a falsy value, then tells the launcher that this worker has stopped.
#
# Eventboss::Shutdown raised anywhere in the body is reported to the launcher
# as a plain stop. Any exception matching @restart_on is handed to
# handle_exception and the launcher is notified with +restart: true+.
# Exceptions matching neither clause propagate to the caller.
def run
  unit = @bus.pop
  while unit
    unit.run
    unit = @bus.pop
  end
  @launcher.worker_stopped(self)
rescue Eventboss::Shutdown
  @launcher.worker_stopped(self)
rescue *@restart_on => error
  handle_exception(error, worker_id: id)
  # Hard failure: ask the launcher to restart this worker.
  # The message won't be deleted from SQS and will become visible again later.
  @launcher.worker_stopped(self, restart: true)
end

#start ⇒ Object



17
18
19
# File 'lib/eventboss/worker.rb', line 17

# Launches the worker: hands #run to safe_thread, labelled with this worker's
# id, and remembers the result in @thread.
#
# @return [Object] whatever safe_thread returns (also stored in @thread)
def start
  runner = method(:run)
  @thread = safe_thread(id, &runner)
end

#terminate(wait = false) ⇒ Object



35
36
37
38
39
# File 'lib/eventboss/worker.rb', line 35

# Stops the worker gracefully: calls stop_token and, when asked, waits for the
# worker thread to finish. Unlike #kill, nothing is raised inside the thread.
#
# @param wait [Boolean] when truthy, block on the thread and return its value
# @return [Object, nil] the thread's value when +wait+ is truthy; nil when
#   +wait+ is falsy or no thread exists
def terminate(wait = false)
  stop_token
  if @thread
    @thread.value if wait
  end
end