Class: SemanticLogger::Appender::Async

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/semantic_logger/appender/async.rb

Overview

Allow any appender to run asynchronously in a separate thread.

Direct Known Subclasses

AsyncBatch, Processor

Constructor Details

#initialize(appender:, max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30) ⇒ Async

Appender proxy to allow an existing appender to run asynchronously in a separate thread.

Parameters:

appender:
  The existing appender to run asynchronously. Required.

max_queue_size: [Integer]
  The maximum number of log messages to hold on the queue before blocking attempts to add to the queue.
  -1: The queue size is uncapped and will never block no matter how long the queue is.
  Default: 10,000

lag_threshold_s: [Float]
  Log a warning when a log message has been on the queue for longer than this period in seconds.
  Default: 30

lag_check_interval: [Integer]
  Number of messages to process before checking for slow logging.
  Default: 1,000
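
How lag_check_interval and lag_threshold_s work together can be sketched as follows. This is only an illustration of the idea described above, not the library's code: Entry and lag_warnings are hypothetical names, and the sketch assumes every queued entry records the time at which it was created.

```ruby
# Hypothetical log entry that records when it was created.
Entry = Struct.new(:message, :time)

# Returns a warning for every checked entry that waited longer than
# lag_threshold_s. Only every lag_check_interval-th entry is checked,
# which keeps the cost of the check low on busy queues.
def lag_warnings(entries, now:, lag_check_interval:, lag_threshold_s:)
  warnings = []
  count    = 0
  entries.each do |entry|
    count += 1
    next if count < lag_check_interval

    count = 0
    lag   = now - entry.time
    warnings << "Entry #{entry.message.inspect} lagged by #{lag.round(1)}s" if lag > lag_threshold_s
  end
  warnings
end

now     = Time.now
entries = [
  Entry.new("a", now - 60), # old, but not on a check boundary
  Entry.new("b", now - 45), # old, and checked (2nd entry)
  Entry.new("c", now - 1),  # recent, not checked
  Entry.new("d", now - 2)   # recent, and checked (4th entry)
]
warnings = lag_warnings(entries, now: now, lag_check_interval: 2, lag_threshold_s: 30)
```

Entry "a" is older than entry "b" but goes unreported, because a larger interval trades detection accuracy for less overhead.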


# File 'lib/semantic_logger/appender/async.rb', line 38

def initialize(appender:,
               max_queue_size: 10_000,
               lag_check_interval: 1_000,
               lag_threshold_s: 30)

  @appender           = appender
  @lag_check_interval = lag_check_interval
  @lag_threshold_s    = lag_threshold_s
  @thread             = nil
  @max_queue_size     = max_queue_size
  create_queue
  thread
end
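
The proxy idea can be sketched with the Ruby standard library alone. The classes below are hypothetical stand-ins written for this example (ArrayAppender and TinyAsync are not part of SemanticLogger), and the worker loop is an assumption about how a process method might drain the queue.

```ruby
# Hypothetical synchronous appender: collects messages in an array.
class ArrayAppender
  attr_reader :messages

  def initialize
    @messages = []
  end

  def log(message)
    @messages << message
  end
end

# Minimal stand-in for an asynchronous proxy (not SemanticLogger's class).
class TinyAsync
  def initialize(appender:)
    @appender = appender
    @queue    = Queue.new
    @thread   = Thread.new { process }
  end

  # Called by the application thread; returns as soon as the message is queued.
  def log(message)
    @queue << message
  end

  # Stop accepting messages and wait for the worker to drain the queue.
  def close
    @queue.close
    @thread.join
  end

  private

  # Worker loop: Queue#pop returns nil once the queue is closed and empty.
  def process
    while (message = @queue.pop)
      @appender.log(message)
    end
  end
end

target = ArrayAppender.new
async  = TinyAsync.new(appender: target)
3.times { |i| async.log("message #{i}") }
async.close
```

The caller only pays for the queue push; the wrapped appender does its work on the worker thread.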

Instance Attribute Details

#appender ⇒ Object (readonly)

Returns the value of attribute appender.



# File 'lib/semantic_logger/appender/async.rb', line 10

def appender
  @appender
end

#lag_check_interval ⇒ Object

Returns the value of attribute lag_check_interval.



# File 'lib/semantic_logger/appender/async.rb', line 9

def lag_check_interval
  @lag_check_interval
end

#lag_threshold_s ⇒ Object

Returns the value of attribute lag_threshold_s.



# File 'lib/semantic_logger/appender/async.rb', line 9

def lag_threshold_s
  @lag_threshold_s
end

#max_queue_size ⇒ Object (readonly)

Returns the value of attribute max_queue_size.



# File 'lib/semantic_logger/appender/async.rb', line 10

def max_queue_size
  @max_queue_size
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



# File 'lib/semantic_logger/appender/async.rb', line 10

def queue
  @queue
end

Instance Method Details

#active? ⇒ Boolean

Returns true if the worker thread has been started and is still alive.

Returns:

  • (Boolean)


# File 'lib/semantic_logger/appender/async.rb', line 79

def active?
  @thread&.alive?
end
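
A small illustration of what the safe-navigation call in active? evaluates to, using a plain local variable in place of @thread. The constructor starts the worker immediately, so the nil case is an assumption about states such as a subclass that has not yet started a thread.

```ruby
thread        = nil
never_started = thread&.alive? # nil, because &. short-circuits on nil

thread  = Thread.new { sleep } # sleeps until killed
running = thread&.alive?       # true while the thread exists and has not finished

thread.kill
thread.join
finished = thread&.alive?      # false once the thread has terminated
```

Callers that need a strict true or false can therefore treat nil as "not active".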

#capped? ⇒ Boolean

Returns true if the queue has a capped size, false otherwise.

Returns:

  • (Boolean)


# File 'lib/semantic_logger/appender/async.rb', line 66

def capped?
  @capped
end
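
@capped is assigned outside the code listed on this page, presumably by create_queue. The following is a sketch of how such a flag could be derived from max_queue_size, assuming -1 means "uncapped" as documented for the constructor. build_queue is a hypothetical helper, not a method of this class.

```ruby
# Hypothetical helper: returns [queue, capped] for a given max_queue_size.
def build_queue(max_queue_size)
  if max_queue_size == -1
    [Queue.new, false]                     # unbounded: push never blocks
  else
    [SizedQueue.new(max_queue_size), true] # bounded: push blocks when full
  end
end

uncapped_queue, uncapped_flag = build_queue(-1)
capped_queue, capped_flag     = build_queue(10_000)
```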

#close ⇒ Object

Close all appenders and flush any outstanding messages.



# File 'lib/semantic_logger/appender/async.rb', line 95

def close
  # TODO: Prevent new close requests once this appender has been closed.
  submit_request(:close)
end

#flush ⇒ Object

Flush all queued log entries to disk, database, etc.

All queued log messages are written and then each appender is flushed in turn.


# File 'lib/semantic_logger/appender/async.rb', line 90

def flush
  submit_request(:flush)
end
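
submit_request is not listed on this page. One way such a call could guarantee that earlier messages are written before it returns is a request/reply exchange over the same queue, sketched below. The hash layout and the submit lambda are assumptions made for this example.

```ruby
queue   = Queue.new
written = []

# Worker: writes plain messages, and answers control requests (hashes)
# only after everything queued before them has been written.
worker = Thread.new do
  while (item = queue.pop)
    if item.is_a?(Hash) && item[:command] == :flush
      item[:reply_queue] << true # acknowledge the flush
    else
      written << item
    end
  end
end

# Hypothetical counterpart of submit_request: blocks until the worker replies.
submit = lambda do |command|
  reply_queue = Queue.new
  queue << {command: command, reply_queue: reply_queue}
  reply_queue.pop
end

queue << "first"
queue << "second"
flushed          = submit.call(:flush)
written_at_flush = written.dup # both messages are already written here

queue.close
worker.join
```

Because the queue is first-in, first-out and has a single consumer, the reply can only arrive after every earlier entry has been processed.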

#log(log) ⇒ Object

Add log message for processing.



# File 'lib/semantic_logger/appender/async.rb', line 84

def log(log)
  queue << log
end
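
When the queue is capped, queue << log blocks the calling thread once the queue is full. The sketch below makes that condition visible with a standalone SizedQueue, using the non-blocking form of push so the example does not hang. The queue size of 2 is chosen only for illustration.

```ruby
queue = SizedQueue.new(2) # stand-in for a capped queue with max_queue_size: 2
queue << "one"
queue << "two"

# A third blocking push would wait here until a consumer pops an entry.
# The non-blocking form raises ThreadError instead.
full = begin
  queue.push("three", true)
  false
rescue ThreadError
  true
end

queue.pop                 # simulate the worker consuming one entry
queue.push("three", true) # there is room again, so this succeeds
```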

#reopen ⇒ Object

Re-open the appender after a fork.



# File 'lib/semantic_logger/appender/async.rb', line 53

def reopen
  # Workaround CRuby crash on fork by recreating queue on reopen
  #   https://github.com/rocketjob/semantic_logger/issues/103
  @queue&.close
  create_queue

  appender.reopen if appender.respond_to?(:reopen)

  @thread.kill if @thread&.alive?
  @thread = Thread.new { process }
end
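
In a forked child process only the thread that called fork keeps running, so the worker thread has to be recreated there. The replace-queue-then-replace-thread pattern used above can be exercised without forking, as in this sketch. Reopenable is a hypothetical class and its process loop is an assumption.

```ruby
class Reopenable
  attr_reader :queue, :thread

  def initialize
    @queue  = Queue.new
    @thread = Thread.new { process }
  end

  def reopen
    @queue&.close                   # wakes a worker blocked on the old queue
    @queue = Queue.new
    @thread.kill if @thread&.alive? # the old worker must not outlive its queue
    @thread = Thread.new { process }
  end

  private

  def process
    while (item = @queue.pop)
      # a real worker would hand item to the wrapped appender here
    end
  end
end

proxy      = Reopenable.new
old_queue  = proxy.queue
old_thread = proxy.thread
proxy.reopen
old_thread.join(1) # give the old worker a moment to terminate
```

Entries still sitting in the old queue are dropped by this sketch.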

#thread ⇒ Object

Returns [Thread] the worker thread.

Starts the worker thread if not running.



# File 'lib/semantic_logger/appender/async.rb', line 73

def thread
  return @thread if @thread&.alive?
  @thread = Thread.new { process }
end