Class: Franz::Output::RabbitMQ
- Inherits: Object
- Defined in: lib/franz/output/rabbitmq.rb
Overview
RabbitMQ output for Franz. You must declare an x-consistent-hash type exchange, as we generate random Integers for routing keys.
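To see why random Integer routing keys suit a hash-based exchange, here is a toy sketch of how such keys spread across bound queues. The names `pick_queue` and `spread` are hypothetical, and CRC32 modulo the queue count is only a stand-in: the real x-consistent-hash exchange uses its own hashing scheme.

```ruby
require 'zlib'

# Toy stand-in for hash-based routing: map a routing key onto one of the
# bound queues. This is NOT the algorithm of the x-consistent-hash exchange;
# CRC32 modulo the queue count is an assumption made for illustration only.
def pick_queue(routing_key, queues)
  queues[Zlib.crc32(routing_key.to_s) % queues.size]
end

# Draw `count` random Integer routing keys (same range as the output uses)
# and tally how many land on each queue.
def spread(count, queues, rng = Random.new)
  tally = Hash.new(0)
  count.times { tally[pick_queue(rng.rand(10_000), queues)] += 1 }
  tally
end

queues = %w[q1 q2 q3]
spread(9_000, queues, Random.new(42)).sort.each do |queue, n|
  puts "#{queue}: #{n}"
end
```

Because the keys are drawn uniformly, each queue should receive a roughly similar share; a fixed routing key would instead send every event to the same queue.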
Instance Method Summary
- #initialize(opts = {}) ⇒ RabbitMQ (constructor)
  Start a new output in the background.
- #join ⇒ Object
  Join the Output thread.
- #stop ⇒ Object
  Stop the Output thread.
Constructor Details
#initialize(opts = {}) ⇒ RabbitMQ
Start a new output in the background. We’ll consume from the input queue and ship events to the configured RabbitMQ cluster.
# File 'lib/franz/output/rabbitmq.rb', line 20

def initialize opts={}
  opts = {
    logger: Logger.new(STDOUT),
    tags: [],
    input: [],
    output: {
      exchange: {
        name: 'test',
        durable: true
      },
      connection: {
        host: 'localhost',
        port: 5672
      }
    }
  }.deep_merge!(opts)

  @statz = opts[:statz] || Franz::Stats.new
  @statz.create :num_output, 0

  @logger = opts[:logger]

  rabbit = Bunny.new opts[:output][:connection].merge({
    network_recovery_interval: 10.0,
    continuation_timeout: 10_000,
    threaded: false,
    logger: @logger
  })

  rabbit.start

  channel = rabbit.create_channel
  exchange = opts[:output][:exchange].delete(:name)
  exchange = channel.exchange exchange, \
    { type: 'x-consistent-hash' }.merge(opts[:output][:exchange])

  @stop = false
  @foreground = opts[:foreground]

  @thread = Thread.new do
    rand = Random.new
    until @stop
      event = opts[:input].shift

      unless opts[:tags].empty?
        event['tags'] ||= []
        event['tags'] += opts[:tags]
      end

      log.debug \
        event: 'publish',
        raw: event

      exchange.publish \
        JSON::generate(event),
        routing_key: rand.rand(10_000),
        persistent: false

      @statz.inc :num_output
    end
  end

  log.info event: 'output started'

  @thread.join if @foreground
end
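The per-event work inside the output thread (tag, serialize, publish with a random routing key) can be tried without a broker. The following is a minimal sketch, not the Franz implementation: `FakeExchange` and `ship` are hypothetical names, and the fake exchange only records what a real Bunny exchange would have been asked to publish.

```ruby
require 'json'

# Hypothetical stand-in for a Bunny exchange: it records each publish call
# instead of talking to RabbitMQ.
class FakeExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(payload, opts = {})
    @published << [payload, opts]
  end
end

# Mirror one iteration of the output loop: append the configured tags,
# serialize the event as JSON, and publish it with a random Integer key.
def ship(event, tags, exchange, rng)
  unless tags.empty?
    event['tags'] ||= []
    event['tags'] += tags
  end
  exchange.publish JSON.generate(event),
    routing_key: rng.rand(10_000),
    persistent: false
end

# A Queue plays the role of opts[:input]; shift blocks when it is empty,
# so this demo drains it only while events remain.
input = Queue.new
input << { 'message' => 'hello' }
input << { 'message' => 'world', 'tags' => ['old'] }

exchange = FakeExchange.new
rng = Random.new
ship(input.shift, %w[franz], exchange, rng) until input.empty?

exchange.published.each { |payload, opts| puts "#{opts[:routing_key]} #{payload}" }
```

Existing tags on an event are kept and the configured tags are appended after them, which matches the `||=` and `+=` pair in the listing above.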
Instance Method Details
#join ⇒ Object
Join the Output thread. Only the first call has an effect; it returns immediately if the output is already in the foreground.
# File 'lib/franz/output/rabbitmq.rb', line 88

def join
  return if @foreground
  @foreground = true
  @thread.join
end
#stop ⇒ Object
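Stop the Output thread. Only the first call has an effect; it returns immediately if the output is already in the foreground, including after a previous #join or #stop.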
# File 'lib/franz/output/rabbitmq.rb', line 95

def stop
  return if @foreground
  @foreground = true
  @thread.kill
  log.info event: 'output stopped'
end
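Since #join and #stop share the same @foreground guard, whichever is called first wins and every later call to either is a no-op. A minimal sketch of that behavior follows, using a hypothetical `TinyWorker` class rather than the Franz output itself; the boolean return values are an addition made here so the effect is observable, and the real methods do not return them.

```ruby
# Hypothetical miniature of the output's lifecycle. It copies only the
# @foreground guard from the listings above.
class TinyWorker
  def initialize(&work)
    @foreground = false
    @thread = Thread.new(&work)
  end

  # Wait for the thread; returns false when the guard short-circuits.
  def join
    return false if @foreground
    @foreground = true
    @thread.join
    true
  end

  # Kill the thread; returns false when the guard short-circuits.
  def stop
    return false if @foreground
    @foreground = true
    @thread.kill
    true
  end
end

worker = TinyWorker.new { sleep } # sleeps until killed
puts worker.stop # first call kills the thread
puts worker.stop # guard is set, nothing happens
puts worker.join # also a no-op after stop
```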