Class: Franz::Output
- Inherits:
- Object
- Defined in:
- lib/franz/output.rb
Overview
RabbitMQ output for Franz. You must declare an x-consistent-hash type exchange, as we generate random Integers for routing keys.
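As a rough illustration of the routing keys mentioned above, the sketch below generates keys the same way the constructor does, with `Random#rand(10_000)`. The seed and the variable names are assumptions made for repeatability; Franz itself uses an unseeded `Random.new`. Mapping keys to queues is done by the exchange on the RabbitMQ side and is not modelled here.

```ruby
# Sketch only: the seed is an assumption so the run is repeatable.
rng = Random.new(42)

# Each published event gets one Integer key in the range 0...10_000.
keys = Array.new(1_000) { rng.rand(10_000) }

puts keys.first(5).inspect
puts "min=#{keys.min} max=#{keys.max}"
```

Because every key is drawn independently, consecutive events from the same file are not tied to one routing key.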
Instance Method Summary
- #initialize(opts = {}) ⇒ Output (constructor): Start a new output in the background.
- #join ⇒ Object: Join the Output thread.
- #stop ⇒ Object: Stop the Output thread.
Constructor Details
#initialize(opts = {}) ⇒ Output
Start a new output in the background. We’ll consume from the input queue and ship events to the configured RabbitMQ cluster.
    # File 'lib/franz/output.rb', line 18
    def initialize opts={}
      opts = {
        logger: Logger.new(STDOUT),
        tags: [],
        input: [],
        output: {
          exchange: {
            name: 'test',
            durable: true
          },
          connection: {
            host: 'localhost',
            port: 5672
          }
        }
      }.deep_merge!(opts)

      @logger = opts[:logger]

      rabbit = Bunny.new opts[:output][:connection].merge({
        network_recovery_interval: 10.0,
        continuation_timeout: 10_000,
        threaded: false
      })

      rabbit.start

      channel  = rabbit.create_channel
      exchange = opts[:output][:exchange].delete(:name)
      exchange = channel.exchange exchange, \
        { type: 'x-consistent-hash' }.merge(opts[:output][:exchange])

      @stop = false
      @foreground = opts[:foreground]

      @thread = Thread.new do
        rand = Random.new
        until @stop
          event = opts[:input].shift
          event[:path] = event[:path].sub('/home/denimuser/seam-builds/rel', '')
          event[:path] = event[:path].sub('/home/denimuser/seam-builds/live', '')
          event[:path] = event[:path].sub('/home/denimuser/seam-builds/beta', '')
          event[:path] = event[:path].sub('/home/denimuser/builds/rel', '')
          event[:path] = event[:path].sub('/home/denimuser/builds/live', '')
          event[:path] = event[:path].sub('/home/denimuser/builds/beta', '')
          event[:path] = event[:path].sub('/home/denimuser/cobalt-builds/rel', '')
          event[:path] = event[:path].sub('/home/denimuser/cobalt-builds/live', '')
          event[:path] = event[:path].sub('/home/denimuser/cobalt-builds/beta', '')
          event[:path] = event[:path].sub('/home/denimuser/rivet-builds', '')
          event[:path] = event[:path].sub('/home/denimuser/denim/logs', '')
          event[:path] = event[:path].sub('/home/denimuser/seam/logs', '')
          event[:path] = event[:path].sub('/home/denimuser/rivet/bjn/logs', '')
          event[:path] = event[:path].sub('/home/denimuser', '')
          event[:path] = event[:path].sub('/var/log', '')
          log.trace \
            event: 'publish',
            raw: event
          exchange.publish \
            JSON::generate(event),
            routing_key: rand.rand(10_000),
            persistent: false
        end
      end

      log.debug \
        event: 'output started',
        foreground: @foreground,
        opts: opts

      @thread.join if @foreground
    end
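The path rewriting in the publishing loop can be easier to follow as data plus one loop. The following is a minimal sketch, not part of Franz: `STRIP_PREFIXES` and `strip_prefixes` are hypothetical names, and the list simply repeats the literals from the constructor in the same order. It applies every substitution in turn, as the original chain of `sub` calls does.

```ruby
# Hypothetical constant: the same literals, in the same order, as the constructor.
STRIP_PREFIXES = [
  '/home/denimuser/seam-builds/rel',
  '/home/denimuser/seam-builds/live',
  '/home/denimuser/seam-builds/beta',
  '/home/denimuser/builds/rel',
  '/home/denimuser/builds/live',
  '/home/denimuser/builds/beta',
  '/home/denimuser/cobalt-builds/rel',
  '/home/denimuser/cobalt-builds/live',
  '/home/denimuser/cobalt-builds/beta',
  '/home/denimuser/rivet-builds',
  '/home/denimuser/denim/logs',
  '/home/denimuser/seam/logs',
  '/home/denimuser/rivet/bjn/logs',
  '/home/denimuser',
  '/var/log'
].freeze

# Hypothetical helper: runs each String#sub in order on the running result.
# String#sub replaces only the first occurrence and is not anchored to the
# start of the string, which matches the behaviour of the original code.
def strip_prefixes(path, prefixes = STRIP_PREFIXES)
  prefixes.reduce(path) { |result, prefix| result.sub(prefix, '') }
end

puts strip_prefixes('/home/denimuser/seam-builds/rel/app/server.log')
puts strip_prefixes('/var/log/syslog')
```

Order matters: the longer, more specific entries come before `/home/denimuser`, otherwise the shorter one would match first and leave fragments such as `/seam-builds/rel` in the path. Since each step runs on the previous result, a path can also lose more than one segment, for example `/home/denimuser/var/log/x.log` ends up as `/x.log`.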
Instance Method Details
#join ⇒ Object
Join the Output thread. Effectively only once.
    # File 'lib/franz/output.rb', line 94
    def join
      return if @foreground
      @foreground = true
      @thread.join
    end
#stop ⇒ Object
Stop the Output thread. Effectively only once.
    # File 'lib/franz/output.rb', line 101
    def stop
      return if @foreground
      @foreground = true
      @thread.kill
      log.debug event: 'output stopped'
    end
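To see what "effectively only once" means in practice, here is a small stand-in that reuses the same `@foreground` guard without RabbitMQ. The `Worker` class, its idle thread, and the `:stopped` return value are assumptions for illustration; the real `#stop` returns whatever `log.debug` returns.

```ruby
# Hypothetical stand-in for Franz::Output: same guards, no Bunny, no logging.
class Worker
  def initialize
    @foreground = false
    @thread = Thread.new { sleep } # parks until killed
  end

  def join
    return if @foreground
    @foreground = true
    @thread.join
  end

  def stop
    return if @foreground
    @foreground = true
    @thread.kill
    :stopped # assumption: marker value so the caller can see which call did the work
  end
end

worker = Worker.new
first  = worker.stop # kills the thread
second = worker.stop # guard short-circuits, returns nil
joined = worker.join # same guard, so this returns nil without joining

puts [first, second, joined].inspect
```

Both methods share one flag, so whichever is called first turns the other into a no-op. In particular, once `join` has been called, a later `stop` returns immediately without killing the thread.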