Class: SemanticLogger::Appender::Kafka
- Inherits:
-
Subscriber
- Object
- Base
- Subscriber
- SemanticLogger::Appender::Kafka
- Defined in:
- lib/semantic_logger/appender/kafka.rb
Instance Attribute Summary collapse
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#connect_timeout ⇒ Object
Returns the value of attribute connect_timeout.
-
#delivery_interval ⇒ Object
Returns the value of attribute delivery_interval.
-
#delivery_threshold ⇒ Object
Returns the value of attribute delivery_threshold.
-
#key ⇒ Object
Returns the value of attribute key.
-
#partition ⇒ Object
Returns the value of attribute partition.
-
#partition_key ⇒ Object
Returns the value of attribute partition_key.
-
#required_acks ⇒ Object
Returns the value of attribute required_acks.
-
#seed_brokers ⇒ Object
Returns the value of attribute seed_brokers.
-
#socket_timeout ⇒ Object
Returns the value of attribute socket_timeout.
-
#ssl_ca_cert ⇒ Object
Returns the value of attribute ssl_ca_cert.
-
#ssl_ca_certs_from_system ⇒ Object
Returns the value of attribute ssl_ca_certs_from_system.
-
#ssl_client_cert ⇒ Object
Returns the value of attribute ssl_client_cert.
-
#ssl_client_cert_key ⇒ Object
Returns the value of attribute ssl_client_cert_key.
-
#topic ⇒ Object
Returns the value of attribute topic.
Attributes inherited from Subscriber
#application, #environment, #formatter, #host, #logger, #metrics
Attributes inherited from Base
Instance Method Summary collapse
- #close ⇒ Object
-
#default_formatter ⇒ Object
Use JSON Formatter by default.
-
#flush ⇒ Object
Restart producer thread since there is no other way to flush.
-
#initialize(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_ca_certs_from_system: false, topic: "log_messages", partition: nil, partition_key: nil, key: nil, delivery_threshold: 100, delivery_interval: 10, required_acks: 1, metrics: true, **args, &block) ⇒ Kafka
constructor
Send log messages to Kafka in JSON format.
-
#log(log) ⇒ Object
Forward log messages to Kafka producer thread.
- #reopen ⇒ Object
Methods inherited from Subscriber
#console_output?, #level, #should_log?
Methods inherited from Base
#backtrace, #fast_tag, #level, #level=, #measure, #named_tags, #pop_tags, #push_tags, #should_log?, #silence, #tagged, #tags
Constructor Details
#initialize(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_ca_certs_from_system: false, topic: "log_messages", partition: nil, partition_key: nil, key: nil, delivery_threshold: 100, delivery_interval: 10, required_acks: 1, metrics: true, **args, &block) ⇒ Kafka
Send log messages to Kafka in JSON format.
Kafka Parameters:
seed_brokers: [Array<String>, String]
The list of brokers used to initialize the client. Either an Array of connections,
or a comma separated string of connections.
Connections can either be a string of "port:protocol" or a full URI with a scheme.
If there's a scheme it's ignored and only host/port are used.
client_id: [String]
The identifier for this application.
Default: semantic-logger
topic: [String]
Topic to publish log messages to.
Default: 'log_messages'
partition: [Integer]
The partition that the message should be written to.
Default: nil
partition_key: [String]
The key that should be used to assign a partition.
Default: nil
key: [String]
The message key.
Default: nil
connect_timeout: [Integer]
The timeout setting for connecting to brokers.
Default: nil
socket_timeout: [Integer]
The timeout setting for socket connections.
Default: nil
ssl_ca_cert: [String, Array<String>]
A PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with a SSL connection.
Default: nil
ssl_client_cert: [String]
A PEM encoded client cert to use with a SSL connection.
Must be used in combination with ssl_client_cert_key.
Default: nil
ssl_client_cert_key [String]
A PEM encoded client cert key to use with a SSL connection.
Must be used in combination with ssl_client_cert.
Default: nil
ssl_ca_certs_from_system: [boolean]
Delegate SSL CA cert to the system certs
delivery_threshold: [Integer]
Number of messages between triggering a delivery of messages to Apache Kafka.
Default: 100
delivery_interval: [Integer]
Number of seconds between triggering a delivery of messages to Apache Kafka.
Default: 5
required_acks: [Integer]
Number of replicas that must acknowledge receipt of each log message to the topic
Default: 1
Semantic Logger Parameters:
level: [:trace | :debug | :info | :warn | :error | :fatal]
Override the log level for this appender.
Default: SemanticLogger.default_level
formatter: [Object|Proc|Symbol|Hash]
An instance of a class that implements #call, or a Proc to be used to format
the output from this appender
Default: :raw_json (See: #call)
filter: [Regexp|Proc]
RegExp: Only include log messages where the class name matches the supplied.
regular expression. All other messages will be ignored.
Proc: Only include log messages where the supplied Proc returns true
The Proc must return true or false.
host: [String]
Name of this host to appear in log messages.
Default: SemanticLogger.host
application: [String]
Name of this application to appear in log messages.
Default: SemanticLogger.application
metrics: [Boolean]
Send metrics only events to kafka.
Default: true
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/semantic_logger/appender/kafka.rb', line 125 def initialize(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_ca_certs_from_system: false, topic: "log_messages", partition: nil, partition_key: nil, key: nil, delivery_threshold: 100, delivery_interval: 10, required_acks: 1, metrics: true, **args, &block) @seed_brokers = seed_brokers @client_id = client_id @connect_timeout = connect_timeout @socket_timeout = socket_timeout @ssl_ca_cert = ssl_ca_cert @ssl_client_cert = ssl_client_cert @ssl_client_cert_key = ssl_client_cert_key @ssl_ca_certs_from_system = ssl_ca_certs_from_system @topic = topic @partition = partition @partition_key = partition_key @key = key @delivery_threshold = delivery_threshold @delivery_interval = delivery_interval @required_acks = required_acks super(metrics: metrics, **args, &block) reopen end |
Instance Attribute Details
#client_id ⇒ Object
Returns the value of attribute client_id.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def client_id @client_id end |
#connect_timeout ⇒ Object
Returns the value of attribute connect_timeout.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def connect_timeout @connect_timeout end |
#delivery_interval ⇒ Object
Returns the value of attribute delivery_interval.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def delivery_interval @delivery_interval end |
#delivery_threshold ⇒ Object
Returns the value of attribute delivery_threshold.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def delivery_threshold @delivery_threshold end |
#key ⇒ Object
Returns the value of attribute key.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def key @key end |
#partition ⇒ Object
Returns the value of attribute partition.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def partition @partition end |
#partition_key ⇒ Object
Returns the value of attribute partition_key.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def partition_key @partition_key end |
#required_acks ⇒ Object
Returns the value of attribute required_acks.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def required_acks @required_acks end |
#seed_brokers ⇒ Object
Returns the value of attribute seed_brokers.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def seed_brokers @seed_brokers end |
#socket_timeout ⇒ Object
Returns the value of attribute socket_timeout.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def socket_timeout @socket_timeout end |
#ssl_ca_cert ⇒ Object
Returns the value of attribute ssl_ca_cert.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def ssl_ca_cert @ssl_ca_cert end |
#ssl_ca_certs_from_system ⇒ Object
Returns the value of attribute ssl_ca_certs_from_system.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def ssl_ca_certs_from_system @ssl_ca_certs_from_system end |
#ssl_client_cert ⇒ Object
Returns the value of attribute ssl_client_cert.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def ssl_client_cert @ssl_client_cert end |
#ssl_client_cert_key ⇒ Object
Returns the value of attribute ssl_client_cert_key.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def ssl_client_cert_key @ssl_client_cert_key end |
#topic ⇒ Object
Returns the value of attribute topic.
25 26 27 |
# File 'lib/semantic_logger/appender/kafka.rb', line 25 def topic @topic end |
Instance Method Details
#close ⇒ Object
171 172 173 174 175 176 |
# File 'lib/semantic_logger/appender/kafka.rb', line 171 def close @producer&.shutdown @producer = nil @kafka&.close @kafka = nil end |
#default_formatter ⇒ Object
Use JSON Formatter by default.
185 186 187 |
# File 'lib/semantic_logger/appender/kafka.rb', line 185 def default_formatter SemanticLogger::Formatters::Json.new end |
#flush ⇒ Object
Restart producer thread since there is no other way to flush.
190 191 192 193 194 195 196 |
# File 'lib/semantic_logger/appender/kafka.rb', line 190 def flush @producer.shutdown @producer = @kafka.async_producer( delivery_threshold: delivery_threshold, delivery_interval: delivery_interval ) end |
#log(log) ⇒ Object
Forward log messages to Kafka producer thread.
179 180 181 182 |
# File 'lib/semantic_logger/appender/kafka.rb', line 179 def log(log) json = formatter.call(log, self) @producer.produce(json, topic: topic, partition: partition, partition_key: partition_key, key: key) end |
#reopen ⇒ Object
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/semantic_logger/appender/kafka.rb', line 151 def reopen @kafka = ::Kafka.new( seed_brokers: seed_brokers, client_id: client_id, connect_timeout: connect_timeout, socket_timeout: socket_timeout, ssl_ca_cert: ssl_ca_cert, ssl_client_cert: ssl_client_cert, ssl_client_cert_key: ssl_client_cert_key, ssl_ca_certs_from_system: ssl_ca_certs_from_system, logger: logger ) @producer = @kafka.async_producer( delivery_threshold: delivery_threshold, delivery_interval: delivery_interval, required_acks: required_acks ) end |