Class: LogStashLogger::Device::Kafka

Inherits:
Connectable show all
Defined in:
lib/logstash-logger/device/kafka.rb

Constant Summary collapse

DEFAULT_HOST =
'localhost'
DEFAULT_PORT =
9092
DEFAULT_TOPIC =
'logstash'
DEFAULT_PRODUCER =
'logstash-logger'
DEFAULT_BACKOFF =
1

Instance Attribute Summary collapse

Attributes inherited from Base

#io, #sync

Instance Method Summary collapse

Methods inherited from Connectable

#connected?, #to_io

Methods inherited from Base

#to_io

Constructor Details

#initialize(opts) ⇒ Kafka

Returns a new instance of Kafka.



15
16
17
18
19
20
21
22
23
# File 'lib/logstash-logger/device/kafka.rb', line 15

# Builds a Kafka device from an options hash.
#
# The options are first handed to the parent initializer (bare +super+),
# then used to fill in the Kafka-specific settings. Every setting falls
# back to the matching DEFAULT_* constant when the option is absent or nil.
#
# @param opts [Hash] device options
# @option opts :hosts broker list, stored as given when present
# @option opts :host comma-separated host names, each combined with :port
#   (only consulted when :hosts is not given)
# @option opts :port port appended to every entry derived from :host
# @option opts :path topic name
# @option opts :producer producer identifier passed to the Poseidon producer
# @option opts :backoff delay used by #with_connection before reconnecting
def initialize(opts)
  super

  @hosts = opts[:hosts] || begin
    broker_port  = opts[:port] || DEFAULT_PORT
    broker_names = opts[:host] || DEFAULT_HOST
    broker_names.split(',').map { |name| "#{name}:#{broker_port}" }
  end
  @topic    = opts[:path] || DEFAULT_TOPIC
  @producer = opts[:producer] || DEFAULT_PRODUCER
  @backoff  = opts[:backoff] || DEFAULT_BACKOFF
end

Instance Attribute Details

#backoff ⇒ Object

Returns the value of attribute backoff.



13
14
15
# File 'lib/logstash-logger/device/kafka.rb', line 13

# Reader for the backoff setting.
#
# @return [Object] the current value of @backoff
def backoff = @backoff

#hosts ⇒ Object

Returns the value of attribute hosts.



13
14
15
# File 'lib/logstash-logger/device/kafka.rb', line 13

# Reader for the broker list.
#
# @return [Object] the current value of @hosts
def hosts = @hosts

#producer ⇒ Object

Returns the value of attribute producer.



13
14
15
# File 'lib/logstash-logger/device/kafka.rb', line 13

# Reader for the producer identifier.
#
# @return [Object] the current value of @producer
def producer = @producer

#topic ⇒ Object

Returns the value of attribute topic.



13
14
15
# File 'lib/logstash-logger/device/kafka.rb', line 13

# Reader for the topic name.
#
# @return [Object] the current value of @topic
def topic = @topic

Instance Method Details

#close ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/logstash-logger/device/kafka.rb', line 58

# Performs a final buffer flush and closes the underlying producer.
#
# The flush is isolated in its own begin/rescue so that a failing flush
# cannot skip the close: previously an exception from buffer_flush jumped
# straight to the rescue clause and @io was discarded without being closed.
#
# Errors from either step are reported with +warn+ instead of being raised,
# and @io is always reset to nil.
#
# @return [Object, nil] the result of closing @io, or a falsy value when
#   there was nothing to close or closing failed
def close
  begin
    buffer_flush(final: true)
  rescue => e
    warn "#{self.class} - #{e.class} - #{e.message}"
  end
  @io && @io.close
rescue => e
  warn "#{self.class} - #{e.class} - #{e.message}"
ensure
  @io = nil
end

#connect ⇒ Object



25
26
27
# File 'lib/logstash-logger/device/kafka.rb', line 25

# Instantiates a Poseidon producer from the configured hosts and producer
# identifier and keeps it in @io.
#
# @return [Object] the newly created producer
def connect
  producer_args = [@hosts, @producer]
  @io = ::Poseidon::Producer.new(*producer_args)
end

#flush(*args) ⇒ Object



67
68
69
70
71
72
73
74
# File 'lib/logstash-logger/device/kafka.rb', line 67

# Flushes buffered messages, or writes an explicit batch.
#
# With no arguments the call is delegated to buffer_flush. Otherwise the
# first argument is splatted into an array (so nil becomes an empty array
# and a single object becomes a one-element array) and handed to
# write_batch; any further arguments are ignored.
#
# @param args [Array] optional; only the first element is used
# @return [Object] whatever buffer_flush or write_batch returns
def flush(*args)
  return buffer_flush if args.empty?

  write_batch([*args.first])
end

#reconnect ⇒ Object



29
30
31
32
# File 'lib/logstash-logger/device/kafka.rb', line 29

# Closes the current producer, if any, and opens a new one.
#
# The nil guard matters because this method is invoked from a rescue clause
# in #with_connection: if the initial connect failed, @io is still nil and
# an unguarded @io.close would raise NoMethodError out of the rescue.
#
# @return [Object] whatever #connect returns
def reconnect
  @io.close if @io
  connect
end

#with_connection ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/logstash-logger/device/kafka.rb', line 34

# Runs the given block with a producer available in @io.
#
# Connects first when @io is not set. Poseidon checksum and metadata errors
# are treated as recoverable: after a warning and an optional sleep of
# +backoff+, the producer is reconnected and the whole method is retried.
# Any other StandardError is reported and the connection is dropped; the
# method then returns nil instead of raising.
#
# When giving up, the producer is closed on a best-effort basis before @io
# is cleared, so the failed connection is not left open.
#
# NOTE(review): the retry for recoverable errors is unbounded, so a
# persistent metadata failure keeps the caller in this loop — confirm that
# this is intended before adding a retry limit.
#
# @yield the work to perform while connected
# @return [Object, nil] the block's result, or nil after giving up
def with_connection
  connect unless @io
  yield
rescue ::Poseidon::Errors::ChecksumError, ::Poseidon::Errors::UnableToFetchMetadata => e
  warn "#{self.class} - #{e.class} -> reconnect/retry"
  sleep backoff if backoff
  reconnect
  retry
rescue => e
  warn "#{self.class} - #{e.class} - #{e.message} -> giving up"
  begin
    # Close directly rather than via #close, which would flush the buffer
    # and re-enter this method.
    @io.close if @io
  rescue => close_error
    warn "#{self.class} - #{close_error.class} - #{close_error.message}"
  ensure
    @io = nil
  end
  nil
end

#write(message) ⇒ Object



47
48
49
50
# File 'lib/logstash-logger/device/kafka.rb', line 47

# Wraps the message for the configured topic and hands it to the buffer.
#
# When @sync is truthy the buffer is flushed immediately with force: true.
#
# @param message [Object] payload to send
# @return [Object, nil] the result of the forced flush, or nil when @sync
#   is not set
def write(message)
  outgoing = Poseidon::MessageToSend.new(@topic, message)
  buffer_receive(outgoing)
  return unless @sync

  buffer_flush(force: true)
end

#write_batch(messages) ⇒ Object



52
53
54
55
56
# File 'lib/logstash-logger/device/kafka.rb', line 52

# Sends a batch of messages through the producer inside #with_connection.
#
# @param messages [Object] batch passed unchanged to @io.send_messages
# @return [Object] whatever #with_connection returns
def write_batch(messages)
  with_connection { @io.send_messages(messages) }
end