Class: Franz::Output::Device
- Inherits: Object
  - Object
  - Franz::Output::Device
- Defined in: lib/franz/output/device.rb
Overview
STDOUT output for Franz.
Instance Method Summary
- #initialize(opts = {}) ⇒ Device (constructor): Start a new output in the background.
- #join ⇒ Object: Join the Output thread.
- #stop ⇒ Object: Stop the Output thread.
Constructor Details
#initialize(opts = {}) ⇒ Device
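Start a new output in the background. The output thread consumes events from the input queue (opts[:input]), appends any tags given in opts[:tags], and writes each event as one line of JSON to the device at opts[:output] (default: /dev/stdout, i.e. STDOUT). If opts[:foreground] is set, the constructor joins the thread before returning.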
# File 'lib/franz/output/device.rb', line 17

    def initialize opts={}
      opts = {
        logger: Logger.new(STDOUT),
        tags: [],
        input: [],
        output: '/dev/stdout'
      }.deep_merge!(opts)

      @statz = opts[:statz] || Franz::Stats.new
      @statz.create :num_output, 0

      @device = File.open(opts[:output], 'w')

      @logger = opts[:logger]

      @stop = false
      @foreground = opts[:foreground]

      @thread = Thread.new do
        until @stop
          event = opts[:input].shift

          unless opts[:tags].empty?
            event['tags'] ||= []
            event['tags'] += opts[:tags]
          end

          log.debug \
            event: 'publish',
            raw: event

          @device.puts JSON::generate(event)

          @statz.inc :num_output
        end
      end

      log.info event: 'output started'

      @thread.join if @foreground
    end
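The publish loop in the constructor can be sketched in isolation using only the Ruby standard library. This is a minimal illustration, not Franz code: the `publish` helper, the `'demo'` tag, and the `StringIO` device are hypothetical stand-ins, and the input is assumed to be a `Queue` whose `shift` blocks until an event arrives.

```ruby
require 'json'
require 'stringio'

# Hypothetical helper (not part of Franz): append extra tags to an event
# and write it as one line of JSON, mirroring the loop body in #initialize.
def publish(event, tags, device)
  unless tags.empty?
    event['tags'] ||= []
    event['tags'] += tags
  end
  device.puts JSON.generate(event)
end

# Stand-ins for opts[:input] and the opened output device (assumptions).
input  = Queue.new
device = StringIO.new

input << { 'message' => 'hello' }
input << { 'message' => 'world', 'tags' => ['a'] }

# Consume in a background thread, as the real output does.
worker = Thread.new do
  2.times { publish(input.shift, ['demo'], device) }
end
worker.join

# Each line of output is one JSON-encoded event.
lines = device.string.lines.map { |line| JSON.parse(line) }
```

Events that already carry tags keep them; the configured tags are appended after the existing ones.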
Instance Method Details
#join ⇒ Object
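Join the Output thread, blocking the caller until it finishes. Effectively only once: the first call sets @foreground, so later calls return immediately.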
# File 'lib/franz/output/device.rb', line 59

    def join
      return if @foreground
      @foreground = true
      @thread.join
    end
#stop ⇒ Object
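Stop the Output thread: kill it, close the output device, and log the 'output stopped' event. Effectively only once, and a no-op if the output was started in the foreground or has already been joined, because both cases set @foreground.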
# File 'lib/franz/output/device.rb', line 66

    def stop
      return if @foreground
      @foreground = true
      @thread.kill
      @device.close
      log.info event: 'output stopped'
    end
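The "effectively only once" behavior of #join and #stop comes from the shared @foreground guard. The sketch below reproduces that guard in a hypothetical `GuardedWorker` class (not part of Franz); the `stop_count` counter and the extra `@thread.join` after the kill are additions for illustration only.

```ruby
# Hypothetical stand-in for the guard used by #join and #stop; not Franz code.
class GuardedWorker
  attr_reader :stop_count

  def initialize
    @foreground = false
    @stop_count = 0
    @thread = Thread.new { sleep } # idle background thread
  end

  def stop
    return if @foreground # already joined or stopped: do nothing
    @foreground = true
    @thread.kill
    @thread.join          # wait for the kill to take effect (sketch only)
    @stop_count += 1
  end

  def alive?
    @thread.alive?
  end
end

worker = GuardedWorker.new
worker.stop
worker.stop # second call returns early at the guard
```

Because the same flag guards both methods, calling one of them first turns the other into a no-op.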