Class: Loggability::LogDevice::Http
- Inherits:
- Loggability::LogDevice
  - Object
  - Loggability::LogDevice
  - Loggability::LogDevice::Http
- Defined in:
- lib/loggability/log_device/http.rb
Overview
This is a generalized class that allows its subclasses to send log messages to HTTP endpoints asynchronously on a separate thread.
Direct Known Subclasses
Constant Summary
- DEFAULT_ENDPOINT = "http://localhost:12775/v1/logs"
  The default HTTP endpoint URL to send logs to.
- DEFAULT_MAX_BATCH_SIZE = 100
  The default maximum number of messages that can be sent to the server in a single payload.
- DEFAULT_MAX_MESSAGE_BYTESIZE = 2 * 16
  The default max size in bytes for a single message.
- DEFAULT_BATCH_INTERVAL = 60
  The default number of seconds between batches.
- DEFAULT_WRITE_TIMEOUT = 15
  The default number of seconds to wait for data to be written before timing out.
- DEFAULT_EXECUTOR_CLASS = Concurrent::SingleThreadExecutor
  The default Executor class to use for asynchronous tasks.
- DEFAULT_MAX_QUEUE_BYTESIZE = ( 2 ** 10 ) * ( 2 ** 10 ) * ( 2 ** 10 )
  The default for the maximum bytesize of the queue (1 GB).
- DEFAULT_OPTIONS =
  The default options for new instances.
  {
      execution_interval: DEFAULT_BATCH_INTERVAL,
      write_timeout: DEFAULT_WRITE_TIMEOUT,
      max_batch_size: DEFAULT_MAX_BATCH_SIZE,
      max_message_bytesize: DEFAULT_MAX_MESSAGE_BYTESIZE,
      executor_class: DEFAULT_EXECUTOR_CLASS,
  }
Constants inherited from Loggability::LogDevice
Instance Attribute Summary
-
#batch_interval ⇒ Object
readonly
Number of seconds after the task completes before the task is performed again.
-
#endpoint ⇒ Object
readonly
The URI of the endpoint to send messages to.
-
#executor ⇒ Object
readonly
The single thread pool executor.
-
#executor_class ⇒ Object
readonly
The Concurrent executor class to use for asynchronous tasks.
-
#last_send_time ⇒ Object
The monotonic clock time when the last batch of logs was sent.
-
#logs_queue ⇒ Object
readonly
The Queue that contains any log messages which have not yet been sent to the logging service.
-
#logs_queue_bytesize ⇒ Object
The size of logs_queue in bytes.
-
#max_batch_bytesize ⇒ Object
readonly
The maximum number of bytes that will be included in a single POST.
-
#max_batch_size ⇒ Object
readonly
The maximum number of messages to post at one time.
-
#max_message_bytesize ⇒ Object
readonly
The maximum number of bytes of a single message to include in a batch.
-
#max_queue_bytesize ⇒ Object
readonly
The max bytesize of the queue.
-
#timer_task ⇒ Object
readonly
The timer task thread.
-
#write_timeout ⇒ Object
readonly
How many seconds to wait for data to be written while sending a batch.
Instance Method Summary
-
#batch_ready? ⇒ Boolean
(also: #has_batch_ready?)
Returns true if a batch of logs is ready to be sent.
-
#close ⇒ Object
LogDevice API – stop the batch thread and close the http connection.
-
#format_log_message(message) ⇒ Object
Returns the given message in whatever format individual log messages are expected to be in by the service.
-
#get_next_log_payload ⇒ Object
Dequeue pending log messages to send to the service and return them as a suitably-encoded String.
-
#http_client ⇒ Object
Sets up a configured HTTP client object, ready to open connections.
-
#initialize(endpoint = DEFAULT_ENDPOINT, opts = {}) ⇒ Http
constructor
Initialize the HTTP log device to send to the specified endpoint with the given options.
-
#make_batch_request ⇒ Object
Returns a new HTTP request (a subclass of Net::HTTPRequest) suitable for sending the next batch of logs to the service.
-
#running? ⇒ Boolean
Returns true if the device has started sending messages to the logging endpoint.
-
#send_logs ⇒ Object
Sends a batch of log messages to the logging service.
-
#start ⇒ Object
Starts a thread pool with a single thread.
-
#start_executor ⇒ Object
Start the background thread that sends messages.
-
#start_timer_task ⇒ Object
Creates a timer task that sends logs at a regular interval.
-
#stop ⇒ Object
Shuts down the executor (a single-thread pool), waiting up to 3 seconds for shutdown to complete.
-
#write(message) ⇒ Object
LogDevice API – write a message to the HTTP device.
Methods inherited from Loggability::LogDevice
create, load_device_type, parse_device_spec
Constructor Details
#initialize(endpoint = DEFAULT_ENDPOINT, opts = {}) ⇒ Http
Initialize the HTTP log device to send to the specified endpoint with the given options. Valid options are:
- :batch_interval
  Maximum number of seconds between batches.
- :write_timeout
  How many seconds to wait for data to be written while sending a batch.
- :max_batch_size
  The maximum number of messages that can be in a single batch.
- :max_batch_bytesize
  The maximum number of bytes that can be in the payload of a single batch.
- :max_message_bytesize
  The maximum number of bytes that can be in a single message.
- :executor_class
  The Concurrent executor class to use for asynchronous tasks.
# File 'lib/loggability/log_device/http.rb', line 64
def initialize( endpoint=DEFAULT_ENDPOINT, opts={} )
    if endpoint.is_a?( Hash )
        opts = endpoint
        endpoint = DEFAULT_ENDPOINT
    end

    opts = DEFAULT_OPTIONS.merge( opts )

    @endpoint = URI( endpoint ).freeze
    @logs_queue = Queue.new
    @logs_queue_bytesize = 0
    @max_queue_bytesize = opts[:max_queue_bytesize] || DEFAULT_MAX_QUEUE_BYTESIZE
    @batch_interval = opts[:batch_interval] || DEFAULT_BATCH_INTERVAL
    @write_timeout = opts[:write_timeout] || DEFAULT_WRITE_TIMEOUT
    @max_batch_size = opts[:max_batch_size] || DEFAULT_MAX_BATCH_SIZE
    @max_message_bytesize = opts[:max_message_bytesize] || DEFAULT_MAX_MESSAGE_BYTESIZE
    @executor_class = opts[:executor_class] || DEFAULT_EXECUTOR_CLASS

    @max_batch_bytesize = opts[:max_batch_bytesize] || @max_batch_size * @max_message_bytesize
    @last_send_time = Concurrent.monotonic_time
end
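The option handling in the constructor can be illustrated without loading the library. The following is a minimal stand-alone sketch, not the class itself: `resolve_settings` and `SKETCH_DEFAULTS` are hypothetical names, and the default values are copied from the Constant Summary. It shows the two behaviours worth noticing: a Hash passed as the first argument is treated as the options, and `max_batch_bytesize` is derived from the other two limits unless given explicitly.

```ruby
# Hypothetical stand-alone sketch of the constructor's option handling.
# Values are copied from the Constant Summary; names are illustrative only.
SKETCH_DEFAULT_ENDPOINT = "http://localhost:12775/v1/logs"
SKETCH_DEFAULTS = {
  batch_interval: 60,
  write_timeout: 15,
  max_batch_size: 100,
  max_message_bytesize: 2 * 16,
}.freeze

def resolve_settings(endpoint = SKETCH_DEFAULT_ENDPOINT, opts = {})
  # A Hash in the first position means "options only, default endpoint".
  if endpoint.is_a?(Hash)
    opts = endpoint
    endpoint = SKETCH_DEFAULT_ENDPOINT
  end

  settings = SKETCH_DEFAULTS.merge(opts)

  # The batch byte limit falls back to (messages per batch) * (bytes per message).
  settings[:max_batch_bytesize] ||=
    settings[:max_batch_size] * settings[:max_message_bytesize]

  settings.merge(endpoint: endpoint)
end

settings = resolve_settings(max_batch_size: 10)
puts settings[:endpoint]           # http://localhost:12775/v1/logs
puts settings[:max_batch_bytesize] # 320
```

With the documented defaults the derived batch limit is 100 * 32 = 3200 bytes, so callers sending larger messages would presumably want to raise `:max_message_bytesize` or set `:max_batch_bytesize` directly.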
Instance Attribute Details
#batch_interval ⇒ Object (readonly)
Number of seconds after the task completes before the task is performed again.
# File 'lib/loggability/log_device/http.rb', line 119
def batch_interval
    @batch_interval
end
#endpoint ⇒ Object (readonly)
The URI of the endpoint to send messages to
# File 'lib/loggability/log_device/http.rb', line 98
def endpoint
    @endpoint
end
#executor ⇒ Object (readonly)
The single thread pool executor
# File 'lib/loggability/log_device/http.rb', line 94
def executor
    @executor
end
#executor_class ⇒ Object (readonly)
The Concurrent executor class to use for asynchronous tasks
# File 'lib/loggability/log_device/http.rb', line 139
def executor_class
    @executor_class
end
#last_send_time ⇒ Object
The monotonic clock time when the last batch of logs was sent
# File 'lib/loggability/log_device/http.rb', line 115
def last_send_time
    @last_send_time
end
#logs_queue ⇒ Object (readonly)
The Queue that contains any log messages which have not yet been sent to the logging service.
# File 'lib/loggability/log_device/http.rb', line 103
def logs_queue
    @logs_queue
end
#logs_queue_bytesize ⇒ Object
The size of logs_queue in bytes
# File 'lib/loggability/log_device/http.rb', line 111
def logs_queue_bytesize
    @logs_queue_bytesize
end
#max_batch_bytesize ⇒ Object (readonly)
The maximum number of bytes that will be included in a single POST
# File 'lib/loggability/log_device/http.rb', line 135
def max_batch_bytesize
    @max_batch_bytesize
end
#max_batch_size ⇒ Object (readonly)
The maximum number of messages to post at one time
# File 'lib/loggability/log_device/http.rb', line 127
def max_batch_size
    @max_batch_size
end
#max_message_bytesize ⇒ Object (readonly)
The maximum number of bytes of a single message to include in a batch
# File 'lib/loggability/log_device/http.rb', line 131
def max_message_bytesize
    @max_message_bytesize
end
#max_queue_bytesize ⇒ Object (readonly)
The max bytesize of the queue. Will not queue more messages if this threshold is hit
# File 'lib/loggability/log_device/http.rb', line 107
def max_queue_bytesize
    @max_queue_bytesize
end
#timer_task ⇒ Object (readonly)
The timer task thread
# File 'lib/loggability/log_device/http.rb', line 143
def timer_task
    @timer_task
end
#write_timeout ⇒ Object (readonly)
How many seconds to wait for data to be written while sending a batch
# File 'lib/loggability/log_device/http.rb', line 123
def write_timeout
    @write_timeout
end
Instance Method Details
#batch_ready? ⇒ Boolean Also known as: has_batch_ready?
Returns true if a batch of logs is ready to be sent.
# File 'lib/loggability/log_device/http.rb', line 237
def batch_ready?
    seconds_since_last_send = Concurrent.monotonic_time - self.last_send_time

    return self.logs_queue.size >= self.max_batch_size ||
        seconds_since_last_send >= self.batch_interval
end
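The readiness rule is a simple "full batch or interval elapsed" test. As a rough illustration, the sketch below restates it as a free-standing method. The method name and keyword parameters are hypothetical, and it substitutes the standard library's `Process.clock_gettime(Process::CLOCK_MONOTONIC)` for `Concurrent.monotonic_time` so that it runs without concurrent-ruby.

```ruby
# Hypothetical restatement of the readiness rule; not the library's method.
# `now` is injectable so the rule can be exercised without waiting.
def sketch_batch_ready?(queue_size:, max_batch_size:, last_send_time:, batch_interval:,
                        now: Process.clock_gettime(Process::CLOCK_MONOTONIC))
  seconds_since_last_send = now - last_send_time
  queue_size >= max_batch_size || seconds_since_last_send >= batch_interval
end

# A full queue is ready immediately, regardless of elapsed time.
puts sketch_batch_ready?(queue_size: 100, max_batch_size: 100,
                         last_send_time: 0.0, batch_interval: 60, now: 1.0)  # true

# A partial queue is ready only once the interval has elapsed.
puts sketch_batch_ready?(queue_size: 3, max_batch_size: 100,
                         last_send_time: 0.0, batch_interval: 60, now: 59.0) # false
```

Note that, as written in the source, an elapsed interval makes a batch "ready" even when the queue is empty.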
#close ⇒ Object
LogDevice API – stop the batch thread and close the http connection
# File 'lib/loggability/log_device/http.rb', line 162
def close
    self.stop
    self.http_client.finish
rescue IOError
    # ignore it since http session has not yet started.
end
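The `rescue IOError` relies on `Net::HTTP#finish` raising `IOError` when the session was never started. A small sketch using only the standard library shows that behaviour; `finish_quietly` is a hypothetical helper, and no network connection is opened.

```ruby
require 'net/http'

# Hypothetical helper: finish an HTTP session, tolerating one that never started.
def finish_quietly(http)
  http.finish
  :finished
rescue IOError
  # Net::HTTP#finish raises IOError if the session has not been started.
  :not_started
end

http = Net::HTTP.new('localhost', 12775) # constructs the object only; no connection yet
puts finish_quietly(http)                # not_started
```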
#format_log_message(message) ⇒ Object
Returns the given message in whatever format individual log messages are expected to be in by the service. The default returns the stringified message, truncated to max_message_bytesize and dumped (quoted and escaped). This executes inside the sending thread.
# File 'lib/loggability/log_device/http.rb', line 286
def format_log_message( message )
    return message.to_s[ 0 ... self.max_message_bytesize ].dump
end
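To make the default formatting concrete, here is a small sketch of the same truncate-then-dump expression as a free-standing method. `sketch_format` and its `limit` parameter are hypothetical stand-ins for the instance method and `max_message_bytesize`.

```ruby
# Hypothetical stand-in for the default formatter: stringify, truncate, dump.
def sketch_format(message, limit)
  message.to_s[0...limit].dump
end

puts sketch_format("hello world", 5) # "hello"  (String#dump adds the quotes)
puts sketch_format(:status_ok, 32)   # "status_ok"

# String#[] with a Range counts characters, so a multibyte message
# truncated to 2 characters can still occupy more than 2 bytes.
puts "h\u00E9llo"[0...2].bytesize    # 3
```

One consequence, assuming the reconstruction above matches the source: the limit is applied in characters rather than bytes, so a subclass that needs a strict byte budget might prefer `String#byteslice`.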
#get_next_log_payload ⇒ Object
Dequeue pending log messages to send to the service and return them as a suitably-encoded String. The default is a JSON Array. This executes inside the sending thread.
# File 'lib/loggability/log_device/http.rb', line 260
def get_next_log_payload
    buf = []
    count = 0
    bytes = 0

    # Be conservative so as not to overflow
    max_size = self.max_batch_bytesize - self.max_message_bytesize - 2 # for the outer Array

    while count < self.max_batch_size && bytes < max_size && !self.logs_queue.empty?
        message = self.logs_queue.deq
        formatted_message = self.format_log_message( message )
        self.logs_queue_bytesize -= message.bytesize

        count += 1
        bytes += formatted_message.bytesize + 3 # comma and delimiters

        buf << formatted_message
    end

    return '[' + buf.join(',') + ']'
end
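The batching loop can be tried in isolation. The following is a simplified sketch rather than the library method: `next_payload` is a hypothetical free function, it inlines the default formatter, and it omits the queue bytesize bookkeeping. It keeps the same three stopping conditions (message count, conservative byte budget, empty queue).

```ruby
require 'json'

# Hypothetical free-standing version of the batching loop.
def next_payload(queue, max_batch_size:, max_batch_bytesize:, max_message_bytesize:)
  buf = []
  bytes = 0

  # Leave room for one more maximum-size message plus the outer brackets.
  max_size = max_batch_bytesize - max_message_bytesize - 2

  while buf.size < max_batch_size && bytes < max_size && !queue.empty?
    formatted = queue.deq.to_s[0...max_message_bytesize].dump
    bytes += formatted.bytesize + 3 # comma and delimiters
    buf << formatted
  end

  "[" + buf.join(",") + "]"
end

queue = Queue.new
%w[one two three].each { |msg| queue.enq(msg) }

payload = next_payload(queue, max_batch_size: 2,
                       max_batch_bytesize: 1000, max_message_bytesize: 10)
puts payload                     # ["one","two"]
puts JSON.parse(payload).inspect # ["one", "two"]
puts queue.size                  # 1
```

The payload parses as JSON here because the messages are plain ASCII; `String#dump` escapes are Ruby escapes, which are not guaranteed to be valid JSON for every input.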
#http_client ⇒ Object
Sets up a configured HTTP client object, ready to open connections
# File 'lib/loggability/log_device/http.rb', line 292
def http_client
    return @http_client ||= begin
        uri = URI( self.endpoint )

        http = Net::HTTP.new( uri.host, uri.port )
        http.write_timeout = self.write_timeout

        if uri.scheme == 'https'
            http.use_ssl = true
            http.verify_mode = OpenSSL::SSL::VERIFY_PEER
        end

        http
    end
end
#make_batch_request ⇒ Object
Returns a new HTTP request (a subclass of Net::HTTPRequest) suitable for sending the next batch of logs to the service. Defaults to a POST of JSON data. This executes inside the sending thread.
# File 'lib/loggability/log_device/http.rb', line 249
def make_batch_request
    request = Net::HTTP::Post.new( self.endpoint.path )
    request[ 'Content-Type' ] = 'application/json'

    return request
end
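Since subclasses are expected to adapt the request to their service, a sketch of what such a customization might look like can help. The helper below is hypothetical (`build_batch_request`, the `X-Api-Key` header, and its value are all invented for illustration); it only builds a `Net::HTTP::Post` object and sends nothing.

```ruby
require 'net/http'
require 'uri'

# Hypothetical helper: build a JSON POST for a batch, with optional extra headers.
def build_batch_request(endpoint, payload, extra_headers = {})
  uri = URI(endpoint)
  request = Net::HTTP::Post.new(uri.path)
  request['Content-Type'] = 'application/json'
  extra_headers.each { |name, value| request[name] = value }
  request.body = payload
  request
end

request = build_batch_request("http://localhost:12775/v1/logs", '["hello"]',
                              'X-Api-Key' => 'hypothetical-key')
puts request.method          # POST
puts request.path            # /v1/logs
puts request['Content-Type'] # application/json
```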
#running? ⇒ Boolean
Returns true if the device has started sending messages to the logging endpoint.
# File 'lib/loggability/log_device/http.rb', line 178
def running?
    return self.executor&.running?
end
#send_logs ⇒ Object
Sends a batch of log messages to the logging service. This executes inside the sending thread.
# File 'lib/loggability/log_device/http.rb', line 215
def send_logs
    self.executor.post do
        if self.batch_ready?
            # p "Batch ready; sending."
            request = self.make_batch_request
            request.body = self.get_next_log_payload

            # p "Sending request", request
            self.http_client.request( request ) do |res|
                p( res ) if $DEBUG
            end

            self.last_send_time = Concurrent.monotonic_time
        else
            # p "Batch not ready yet."
        end
    end
end
#start ⇒ Object
Starts a thread pool with a single thread.
# File 'lib/loggability/log_device/http.rb', line 171
def start
    self.start_executor
    self.start_timer_task
end
#start_executor ⇒ Object
Start the background thread that sends messages.
# File 'lib/loggability/log_device/http.rb', line 199
def start_executor
    @executor = self.executor_class.new
    @executor.auto_terminate = true unless @executor.serialized?
end
#start_timer_task ⇒ Object
Creates a timer task that sends logs at a regular interval
# File 'lib/loggability/log_device/http.rb', line 206
def start_timer_task
    @timer_task = Concurrent::TimerTask.execute( execution_interval: self.batch_interval ) do
        self.send_logs
    end
end
#stop ⇒ Object
Shuts down the executor (a single-thread pool), waiting up to 3 seconds for shutdown to complete
# File 'lib/loggability/log_device/http.rb', line 185
def stop
    return unless self.running?

    self.timer_task.shutdown if self.timer_task&.running?
    self.executor.shutdown

    unless self.executor.wait_for_termination( 3 )
        self.executor.halt
        self.executor.wait_for_termination( 3 )
    end
end
#write(message) ⇒ Object
LogDevice API – write a message to the HTTP device.
# File 'lib/loggability/log_device/http.rb', line 147
def write( message )
    self.start unless self.running?
    if message.is_a?( Hash )
        message_size = message.to_json.bytesize
    else
        message_size = message.bytesize
    end

    return if ( self.logs_queue_bytesize + message_size ) >= self.max_queue_bytesize

    self.logs_queue_bytesize += message_size
    self.logs_queue.enq( message )
    self.send_logs
end
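The byte-capped queueing in `write` can be sketched on its own. `BoundedLogQueue` below is a hypothetical class, not part of the library; it reproduces only the size accounting (Hashes measured by their JSON encoding, everything else by `bytesize`) and the drop-when-full rule, and it does no locking.

```ruby
require 'json'

# Hypothetical sketch of a queue that refuses messages once a byte cap is reached.
class BoundedLogQueue
  attr_reader :bytesize

  def initialize(max_bytesize)
    @queue = Queue.new
    @bytesize = 0
    @max_bytesize = max_bytesize
  end

  def size
    @queue.size
  end

  # Returns true if the message was queued, false if it was dropped.
  def enqueue(message)
    message_size = message.is_a?(Hash) ? message.to_json.bytesize : message.bytesize

    # Same comparison as the source: reaching the cap exactly also drops.
    return false if (@bytesize + message_size) >= @max_bytesize

    @bytesize += message_size
    @queue.enq(message)
    true
  end
end

logs = BoundedLogQueue.new(10)
puts logs.enqueue("12345") # true
puts logs.enqueue("12345") # false
puts logs.bytesize         # 5
```

Dropped messages are discarded silently in this sketch, which matches the bare `return` in the listing above.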