Class: Franz::Output::Kafka
- Inherits: Object
- Defined in: lib/franz/output/kafka.rb
Overview
Kafka output for Franz.
Constant Summary
- @@host = Socket.gethostname
  We’ll apply the hostname to all events.
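The class variable above is plain Ruby stdlib: `Socket.gethostname` returns the machine's hostname, which Franz then stamps onto every outgoing event. A quick check of what it yields:

```ruby
require 'socket'

# Resolve this process's own hostname, exactly as the @@host
# constant above does at class-load time.
host = Socket.gethostname

# The result is always a non-empty String, e.g. "app01.example.com".
puts host
```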
Instance Method Summary
- #initialize(opts = {}) ⇒ Kafka (constructor)
  Start a new output in the background.
- #join ⇒ Object
  Join the Output thread.
- #stop ⇒ Object
  Stop the Output thread.
Constructor Details
#initialize(opts = {}) ⇒ Kafka
Start a new output in the background. We’ll consume from the input queue and ship events to Kafka.
# File 'lib/franz/output/kafka.rb', line 22

def initialize opts={}
  opts = {
    logger: Logger.new(STDOUT),
    tags: [],
    input: [],
    output: {
      topic: 'franz',
      flush_interval: 10,
      flush_size: 500,
      client_id: @@host,
      type: 'sync',
      compression_codec: 'snappy',
      metadata_refresh_interval_ms: 600000,
      max_send_retries: 3,
      retry_backoff_ms: 100,
      required_acks: 0,
      ack_timeout_ms: 1500,
      socket_timeout_ms: 10000
    }
  }.deep_merge!(opts)

  @statz = opts[:statz] || Franz::Stats.new
  @statz.create :num_output, 0

  @logger = opts[:logger]

  @stop = false
  @foreground = opts[:foreground]

  @flush_size = opts[:output].delete :flush_size
  @flush_interval = opts[:output].delete :flush_interval
  @topic = opts[:output].delete :topic

  kafka_brokers = opts[:output].delete(:brokers) || %w[ localhost:9092 ]
  kafka_client_id = opts[:output].delete :client_id
  kafka_config = opts[:output].map { |k, v|
    [ k, v.is_a?(String) ? v.to_sym : v ]
  }

  @kafka = Poseidon::Producer.new \
    kafka_brokers, kafka_client_id, Hash[kafka_config]

  @lock = Mutex.new
  @messages = []

  # Periodic flush: every @flush_interval seconds, ship whatever has
  # accumulated in the buffer. Kept in its own thread so #join and
  # #stop manage only the consumer thread below.
  @flush_thread = Thread.new do
    loop do
      @lock.synchronize do
        messages = @messages
        @messages = []
        @kafka.send_messages messages unless messages.empty?
        log.debug \
          event: 'periodic flush',
          num_messages: messages.size
      end
      sleep @flush_interval
    end
  end

  # Consumer: drain the input queue, buffering messages and flushing
  # whenever the buffer reaches @flush_size.
  @thread = Thread.new do
    until @stop
      event = opts[:input].shift
      log.trace \
        event: 'publish',
        raw: event
      payload = JSON::generate(event)
      @lock.synchronize do
        @messages << Poseidon::MessageToSend.new(@topic, payload)
        @statz.inc :num_output
        if @messages.size >= @flush_size
          @kafka.send_messages @messages
          log.debug \
            event: 'flush',
            num_messages: @messages.size
          @messages = []
        end
      end
    end
  end

  log.info event: 'output started'
  @thread.join if @foreground
end
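The constructor's two flush triggers (buffer size and timer) can be sketched independently of Kafka. The `BufferedFlusher` class and its sink block below are hypothetical stand-ins for the Poseidon producer, not the Franz API; the mutex-plus-buffer shape mirrors the pattern above:

```ruby
# Minimal sketch of size- and interval-based batch flushing,
# with a block standing in for the Kafka producer.
class BufferedFlusher
  def initialize(flush_size:, flush_interval:, &sink)
    @flush_size = flush_size
    @sink = sink
    @lock = Mutex.new
    @messages = []
    # Timer-based flush: ship whatever accumulated, on a schedule.
    @flusher = Thread.new do
      loop do
        sleep flush_interval
        @lock.synchronize { drain }
      end
    end
  end

  def push(msg)
    @lock.synchronize do
      @messages << msg
      drain if @messages.size >= @flush_size # size-based flush
    end
  end

  private

  # Hand the buffered batch to the sink and reset the buffer.
  def drain
    return if @messages.empty?
    @sink.call @messages
    @messages = []
  end
end

batches = []
f = BufferedFlusher.new(flush_size: 3, flush_interval: 60) { |b| batches << b.dup }
5.times { |i| f.push i }
# The size trigger fired once at 3 messages; 2 remain buffered
# until either the buffer fills again or the timer fires.
```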
Instance Method Details
#join ⇒ Object
Join the Output thread. Effective only on the first call.
# File 'lib/franz/output/kafka.rb', line 120

def join
  return if @foreground
  @foreground = true
  @thread.join
end
#stop ⇒ Object
Stop the Output thread. Effective only on the first call.
# File 'lib/franz/output/kafka.rb', line 128

def stop
  return if @foreground
  @foreground = true
  @thread.kill
  log.info event: 'output stopped'
end
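Both methods use the same `@foreground` flag as a one-shot guard, so whichever of #join or #stop runs first wins and later calls return immediately. A miniature sketch of that lifecycle, with `MiniOutput` as an illustrative stand-in rather than the Franz class:

```ruby
# Hypothetical miniature of the Output lifecycle guard: both #join
# and #stop flip @foreground, so each takes effect at most once.
class MiniOutput
  attr_reader :thread # exposed here only for demonstration

  def initialize
    @foreground = false
    @thread = Thread.new { sleep } # stand-in for the consumer loop
  end

  def join
    return if @foreground
    @foreground = true
    @thread.join
  end

  def stop
    return if @foreground
    @foreground = true
    @thread.kill
    @thread.join # wait for the kill to take effect
  end
end

out = MiniOutput.new
out.stop # kills the background thread
out.join # no-op: @foreground is already true
```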