Class: Loggability::LogDevice::Http

Inherits:
Loggability::LogDevice show all
Defined in:
lib/loggability/log_device/http.rb

Overview

This is a generalized class that allows its subclasses to send log messages to HTTP endpoints asynchronously on a separate thread.

Direct Known Subclasses

Datadog

Constant Summary collapse

DEFAULT_ENDPOINT =

The default HTTP endpoint URL to send logs to

"http://localhost:12775/v1/logs"
DEFAULT_MAX_BATCH_SIZE =

The default maximum number of messages that can be sent to the server in a single payload

100
DEFAULT_MAX_MESSAGE_BYTESIZE =

The default max size in bytes for a single message.

2 * 16
DEFAULT_BATCH_INTERVAL =

The default number of seconds between batches

60
DEFAULT_WRITE_TIMEOUT =

The default number of seconds to wait for data to be written before timing out

15
DEFAULT_EXECUTOR_CLASS =

The default Executor class to use for asynchronous tasks

Concurrent::SingleThreadExecutor
DEFAULT_MAX_QUEUE_BYTESIZE =

The default for the maximum bytesize of the queue (1 GB)

( 2 ** 10 ) * ( 2 ** 10 ) * ( 2 ** 10 )
DEFAULT_OPTIONS =

The default options for new instances

{
  execution_interval: DEFAULT_BATCH_INTERVAL,
  write_timeout: DEFAULT_WRITE_TIMEOUT,
  max_batch_size: DEFAULT_MAX_BATCH_SIZE,
  max_message_bytesize: DEFAULT_MAX_MESSAGE_BYTESIZE,
  executor_class: DEFAULT_EXECUTOR_CLASS,
}

Constants inherited from Loggability::LogDevice

DEVICE_TARGET_REGEX

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Loggability::LogDevice

create, load_device_type, parse_device_spec

Constructor Details

#initialize(endpoint = DEFAULT_ENDPOINT, opts = {}) ⇒ Http

Initialize the HTTP log device to send to the specified endpoint with the given options. Valid options are:

:batch_interval

Maximum number of seconds between batches

:write_timeout

How many seconds to wait for data to be written while sending a batch

:max_batch_size

The maximum number of messages that can be in a single batch

:max_batch_bytesize

The maximum number of bytes that can be in the payload of a single batch

:max_message_bytesize

The maximum number of bytes that can be in a single message

:executor_class

The Concurrent executor class to use for asynchronous tasks.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/loggability/log_device/http.rb', line 64

# Create a new HTTP log device that will send to +endpoint+, configured by
# +opts+ layered over DEFAULT_OPTIONS. The endpoint may be omitted entirely,
# in which case the first argument is treated as the options Hash.
def initialize( endpoint=DEFAULT_ENDPOINT, opts={} )
  # Called with only an options Hash: shift it over and use the default endpoint
  endpoint, opts = DEFAULT_ENDPOINT, endpoint if endpoint.is_a?( Hash )

  settings = DEFAULT_OPTIONS.merge( opts )

  # Destination and pending-message bookkeeping
  @endpoint            = URI( endpoint ).freeze
  @logs_queue          = Queue.new
  @logs_queue_bytesize = 0

  # Tunables; a missing or nil option falls back to the matching default
  @max_queue_bytesize   = settings[:max_queue_bytesize] || DEFAULT_MAX_QUEUE_BYTESIZE
  @batch_interval       = settings[:batch_interval] || DEFAULT_BATCH_INTERVAL
  @write_timeout        = settings[:write_timeout] || DEFAULT_WRITE_TIMEOUT
  @max_batch_size       = settings[:max_batch_size] || DEFAULT_MAX_BATCH_SIZE
  @max_message_bytesize = settings[:max_message_bytesize] || DEFAULT_MAX_MESSAGE_BYTESIZE
  @executor_class       = settings[:executor_class] || DEFAULT_EXECUTOR_CLASS

  # Unless given explicitly, a batch may hold up to a full batch of
  # maximum-sized messages.
  @max_batch_bytesize = settings[:max_batch_bytesize] ||
    ( @max_batch_size * @max_message_bytesize )

  @last_send_time = Concurrent.monotonic_time
end

Instance Attribute Details

#batch_intervalObject (readonly)

Number of seconds after the task completes before the task is performed again.



119
120
121
# File 'lib/loggability/log_device/http.rb', line 119

