Class: LogStashLogger::Device::Kafka
- Inherits:
-
Connectable
- Object
- Base
- Connectable
- LogStashLogger::Device::Kafka
- Defined in:
- lib/logstash-logger/device/kafka.rb
Constant Summary collapse
- DEFAULT_HOST =
'localhost'
- DEFAULT_PORT =
9092
- DEFAULT_TOPIC =
'logstash'
- DEFAULT_PRODUCER =
'logstash-logger'
- DEFAULT_BACKOFF =
1
Instance Attribute Summary collapse
-
#backoff ⇒ Object
Returns the value of attribute backoff.
-
#hosts ⇒ Object
Returns the value of attribute hosts.
-
#producer ⇒ Object
Returns the value of attribute producer.
-
#topic ⇒ Object
Returns the value of attribute topic.
Attributes inherited from Base
Instance Method Summary collapse
- #close ⇒ Object
- #connect ⇒ Object
- #flush(*args) ⇒ Object
-
#initialize(opts) ⇒ Kafka
constructor
A new instance of Kafka.
- #reconnect ⇒ Object
- #with_connection ⇒ Object
- #write(message) ⇒ Object
- #write_batch(messages) ⇒ Object
Methods inherited from Connectable
Methods inherited from Base
Constructor Details
#initialize(opts) ⇒ Kafka
Returns a new instance of Kafka.
15 16 17 18 19 20 21 22 23 |
# File 'lib/logstash-logger/device/kafka.rb', line 15 def initialize(opts) super host = opts[:host] || DEFAULT_HOST port = opts[:port] || DEFAULT_PORT @hosts = opts[:hosts] || host.split(',').map { |h| "#{h}:#{port}" } @topic = opts[:path] || DEFAULT_TOPIC @producer = opts[:producer] || DEFAULT_PRODUCER @backoff = opts[:backoff] || DEFAULT_BACKOFF end |
Instance Attribute Details
#backoff ⇒ Object
Returns the value of attribute backoff.
13 14 15 |
# File 'lib/logstash-logger/device/kafka.rb', line 13 def backoff @backoff end |
#hosts ⇒ Object
Returns the value of attribute hosts.
13 14 15 |
# File 'lib/logstash-logger/device/kafka.rb', line 13 def hosts @hosts end |
#producer ⇒ Object
Returns the value of attribute producer.
13 14 15 |
# File 'lib/logstash-logger/device/kafka.rb', line 13 def producer @producer end |
#topic ⇒ Object
Returns the value of attribute topic.
13 14 15 |
# File 'lib/logstash-logger/device/kafka.rb', line 13 def topic @topic end |
Instance Method Details
#close ⇒ Object
58 59 60 61 62 63 64 65 |
# File 'lib/logstash-logger/device/kafka.rb', line 58 def close buffer_flush(final: true) @io && @io.close rescue => e warn "#{self.class} - #{e.class} - #{e.message}" ensure @io = nil end |
#connect ⇒ Object
25 26 27 |
# File 'lib/logstash-logger/device/kafka.rb', line 25 def connect @io = ::Poseidon::Producer.new(@hosts, @producer) end |
#flush(*args) ⇒ Object
67 68 69 70 71 72 73 74 |
# File 'lib/logstash-logger/device/kafka.rb', line 67 def flush(*args) if args.empty? buffer_flush else messages = *args.first write_batch(messages) end end |
#reconnect ⇒ Object
29 30 31 32 |
# File 'lib/logstash-logger/device/kafka.rb', line 29 def reconnect @io.close connect end |
#with_connection ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/logstash-logger/device/kafka.rb', line 34 def with_connection connect unless @io yield rescue ::Poseidon::Errors::ChecksumError, Poseidon::Errors::UnableToFetchMetadata => e warn "#{self.class} - #{e.class} -> reconnect/retry" sleep backoff if backoff reconnect retry rescue => e warn "#{self.class} - #{e.class} - #{e.message} -> giving up" @io = nil end |
#write(message) ⇒ Object
47 48 49 50 |
# File 'lib/logstash-logger/device/kafka.rb', line 47 def write(message) buffer_receive Poseidon::MessageToSend.new(@topic, message) buffer_flush(force: true) if @sync end |
#write_batch(messages) ⇒ Object
52 53 54 55 56 |
# File 'lib/logstash-logger/device/kafka.rb', line 52 def write_batch(messages) with_connection do @io.send_messages messages end end |