Class: Daemonic::Producer
- Inherits:
-
Object
- Object
- Daemonic::Producer
- Defined in:
- lib/daemonic/producer.rb
Instance Attribute Summary collapse
-
#concurrency ⇒ Object
readonly
Returns the value of attribute concurrency.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#queue_size ⇒ Object
readonly
Returns the value of attribute queue_size.
-
#worker ⇒ Object
readonly
Returns the value of attribute worker.
Instance Method Summary collapse
-
#initialize(worker, options) ⇒ Producer
constructor
A new instance of Producer.
- #logger ⇒ Object
- #run ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(worker, options) ⇒ Producer
6 7 8 9 10 11 12 13 |
# File 'lib/daemonic/producer.rb', line 6 def initialize(worker, options) @worker = worker @options = options @concurrency = options.fetch(:concurrency) { 4 } @queue_size = options.fetch(:queue_size) { @concurrency + 1 } @logger = options[:logger] @running = true end |
Instance Attribute Details
#concurrency ⇒ Object (readonly)
Returns the value of attribute concurrency.
4 5 6 |
# File 'lib/daemonic/producer.rb', line 4 def concurrency @concurrency end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
4 5 6 |
# File 'lib/daemonic/producer.rb', line 4 def options @options end |
#queue_size ⇒ Object (readonly)
Returns the value of attribute queue_size.
4 5 6 |
# File 'lib/daemonic/producer.rb', line 4 def queue_size @queue_size end |
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
4 5 6 |
# File 'lib/daemonic/producer.rb', line 4 def worker @worker end |
Instance Method Details
#logger ⇒ Object
53 54 55 56 57 58 59 |
# File 'lib/daemonic/producer.rb', line 53 def logger @logger ||= options.fetch(:logger) { Logger.new(options[:log] || STDOUT).tap { |logger| logger.level = options[:log_level] || Logger::INFO } } end |
#run ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/daemonic/producer.rb', line 15 def run logger.info "Starting producer with #{concurrency} consumer threads." at_exit { last_error = $! if last_error msg = "Shutting down: #{last_error.inspect}\n#{last_error.backtrace.join("\n")}" logger.fatal msg $stdout.puts msg else logger.fatal "Shutting down" $stdout.puts "Shutting down" end } Signal.trap("INT") { stop } Signal.trap("TERM") { stop } pool = Pool.new(self) producer = Thread.new do while @running worker.produce(pool) Thread.pass end logger.info { "Producer has been shut down. Stopping the thread pool" } pool.stop end producer.join end |
#stop ⇒ Object
49 50 51 |
# File 'lib/daemonic/producer.rb', line 49 def stop @running = false end |