# Reader for the number of seconds between batches, as set by #initialize
# from the :batch_interval option.
def batch_interval
  return @batch_interval
end

#endpointObject (readonly)

The URI of the endpoint to send messages to



98
99
100
# File 'lib/loggability/log_device/http.rb', line 98

# Reader for the endpoint URI that #initialize stored (frozen) in @endpoint.
def endpoint
  return @endpoint
end

#executorObject (readonly)

The single thread pool executor



94
95
96
# File 'lib/loggability/log_device/http.rb', line 94

# Reader for the executor instance created by #start_executor; nil until
# @executor has been assigned.
def executor
  return @executor
end

#executor_classObject (readonly)

The Concurrent executor class to use for asynchronous tasks



139
140
141
# File 'lib/loggability/log_device/http.rb', line 139

# Reader for the class that #start_executor instantiates to run
# asynchronous tasks.
def executor_class
  return @executor_class
end

#last_send_timeObject

The monotonic clock time when the last batch of logs was sent



115
116
117
# File 'lib/loggability/log_device/http.rb', line 115

# Reader for the monotonic clock reading recorded when the device was
# created or when the most recent batch was sent.
def last_send_time
  return @last_send_time
end

#logs_queueObject (readonly)

The Queue that contains any log messages which have not yet been sent to the logging service.



103
104
105
# File 'lib/loggability/log_device/http.rb', line 103

# Reader for the Queue holding messages that have been written but not yet
# dequeued for sending.
def logs_queue
  return @logs_queue
end

#logs_queue_bytesizeObject

The size of logs_queue in bytes



111
112
113
# File 'lib/loggability/log_device/http.rb', line 111

# Reader for the running byte count of queued messages; #write adds to it
# and #get_next_log_payload subtracts from it.
def logs_queue_bytesize
  return @logs_queue_bytesize
end

#max_batch_bytesizeObject (readonly)

The maximum number of bytes that will be included in a single POST



135
136
137
# File 'lib/loggability/log_device/http.rb', line 135

# Reader for the byte budget of a single batch payload, as computed or
# configured in #initialize.
def max_batch_bytesize
  return @max_batch_bytesize
end

#max_batch_sizeObject (readonly)

The maximum number of messages to post at one time



127
128
129
# File 'lib/loggability/log_device/http.rb', line 127

# Reader for the maximum number of messages allowed in one batch.
def max_batch_size
  return @max_batch_size
end

#max_message_bytesizeObject (readonly)

The maximum number of bytes of a single message to include in a batch



131
132
133
# File 'lib/loggability/log_device/http.rb', line 131

# Reader for the per-message size limit used when formatting messages and
# when budgeting a batch.
def max_message_bytesize
  return @max_message_bytesize
end

#max_queue_bytesizeObject (readonly)

The max bytesize of the queue. Will not queue more messages if this threshold is hit



107
108
109
# File 'lib/loggability/log_device/http.rb', line 107

# Reader for the queue size ceiling in bytes; #write drops messages that
# would reach it.
def max_queue_bytesize
  return @max_queue_bytesize
end

#timer_taskObject (readonly)

The timer task thread



143
144
145
# File 'lib/loggability/log_device/http.rb', line 143

# Reader for the timer task created by #start_timer_task; nil until
# @timer_task has been assigned.
def timer_task
  return @timer_task
end

#write_timeoutObject (readonly)

How many seconds to wait for data to be written while sending a batch



123
124
125
# File 'lib/loggability/log_device/http.rb', line 123

# Reader for the number of seconds assigned to the HTTP client's write
# timeout in #http_client.
def write_timeout
  return @write_timeout
end

Instance Method Details

#batch_ready?Boolean Also known as: has_batch_ready?

Returns true if a batch of logs is ready to be sent.

Returns:

  • (Boolean)


237
238
239
240
241
242
# File 'lib/loggability/log_device/http.rb', line 237

# Returns +true+ when a batch should go out: either the queue already holds
# at least #max_batch_size messages, or at least #batch_interval seconds
# (by the monotonic clock) have elapsed since #last_send_time.
def batch_ready?
  elapsed    = Concurrent.monotonic_time - self.last_send_time
  queue_full = self.logs_queue.size >= self.max_batch_size

  # The interval is only consulted when the queue isn't already full
  return queue_full || elapsed >= self.batch_interval
end

#closeObject

LogDevice API – stop the batch thread and close the http connection



