Class: LogStashLogger::Device::Connectable

Inherits:
Base
  • Object
show all
Includes:
Buffer
Defined in:
lib/logstash-logger/device/connectable.rb

Direct Known Subclasses

Kafka, Redis, Socket, Unix

Instance Attribute Summary

Attributes inherited from Base

#error_logger, #io, #sync

Instance Method Summary collapse

Methods included from Buffer

#buffer_flush, #buffer_full?, #buffer_initialize, #buffer_receive, #reset_buffer

Methods inherited from Base

#close!, #unrecoverable_error?

Constructor Details

#initialize(opts = {}) ⇒ Connectable

Returns a new instance of Connectable.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/logstash-logger/device/connectable.rb', line 8

# Builds the device and configures its message buffer from +opts+.
#
# Keys read here:
# * +:buffer_max_items+ / +:buffer_max_interval+ - passed on to
#   buffer_initialize as +max_items+ / +max_interval+.
# * +:batch_events+ / +:batch_timeout+ - deprecated spellings of the two
#   keys above; a warning is printed when one is given and its value wins.
# * +:drop_messages_on_flush_error+ (false when absent),
#   +:drop_messages_on_full_buffer+ (true when absent) and
#   +:buffer_flush_at_exit+ (true when absent) - each is deleted from
#   +opts+ when present.
#
# @param opts [Hash] device options; the three flag keys above are removed
def initialize(opts = {})
  super

  warn "The :batch_events option is deprecated. Please use :buffer_max_items instead" if opts[:batch_events]
  warn "The :batch_timeout option is deprecated. Please use :buffer_max_interval instead" if opts[:batch_timeout]

  # Removes +key+ from opts and returns its value when the key is present
  # (even if the value is nil/false); otherwise returns +default+.
  take_flag = ->(key, default) { opts.key?(key) ? opts.delete(key) : default }

  @buffer_group = nil
  @buffer_max_items = opts[:batch_events] || opts[:buffer_max_items]
  @buffer_max_interval = opts[:batch_timeout] || opts[:buffer_max_interval]

  @drop_messages_on_flush_error = take_flag.call(:drop_messages_on_flush_error, false)
  @drop_messages_on_full_buffer = take_flag.call(:drop_messages_on_full_buffer, true)
  @buffer_flush_at_exit = take_flag.call(:buffer_flush_at_exit, true)

  buffer_initialize(
    max_items: @buffer_max_items,
    max_interval: @buffer_max_interval,
    autoflush: @sync,
    drop_messages_on_flush_error: @drop_messages_on_flush_error,
    drop_messages_on_full_buffer: @drop_messages_on_full_buffer,
    flush_at_exit: @buffer_flush_at_exit
  )
end

Instance Method Details

#close(opts = {}) ⇒ Object



70
71
72
73
74
75
76
# File 'lib/logstash-logger/device/connectable.rb', line 70

# Closes the device, flushing the buffer first unless told otherwise.
#
# @param opts [Hash] pass +flush: false+ to skip the final buffer flush;
#   the flush happens when the key is absent. The hash is forwarded
#   unchanged to the inherited implementation.
# @return whatever the inherited #close returns
def close(opts = {})
  buffer_flush(final: true) if opts.fetch(:flush, true)

  super
end

#connect ⇒ Object

Implemented by subclasses



101
102
103
# File 'lib/logstash-logger/device/connectable.rb', line 101

# Implemented by subclasses.
#
# @raise [NotImplementedError] always, in this implementation
def connect
  raise NotImplementedError
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/logstash-logger/device/connectable.rb', line 84

# Reports whether an IO object is currently held in @io.
#
# @return [Boolean] true when @io is neither nil nor false
def connected?
  @io ? true : false
end

#flush(*args) ⇒ Object



57
58
59
60
61
62
63
64
# File 'lib/logstash-logger/device/connectable.rb', line 57

# Dual-purpose flush.
#
# With no arguments it triggers buffer_flush. With arguments, the first two
# are taken as +messages+ and +group+ and handed to write_batch.
#
# @param args [Array] empty, or +[messages, group]+ (group may be omitted)
# @return the result of buffer_flush or write_batch respectively
def flush(*args)
  return buffer_flush if args.empty?

  messages, group = args
  write_batch(messages, group)
end

#on_full_buffer_receive(data) ⇒ Object



66
67
68
# File 'lib/logstash-logger/device/connectable.rb', line 66

# Logs a warning that includes +data+ (interpolated into the message).
# NOTE(review): the name suggests the buffer calls this when it is full and
# receives data - confirm against the Buffer module.
#
# @param data [Object] interpolated into the warning text
# @return whatever log_warning returns
def on_full_buffer_receive(data)
  notice = "Buffer Full - #{data}"
  log_warning(notice)
end

#reconnect ⇒ Object



105
106
107
108
# File 'lib/logstash-logger/device/connectable.rb', line 105

# Closes the device without flushing the buffer, then connects again.
#
# @return whatever #connect returns
def reconnect
  close flush: false
  connect
end

#to_io ⇒ Object



78
79
80
81
82
# File 'lib/logstash-logger/device/connectable.rb', line 78

# Runs the inherited #to_io inside with_connection.
#
# @return whatever the inherited #to_io returns
def to_io
  with_connection { super }
end

#with_connection(&block) ⇒ Object

Ensure the block is executed with a valid connection



111
112
113
114
115
116
117
118
# File 'lib/logstash-logger/device/connectable.rb', line 111

# Ensure the block is executed with a valid connection.
#
# Calls #connect first when #connected? is false, then yields. If a
# StandardError is raised by either step, it is logged via log_error, the
# device is closed without flushing, and the same error is re-raised.
#
# @yield the work to perform while connected
# @return the value of the block
def with_connection(&block)
  begin
    connect unless connected?
    yield
  rescue => error
    log_error(error)
    close(flush: false)
    raise
  end
end

#write(message) ⇒ Object



53
54
55
# File 'lib/logstash-logger/device/connectable.rb', line 53

# Hands +message+ to the buffer together with the current @buffer_group.
#
# @param message [Object] the item to buffer
# @return whatever buffer_receive returns
def write(message)
  buffer_receive(message, @buffer_group)
end

#write_batch(messages, group = nil) ⇒ Object



94
95
96
97
98
# File 'lib/logstash-logger/device/connectable.rb', line 94

# Runs the inherited #write_batch inside with_connection, forwarding
# +messages+ and +group+ unchanged.
#
# @param messages [Object] batch passed through to the inherited method
# @param group [Object, nil] passed through to the inherited method
# @return whatever the inherited #write_batch returns
def write_batch(messages, group = nil)
  with_connection { super }
end

#write_one(message) ⇒ Object



88
89
90
91
92
# File 'lib/logstash-logger/device/connectable.rb', line 88

# Runs the inherited #write_one inside with_connection, forwarding
# +message+ unchanged.
#
# @param message [Object] passed through to the inherited method
# @return whatever the inherited #write_one returns
def write_one(message)
  with_connection { super }
end