Class: LogStashLogger::Device::Kafka

Inherits:
Connectable show all
Includes:
Stud::Buffer
Defined in:
lib/logstash-logger/device/kafka.rb

Constant Summary collapse

DEFAULT_HOST =
'localhost'
DEFAULT_PORT =
9092
DEFAULT_TOPIC =
'logstash'
DEFAULT_PRODUCER =
'logstash-logger'
DEFAULT_BACKOFF =
1

Instance Attribute Summary collapse

Attributes inherited from Base

#io, #sync

Instance Method Summary collapse

Methods inherited from Connectable

#connected?, #to_io

Methods inherited from Base

#to_io

Constructor Details

#initialize(opts) ⇒ Kafka

Returns a new instance of Kafka.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/logstash-logger/device/kafka.rb', line 17

def initialize(opts)
  super
  host = opts[:host] || DEFAULT_HOST
  port = opts[:port] || DEFAULT_PORT
  @hosts = opts[:hosts] || host.split(',').map { |h| "#{h}:#{port}" }
  @topic = opts[:path] || DEFAULT_TOPIC
  @producer = opts[:producer] || DEFAULT_PRODUCER
  @backoff = opts[:backoff] || DEFAULT_BACKOFF

  @batch_events = opts.fetch(:batch_events, 50)
  @batch_timeout = opts.fetch(:batch_timeout, 5)

  buffer_initialize max_items: @batch_events, max_interval: @batch_timeout
end

Instance Attribute Details

#backoffObject

Returns the value of attribute backoff.



15
16
17
# File 'lib/logstash-logger/device/kafka.rb', line 15

def backoff
  @backoff
end

#hostsObject

Returns the value of attribute hosts.



15
16
17
# File 'lib/logstash-logger/device/kafka.rb', line 15

def hosts
  @hosts
end

#producerObject

Returns the value of attribute producer.



15
16
17
# File 'lib/logstash-logger/device/kafka.rb', line 15

def producer
  @producer
end

#topicObject

Returns the value of attribute topic.



15
16
17
# File 'lib/logstash-logger/device/kafka.rb', line 15

def topic
  @topic
end

Instance Method Details

#closeObject



59
60
61
62
63
64
65
66
# File 'lib/logstash-logger/device/kafka.rb', line 59

def close
  buffer_flush(final: true)
  @io && @io.close
rescue => e
  warn "#{self.class} - #{e.class} - #{e.message}"
ensure
  @io = nil
end

#connectObject



32
33
34
# File 'lib/logstash-logger/device/kafka.rb', line 32

def connect
  @io = ::Poseidon::Producer.new(@hosts, @producer)
end

#flush(*args) ⇒ Object



68
69
70
71
72
73
74
75
76
77
# File 'lib/logstash-logger/device/kafka.rb', line 68

def flush(*args)
  if args.empty?
    buffer_flush
  else
    messages = *args.first
    with_connection do
      @io.send_messages messages
    end
  end
end

#reconnectObject



36
37
38
39
# File 'lib/logstash-logger/device/kafka.rb', line 36

def reconnect
  @io.close
  connect
end

#with_connectionObject



41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/logstash-logger/device/kafka.rb', line 41

def with_connection
  connect unless @io
  yield
rescue ::Poseidon::Errors::ChecksumError, Poseidon::Errors::UnableToFetchMetadata => e
  warn "#{self.class} - #{e.class} -> reconnect/retry"
  sleep backoff if backoff
  reconnect
  retry
rescue => e
  warn "#{self.class} - #{e.class} - #{e.message} -> giving up"
  @io = nil
end

#write(message) ⇒ Object



54
55
56
57
# File 'lib/logstash-logger/device/kafka.rb', line 54

def write(message)
  buffer_receive Poseidon::MessageToSend.new(@topic, message)
  buffer_flush(force: true) if @sync
end