162
163
164
165
166
167
# File 'lib/loggability/log_device/http.rb', line 162

# LogDevice API -- stop the sending machinery via #stop, then finish the
# HTTP client's session. An IOError raised along the way is swallowed.
def close
  begin
    self.stop
    self.http_client.finish
  rescue IOError
    # ignore it since http session has not yet started.
  end
end

#format_log_message(message) ⇒ Object

Returns the given message in whatever format individual log messages are expected to be in by the service. The default just returns the stringified message. This executes inside the sending thread.



286
287
288
# File 'lib/loggability/log_device/http.rb', line 286

# Returns the given +message+ in the format the service expects for a single
# log message. The default stringifies the message, truncates it to at most
# #max_message_bytesize *bytes*, and returns the String#dump (quoted and
# escaped) form.
#
# Truncation is done by bytes rather than characters so that multibyte text
# cannot exceed the configured byte limit. When the cut lands in the middle
# of a multibyte character of an otherwise validly-encoded message, the
# dangling partial character is dropped.
def format_log_message( message )
  text  = message.to_s
  limit = [ self.max_message_bytesize, 0 ].max

  if text.bytesize > limit
    clipped = text.byteslice( 0, limit )

    # Only scrub when the source was valid: then the sole invalid sequence is
    # the partial character at the cut. Already-invalid input is left for
    # #dump to escape byte-by-byte.
    clipped = clipped.scrub( '' ) if text.valid_encoding?
    text = clipped
  end

  return text.dump
end

#get_next_log_payloadObject

Dequeue pending log messages to send to the service and return them as a suitably-encoded String. The default is a JSON Array. This executes inside the sending thread.



260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/loggability/log_device/http.rb', line 260

# Dequeue pending log messages and return them as a single encoded String
# (by default a JSON-style Array of the formatted messages, joined with
# commas). Stops once #max_batch_size messages have been taken, the byte
# budget is used up, or the queue is empty.
#
# Each dequeued message's size is subtracted from #logs_queue_bytesize using
# the same measure #write used when adding it: the JSON encoding's bytesize
# for Hash messages, the message's own bytesize otherwise.
def get_next_log_payload
  buf = []
  bytes = 0

  # Be conservative so as not to overflow: leave room for one more
  # maximum-sized message plus the brackets of the outer Array.
  max_size = self.max_batch_bytesize - self.max_message_bytesize - 2

  while buf.length < self.max_batch_size && bytes < max_size && !self.logs_queue.empty?
    message = self.logs_queue.deq
    formatted_message = self.format_log_message( message )

    # Hashes don't respond to #bytesize; mirror the accounting done in #write
    queued_size = message.is_a?( Hash ) ? message.to_json.bytesize : message.bytesize
    self.logs_queue_bytesize -= queued_size

    bytes += formatted_message.bytesize + 3 # comma and delimiters
    buf << formatted_message
  end

  return '[' + buf.join(',') + ']'
end

#http_clientObject

Sets up and memoizes a configured HTTP client object, ready to open connections to the endpoint.



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/loggability/log_device/http.rb', line 292

# Returns the memoized Net::HTTP client for #endpoint, building it on first
# use. The client gets #write_timeout as its write timeout, and for 'https'
# endpoints SSL is enabled with peer verification.
def http_client
  return @http_client if @http_client

  target = URI( self.endpoint )

  client = Net::HTTP.new( target.host, target.port )
  client.write_timeout = self.write_timeout

  if target.scheme == 'https'
    client.use_ssl = true
    client.verify_mode = OpenSSL::SSL::VERIFY_PEER
  end

  return @http_client = client
end

#make_batch_requestObject

Returns a new HTTP request (a subclass of Net::HTTPRequest) suitable for sending the next batch of logs to the service. Defaults to a POST of JSON data. This executes inside the sending thread.



249
250
251
252
253
254
# File 'lib/loggability/log_device/http.rb', line 249

# Returns a new HTTP request suitable for sending the next batch of logs to
# the service: by default a Net::HTTP::Post with a JSON Content-Type.
#
# The request target is the endpoint's full request URI (path plus any query
# string), so query parameters on the endpoint are preserved and an endpoint
# without a path is requested as '/' instead of raising on an empty path.
def make_batch_request
  uri = self.endpoint

  # URI::HTTP provides #request_uri; other URI kinds only have #path
  target = uri.respond_to?( :request_uri ) ? uri.request_uri : uri.path
  target = '/' if target.nil? || target.empty?

  request = Net::HTTP::Post.new( target )
  request[ 'Content-Type' ] = 'application/json'

  return request
