Class: LogstashWriter

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash_writer.rb

Overview

Write messages to a logstash server.

Flings events, represented as JSON objects, to logstash using the json_lines codec (over TCP). Doesn't do any munging or modification of the event data given to it, other than adding @timestamp and _id fields if they do not already exist.

We support highly-available logstash installations by means of multiple address records, or via SRV records. See the docs for .new for details as to the valid formats for the server.

Constant Summary collapse

INITIAL_RETRY_WAIT =

How long, in seconds, to pause the first time an error is encountered. Each successive error will cause a longer wait, so as to prevent thundering herds.

0.5

Instance Method Summary collapse

Constructor Details

#initialize(server_name:, logger: Logger.new("/dev/null"), backlog: 1_000, metrics_registry: Prometheus::Client::Registry.new, metrics_prefix: :logstash_writer) ⇒ LogstashWriter

Create a new logstash writer.

Once the object is created, you're ready to give it messages by calling #send_event. No messages will actually be delivered to logstash, though, until you call #run.

If multiple addresses are returned from an A/AAAA resolution, or multiple SRV records, then the records will all be tried in random order (for A/AAAA records) or in line with the standard rules for weight and priority (for SRV records).

Parameters:

  • server_name (String)

    details for connecting to the logstash server(s). This can be:

    • <IPv4 address>:<port> -- a literal IPv4 address, and mandatory port.

    • [<IPv6 address>]:<port> -- a literal IPv6 address, and mandatory port. enclosing the address in square brackets isn't required, but it's a serving suggestion to make it a little easier to discern address from port. Forgetting the include the port will end in confusion.

    • <hostname>:<port> -- the given hostname will be resolved for A/AAAA records, and all returned addresses will be tried in random order until one is found that accepts a connection.

    • <dnsname> -- the given dnsname will be resolved for SRV records, and the returned target hostnames and ports will be tried in the RFC2782-approved manner according to priority and weight until one is found which accepts a connection.

  • logger (Logger) (defaults to: Logger.new("/dev/null"))

    something to which we can write log entries for debugging and error-reporting purposes.

  • backlog (Integer) (defaults to: 1_000)

    a non-negative integer specifying the maximum number of events that should be queued during periods when the logstash server is unavailable. If the limit is exceeded, the oldest (= first event to be queued) will be dropped.

  • metrics_registry (Prometheus::Client::Registry) (defaults to: Prometheus::Client::Registry.new)

    where to register the metrics instrumenting the operation of the writer instance.

  • metrics_prefix (#to_s) (defaults to: :logstash_writer)

    what to prefix all of the metrics used to instrument the operation of the writer instance. If you instantiate multiple LogstashWriter instances with the same stats_registry, this parameter must be different for each of them, or you will get some inscrutable exception raised from the registry.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/logstash_writer.rb', line 73

def initialize(server_name:, logger: Logger.new("/dev/null"), backlog: 1_000, metrics_registry: Prometheus::Client::Registry.new, metrics_prefix: :logstash_writer)
  @server_name, @logger, @backlog = server_name, logger, backlog

  @metrics = {
    received:            metrics_registry.counter(:"#{metrics_prefix}_events_received_total", "The number of logstash events which have been submitted for delivery"),
    sent:                metrics_registry.counter(:"#{metrics_prefix}_events_written_total", "The number of logstash events which have been delivered to the logstash server"),
    queue_size:          metrics_registry.gauge(:"#{metrics_prefix}_queue_size", "The number of events currently in the queue to be sent"),
    dropped:             metrics_registry.counter(:"#{metrics_prefix}_events_dropped_total", "The number of events which have been dropped from the queue"),

    lag:                 metrics_registry.gauge(:"#{metrics_prefix}_last_sent_event_time_seconds", "When the last event successfully sent to logstash was originally received"),

    connected:           metrics_registry.gauge(:"#{metrics_prefix}_connected_to_server", "Boolean flag indicating whether we are currently connected to a logstash server"),
    connect_exception:   metrics_registry.counter(:"#{metrics_prefix}_connect_exceptions_total", "The number of exceptions that have occurred whilst attempting to connect to a logstash server"),
    write_exception:     metrics_registry.counter(:"#{metrics_prefix}_write_exceptions_total", "The number of exceptions that have occurred whilst attempting to write an event to a logstash server"),

    write_loop_exception: metrics_registry.counter(:"#{metrics_prefix}_write_loop_exceptions_total", "The number of exceptions that have occurred in the writing loop"),
    write_loop_ok:        metrics_registry.gauge(:"#{metrics_prefix}_write_loop_ok", "Boolean flag indicating whether the writing loop is currently operating correctly, or is in a post-apocalyptic hellscape of never-ending exceptions"),
  }

  @metrics[:lag].set({}, 0)
  @metrics[:queue_size].set({}, 0)

  metrics_registry.gauge(:"#{metrics_prefix}_queue_max", "The maximum size of the event queue").set({}, backlog)

  @queue = []
  @queue_mutex = Mutex.new
  @queue_cv    = ConditionVariable.new

  @socket_mutex = Mutex.new
  @worker_mutex = Mutex.new
end

Instance Method Details

#force_disconnect!NilClass

Disconnect from the currently-active server.

In certain circumstances, you may wish to force the writer to stop sending messages to the currently-connected logstash server, and re-resolve the server_name to get new a new address to talk to. Calling this method will cause that to happen.

Returns:

  • (NilClass)


202
203
204
205
206
207
208
209
210
211
212
# File 'lib/logstash_writer.rb', line 202

def force_disconnect!
  @socket_mutex.synchronize do
    return if @current_target.nil?

    @logger.info("LogstashWriter") { "Forced disconnect from #{@current_target.describe_peer}" }
    @current_target.close
    @current_target = nil
  end

  nil
end

#runNilClass

Start sending events.

This method will return almost immediately, and actual event transmission will commence in a separate thread.

Returns:

  • (NilClass)


152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/logstash_writer.rb', line 152

def run
  @worker_mutex.synchronize do
    if @worker_thread.nil?
      m, cv = Mutex.new, ConditionVariable.new

      @worker_thread = Thread.new { cv.signal; write_loop }

      # Don't return until the thread has *actually* started
      m.synchronize { cv.wait(m) }
    end
  end

  nil
end

#send_event(e) ⇒ NilClass

Add an event to the queue, to be sent to logstash. Actual event delivery will happen in a worker thread that is started with

run. If the event does not have a @timestamp or _id element, they

will be added with appropriate values.

Parameters:

  • e (Hash)

    the event data to be sent.

Returns:

  • (NilClass)


114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/logstash_writer.rb', line 114

def send_event(e)
  unless e.is_a?(Hash)
    raise ArgumentError, "Event must be a hash"
  end

  unless e.has_key?(:@timestamp) || e.has_key?("@timestamp")
    e[:@timestamp] = Time.now.utc.strftime("%FT%T.%NZ")
  end

  unless e.has_key?(:_id) || e.has_key?("_id")
    # This is the quickest way I've found to get a long, random string.
    # We don't need any sort of cryptographic or unpredictability
    # guarantees for what we're doing here, so SecureRandom is unnecessary
    # overhead.
    e[:_id] = rand(0x1000_0000_0000_0000_0000_0000_0000_0000).to_s(36)
  end

  @queue_mutex.synchronize do
    @queue << { content: e, arrival_timestamp: Time.now }
    while @queue.length > @backlog
      @queue.shift
      stat_dropped
    end
    @queue_cv.signal

    stat_received
  end

  nil
end

#stopNilClass

Stop the worker thread.

Politely ask the worker thread to please finish up once it's finished sending all messages that have been queued. This will return once the worker thread has finished.

Returns:

  • (NilClass)


175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/logstash_writer.rb', line 175

def stop
  @worker_mutex.synchronize do
    if @worker_thread
      @terminate = true
      @queue_cv.signal
      begin
        @worker_thread.join
      rescue Exception => ex
        @logger.error("LogstashWriter") { (["Worker thread terminated with exception: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n  ") }
      end
      @worker_thread = nil
      @socket_mutex.synchronize { (@current_target.close; @current_target = nil) if @current_target }
    end
  end

  nil
end