Class: Subservient::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/subservient/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(tube = 'default') ⇒ Worker

Returns a new instance of Worker.



4
5
6
7
8
# File 'lib/subservient/worker.rb', line 4

# Builds a worker bound to a namespaced tube. The worker begins in the
# :stopped state with an empty thread pool.
#
# @param tube [String] bare tube name; it is joined to Subservient.namespace
#   with a '.' to form the value exposed by #tube
def initialize(tube = 'default')
  name_parts = [Subservient.namespace, tube]
  @tube = name_parts.join('.')
  @state = :stopped
  @thread_pool = []
end

Instance Attribute Details

#state ⇒ Object (readonly)

Returns the value of attribute state.



3
4
5
# File 'lib/subservient/worker.rb', line 3

# Read-only accessor for the worker's lifecycle state.
#
# @return [Symbol] the current value of @state (this class assigns
#   :stopped, :started and :shutdown)
def state; @state; end

#tube ⇒ Object (readonly)

Returns the value of attribute tube.



3
4
5
# File 'lib/subservient/worker.rb', line 3

# Read-only accessor for the namespaced tube name built by the constructor.
#
# @return [String] the current value of @tube
def tube; @tube; end

Instance Method Details

#go(threads: 5) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/subservient/worker.rb', line 10

# Starts the worker and then parks the calling thread until the process exits.
#
# A SIGINT handler is installed that calls `exit`, and an at_exit hook is
# registered that wakes the main thread and calls #stop, so the worker
# threads are shut down on the way out.
#
# @param threads [Integer] number of worker threads, forwarded to #start
# @return [Object] not expected to return in normal use: the bare `sleep`
#   blocks until the thread is woken
def go(threads: 5)
  start(threads: threads)

  trap('SIGINT') { exit }

  at_exit do
    Thread.main.wakeup
    stop
  end

  # No duration given: sleep until something wakes this thread.
  sleep
end

#start(threads: 5) ⇒ Object

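Raises:

  • (MissingTaskError) — if Subservient::tasks is empty
  • (AlreadyRunningError) — if the worker's state is not :stopped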



22
23
24
25
26
27
28
29
# File 'lib/subservient/worker.rb', line 22

# Moves the worker from :stopped to :started and launches its worker threads.
#
# @param threads [Integer] how many times start_worker_thread is invoked
# @return [Integer] the value of `threads` (the result of Integer#times)
# @raise [MissingTaskError] when Subservient.tasks is empty
# @raise [AlreadyRunningError] when the worker is not currently :stopped
def start(threads: 5)
  raise MissingTaskError if Subservient.tasks.empty?
  raise AlreadyRunningError unless @state == :stopped

  @state = :started
  # start_worker_thread is defined elsewhere in the class; presumably it
  # spawns one thread and records it in @thread_pool (confirm there).
  threads.times { start_worker_thread }
end

#stop(timeout: 30) ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/subservient/worker.rb', line 30

# Shuts the worker down: flags the shutdown, waits for the pooled threads to
# finish, kills any that are still running, and returns the worker to :stopped.
#
# The timeout is a single budget shared by all threads (measured on the
# monotonic clock), so the total wait is bounded by `timeout` rather than by
# `timeout` multiplied by the number of threads.
#
# Thread#join re-raises the exception of a thread that crashed. Such an error
# no longer aborts the shutdown half-way: every thread is still joined/killed,
# the pool is emptied and the state is reset before the first error is
# re-raised to the caller.
#
# @param timeout [Numeric, nil] total seconds to wait for threads to exit on
#   their own; nil waits indefinitely
# @return [Symbol] :stopped
# @raise [StandardError] the first error surfaced while joining a worker
#   thread, raised only after cleanup has completed
def stop(timeout: 30)
  @state = :shutdown        # Tell worker threads not to accept any more jobs
  deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)
  failure = nil

  begin
    @thread_pool.each do |thread|
      begin
        # Whatever is left of the shared budget; never negative.
        remaining = deadline && [deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max
        thread.join(remaining)  # Wait for the worker to exit gracefully...
      rescue StandardError => e
        failure ||= e           # remember the first crash, keep shutting down
      ensure
        thread.kill             # ...but kill it if it took too long
      end
    end
  ensure
    # Drop references to finished threads so a later start/stop cycle does not
    # re-join them, and never leave the worker stuck in :shutdown.
    @thread_pool.clear
    @state = :stopped
  end

  raise failure if failure
  :stopped
end