Class: Fleiss::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/fleiss/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queues: [Fleiss::DEFAULT_QUEUE], concurrency: 10, wait_time: 1) ⇒ Worker

Init a new worker instance

Parameters:

  • [Array<String>] (Hash)

    a customizable set of options

  • [Integer] (Hash)

    a customizable set of options

  • [Numeric] (Hash)

    a customizable set of options



17
18
19
20
21
22
# File 'lib/fleiss/worker.rb', line 17

def initialize(queues: [Fleiss::DEFAULT_QUEUE], concurrency: 10, wait_time: 1)
  @uuid      = SecureRandom.uuid
  @queues    = Array(queues)
  @pool      = Fleiss::Executor.new(max_size: concurrency)
  @wait_time = wait_time
end

Instance Attribute Details

#queuesObject (readonly)

Returns the value of attribute queues.



6
7
8
# File 'lib/fleiss/worker.rb', line 6

def queues
  @queues
end

#uuidObject (readonly)

Returns the value of attribute uuid.



6
7
8
# File 'lib/fleiss/worker.rb', line 6

def uuid
  @uuid
end

#wait_timeObject (readonly)

Returns the value of attribute wait_time.



6
7
8
# File 'lib/fleiss/worker.rb', line 6

def wait_time
  @wait_time
end

Class Method Details

.run(**opts) ⇒ Object

Shortcut for new(**opts).run



9
10
11
# File 'lib/fleiss/worker.rb', line 9

def self.run(**opts)
  new(**opts).run
end

Instance Method Details

#runObject

Run starts the worker



25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/fleiss/worker.rb', line 25

def run
  log(:info) { "Worker #{uuid} starting - queues: #{queues.inspect}, concurrency: #{@pool.max_size}" }
  loop do
    run_cycle
    sleep @wait_time
  end
rescue SignalException => e
  log(:info) { "Worker #{uuid} received #{e.message}. Shutting down..." }
ensure
  @pool.shutdown
  @pool.wait_for_termination
end