Class: SemanticLogger::Appender::Kafka

Inherits:
Subscriber
Defined in:
lib/semantic_logger/appender/kafka.rb

Instance Attribute Summary

Attributes inherited from Subscriber

#application, #environment, #formatter, #host, #logger, #metrics

Attributes inherited from Base

#filter, #name

Instance Method Summary

Methods inherited from Subscriber

#console_output?, #level, #should_log?

Methods inherited from Base

#backtrace, #fast_tag, #level, #level=, #measure, #named_tags, #pop_tags, #push_tags, #should_log?, #silence, #tagged, #tags

Constructor Details

#initialize(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_ca_certs_from_system: false, topic: "log_messages", partition: nil, partition_key: nil, key: nil, delivery_threshold: 100, delivery_interval: 10, required_acks: 1, metrics: true, **args, &block) ⇒ Kafka

Send log messages to Kafka in JSON format.

Kafka Parameters:

seed_brokers: [Array<String>, String]
  The list of brokers used to initialize the client. Either an Array of connections,
  or a comma separated string of connections.
  Connections can either be a string of "host:port" or a full URI with a scheme.
  If there's a scheme it's ignored and only host/port are used.

client_id: [String]
  The identifier for this application.
  Default: semantic-logger

topic: [String]
  Topic to publish log messages to.
  Default: 'log_messages'

partition: [Integer]
  The partition that the message should be written to.
  Default: nil

partition_key: [String]
  The key that should be used to assign a partition.
  Default: nil

key: [String]
  The message key.
  Default: nil

connect_timeout: [Integer]
  The timeout setting for connecting to brokers.
  Default: nil

socket_timeout: [Integer]
  The timeout setting for socket connections.
  Default: nil

ssl_ca_cert: [String, Array<String>]
  A PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with a SSL connection.
  Default: nil

ssl_client_cert: [String]
  A PEM encoded client cert to use with a SSL connection.
  Must be used in combination with ssl_client_cert_key.
  Default: nil

ssl_client_cert_key: [String]
  A PEM encoded client cert key to use with a SSL connection.
  Must be used in combination with ssl_client_cert.
  Default: nil

ssl_ca_certs_from_system: [Boolean]
  Use the system CA certs for the SSL connection instead of supplying ssl_ca_cert.
  Default: false

delivery_threshold: [Integer]
  Number of buffered messages that triggers a delivery to Apache Kafka.
  Default: 100

delivery_interval: [Integer]
  Number of seconds between triggering a delivery of messages to Apache Kafka.
  Default: 10

required_acks: [Integer]
  Number of replicas that must acknowledge receipt of each log message written to the topic.
  Default: 1

Semantic Logger Parameters:

level: [:trace | :debug | :info | :warn | :error | :fatal]
  Override the log level for this appender.
  Default: SemanticLogger.default_level

formatter: [Object|Proc|Symbol|Hash]
  An instance of a class that implements #call, or a Proc to be used to format
  the output from this appender.
  Default: SemanticLogger::Formatters::Json (see #default_formatter)

filter: [Regexp|Proc]
  RegExp: Only include log messages where the class name matches the supplied
          regular expression. All other messages are ignored.
  Proc: Only include log messages where the supplied Proc returns true.

host: [String]
  Name of this host to appear in log messages.
  Default: SemanticLogger.host

application: [String]
  Name of this application to appear in log messages.
  Default: SemanticLogger.application

metrics: [Boolean]
  Also send metric-only events to Kafka.
  Default: true

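Example (a minimal sketch; the broker address is an illustrative placeholder):

SemanticLogger.add_appender(
  appender:     :kafka,
  seed_brokers: ["kafka1.example.org:9092"],
  topic:        "log_messages"
)

SemanticLogger["MyApp"].info("Hello Kafka")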

# File 'lib/semantic_logger/appender/kafka.rb', line 125

def initialize(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil,
               ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_ca_certs_from_system: false,
               topic: "log_messages", partition: nil, partition_key: nil, key: nil,
               delivery_threshold: 100, delivery_interval: 10, required_acks: 1,
               metrics: true, **args, &block)

  @seed_brokers             = seed_brokers
  @client_id                = client_id
  @connect_timeout          = connect_timeout
  @socket_timeout           = socket_timeout
  @ssl_ca_cert              = ssl_ca_cert
  @ssl_client_cert          = ssl_client_cert
  @ssl_client_cert_key      = ssl_client_cert_key
  @ssl_ca_certs_from_system = ssl_ca_certs_from_system
  @topic                    = topic
  @partition                = partition
  @partition_key            = partition_key
  @key                      = key
  @delivery_threshold       = delivery_threshold
  @delivery_interval        = delivery_interval
  @required_acks            = required_acks

  super(metrics: metrics, **args, &block)
  reopen
end
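
The appender can also be constructed directly and registered as an instance (a sketch; the broker addresses are placeholders):

appender = SemanticLogger::Appender::Kafka.new(
  seed_brokers:       ["kafka1.example.org:9092", "kafka2.example.org:9092"],
  topic:              "log_messages",
  delivery_threshold: 100,
  delivery_interval:  10
)
SemanticLogger.add_appender(appender: appender)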

Instance Attribute Details

#client_id ⇒ Object

Returns the value of attribute client_id.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def client_id
  @client_id
end

#connect_timeout ⇒ Object

Returns the value of attribute connect_timeout.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def connect_timeout
  @connect_timeout
end

#delivery_interval ⇒ Object

Returns the value of attribute delivery_interval.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def delivery_interval
  @delivery_interval
end

#delivery_threshold ⇒ Object

Returns the value of attribute delivery_threshold.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def delivery_threshold
  @delivery_threshold
end

#key ⇒ Object

Returns the value of attribute key.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def key
  @key
end

#partition ⇒ Object

Returns the value of attribute partition.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def partition
  @partition
end

#partition_key ⇒ Object

Returns the value of attribute partition_key.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def partition_key
  @partition_key
end

#required_acks ⇒ Object

Returns the value of attribute required_acks.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def required_acks
  @required_acks
end

#seed_brokers ⇒ Object

Returns the value of attribute seed_brokers.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def seed_brokers
  @seed_brokers
end

#socket_timeout ⇒ Object

Returns the value of attribute socket_timeout.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def socket_timeout
  @socket_timeout
end

#ssl_ca_cert ⇒ Object

Returns the value of attribute ssl_ca_cert.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def ssl_ca_cert
  @ssl_ca_cert
end

#ssl_ca_certs_from_system ⇒ Object

Returns the value of attribute ssl_ca_certs_from_system.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def ssl_ca_certs_from_system
  @ssl_ca_certs_from_system
end

#ssl_client_cert ⇒ Object

Returns the value of attribute ssl_client_cert.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def ssl_client_cert
  @ssl_client_cert
end

#ssl_client_cert_key ⇒ Object

Returns the value of attribute ssl_client_cert_key.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def ssl_client_cert_key
  @ssl_client_cert_key
end

#topic ⇒ Object

Returns the value of attribute topic.



# File 'lib/semantic_logger/appender/kafka.rb', line 25

def topic
  @topic
end

Instance Method Details

#close ⇒ Object



# File 'lib/semantic_logger/appender/kafka.rb', line 171

def close
  @producer&.shutdown
  @producer = nil
  @kafka&.close
  @kafka = nil
end

#default_formatter ⇒ Object

Use JSON Formatter by default.



# File 'lib/semantic_logger/appender/kafka.rb', line 185

def default_formatter
  SemanticLogger::Formatters::Json.new
end
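
A custom formatter can be supplied instead, since any object responding to #call(log, appender) and returning a String is accepted. A hedged sketch (the format string is illustrative):

SemanticLogger.add_appender(
  appender:     :kafka,
  seed_brokers: ["kafka1.example.org:9092"],
  formatter:    ->(log, _appender) { "#{log.level.to_s.upcase}: #{log.name} -- #{log.message}" }
)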

#flush ⇒ Object

Restart producer thread since there is no other way to flush.



# File 'lib/semantic_logger/appender/kafka.rb', line 190

def flush
  @producer.shutdown
  @producer = @kafka.async_producer(
    delivery_threshold: delivery_threshold,
    delivery_interval:  delivery_interval
  )
end
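
Since messages are buffered by the async producer until delivery_threshold or delivery_interval is reached, flushing forces delivery of anything still buffered, for example before process exit (assuming the appender was registered as above):

SemanticLogger.flush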

#log(log) ⇒ Object

Forward log messages to Kafka producer thread.



# File 'lib/semantic_logger/appender/kafka.rb', line 179

def log(log)
  json = formatter.call(log, self)
  @producer.produce(json, topic: topic, partition: partition, partition_key: partition_key, key: key)
end
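
Any standard Semantic Logger call then ends up as a JSON message on the configured topic, for example (a sketch; logger name and payload are illustrative):

SemanticLogger["PaymentService"].error("Charge failed", user_id: 42)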

#reopen ⇒ Object



# File 'lib/semantic_logger/appender/kafka.rb', line 151

def reopen
  @kafka = ::Kafka.new(
    seed_brokers:             seed_brokers,
    client_id:                client_id,
    connect_timeout:          connect_timeout,
    socket_timeout:           socket_timeout,
    ssl_ca_cert:              ssl_ca_cert,
    ssl_client_cert:          ssl_client_cert,
    ssl_client_cert_key:      ssl_client_cert_key,
    ssl_ca_certs_from_system: ssl_ca_certs_from_system,
    logger:                   logger
  )

  @producer = @kafka.async_producer(
    delivery_threshold: delivery_threshold,
    delivery_interval:  delivery_interval,
    required_acks:      required_acks
  )
end