end

#running?Boolean

Returns true if the device has started sending messages to the logging endpoint.

Returns:

  • (Boolean)


178
179
180
# File 'lib/loggability/log_device/http.rb', line 178

# Returns the executor's own #running? answer, or +nil+ when no executor has
# been created yet.
def running?
  pool = self.executor
  return nil if pool.nil?

  return pool.running?
end

#send_logsObject

Sends a batch of log messages to the logging service. This executes inside the sending thread.



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/loggability/log_device/http.rb', line 215

# Posts a task to the executor that sends one batch of log messages to the
# logging service if a batch is ready. The task body executes wherever the
# executor runs it.
#
# Nothing is sent while the queue is empty: otherwise every elapsed batch
# interval on an idle device would POST an empty payload. After a request
# has been made, #last_send_time is reset to the current monotonic time.
def send_logs
  self.executor.post do
    # Skip idle ticks rather than POSTing an empty batch
    next if self.logs_queue.empty?
    next unless self.batch_ready?

    request = self.make_batch_request
    request.body = self.get_next_log_payload

    self.http_client.request( request ) do |res|
      p( res ) if $DEBUG
    end

    self.last_send_time = Concurrent.monotonic_time
  end
end

#startObject

Starts a thread pool with a single thread.



171
172
173
174
# File 'lib/loggability/log_device/http.rb', line 171

# Bring the device up: create the executor first, then the timer task that
# periodically triggers sending.
def start
  start_executor
  start_timer_task
end

#start_executorObject

Start the background thread that sends messages.



199
200
201
202
# File 'lib/loggability/log_device/http.rb', line 199

# Instantiate #executor_class and keep the instance as the executor. Unless
# the new executor reports itself as serialized, its auto_terminate flag is
# switched on.
def start_executor
  @executor = pool = self.executor_class.new
  return if pool.serialized?

  pool.auto_terminate = true
end

#start_timer_taskObject

Create a timer task that sends logs at a regular interval



206
207
208
209
210
# File 'lib/loggability/log_device/http.rb', line 206

# Create the timer task via Concurrent::TimerTask.execute, using
# #batch_interval as its execution interval; each run calls #send_logs.
def start_timer_task
  interval = self.batch_interval
  @timer_task = Concurrent::TimerTask.execute( execution_interval: interval ) { self.send_logs }
end

#stopObject

Shut down the timer task and the executor (a single-thread pool), waiting up to 3 seconds for shutdown to complete before forcing it.



185
186
187
188
189
190
191
192
193
194
195
# File 'lib/loggability/log_device/http.rb', line 185

# Shut the device down. Does nothing unless it is running. The timer task is
# shut down if it is running, then the executor is asked to shut down and is
# given 3 seconds to terminate. If it has not terminated by then it is
# stopped forcibly and given 3 more seconds.
#
# The forcible stop uses the executor's #halt when it has one, and falls back
# to #kill otherwise, so executor classes that lack #halt don't raise
# NoMethodError on this path.
# NOTE(review): concurrent-ruby's executor services appear to expose #kill
# rather than #halt -- confirm against the installed version.
def stop
  return unless self.running?

  self.timer_task.shutdown if self.timer_task&.running?
  self.executor.shutdown

  unless self.executor.wait_for_termination( 3 )
    if self.executor.respond_to?( :halt )
      self.executor.halt
    else
      self.executor.kill
    end

    self.executor.wait_for_termination( 3 )
  end
end

#write(message) ⇒ Object

LogDevice API – write a message to the HTTP device.



147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/loggability/log_device/http.rb', line 147

# LogDevice API -- queue +message+ for delivery, starting the device first
# if it isn't running. A message is measured by its own bytesize, or by the
# bytesize of its JSON encoding when it is a Hash. It is dropped (returning
# nil) when adding it would bring the queue to #max_queue_bytesize or
# beyond; otherwise it is counted, enqueued, and #send_logs is called.
def write( message )
  self.start unless self.running?

  measured = message.is_a?( Hash ) ? message.to_json : message
  message_size = measured.bytesize

  # Queue is at capacity: drop the message
  return if self.logs_queue_bytesize + message_size >= self.max_queue_bytesize

  self.logs_queue_bytesize += message_size
  self.logs_queue.enq( message )
  self.send_logs
end