Class: SemanticLogger::Appender::Async

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/semantic_logger/appender/async.rb

Overview

Allow any appender to run asynchronously in a separate thread.

Direct Known Subclasses

AsyncBatch, Processor

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(appender:, max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30) ⇒ Async

Appender proxy to allow an existing appender to run asynchronously in a separate thread.

Parameters:

max_queue_size: [Integer]
  The maximum number of log messages to hold on the queue before blocking attempts to add to the queue.
  -1: The queue size is uncapped and will never block no matter how long the queue is.
  Default: 10,000

lag_threshold_s [Float]
  Log a warning when a log message has been on the queue for longer than this period in seconds.
  Default: 30

lag_check_interval: [Integer]
  Number of messages to process before checking for slow logging.
  Default: 1,000


39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/semantic_logger/appender/async.rb', line 39

def initialize(appender:,
               max_queue_size: 10_000,
               lag_check_interval: 1_000,
               lag_threshold_s: 30)

  @appender           = appender
  @lag_check_interval = lag_check_interval
  @lag_threshold_s    = lag_threshold_s
  @thread             = nil
  @max_queue_size     = max_queue_size
  create_queue
  thread
end

Instance Attribute Details

#appenderObject (readonly)

Returns the value of attribute appender.



10
11
12
# File 'lib/semantic_logger/appender/async.rb', line 10

def appender
  @appender
end

#lag_check_intervalObject

Returns the value of attribute lag_check_interval.



9
10
11
# File 'lib/semantic_logger/appender/async.rb', line 9

def lag_check_interval
  @lag_check_interval
end

#lag_threshold_sObject

Returns the value of attribute lag_threshold_s.



9
10
11
# File 'lib/semantic_logger/appender/async.rb', line 9

def lag_threshold_s
  @lag_threshold_s
end

#max_queue_sizeObject (readonly)

Returns the value of attribute max_queue_size.



10
11
12
# File 'lib/semantic_logger/appender/async.rb', line 10

def max_queue_size
  @max_queue_size
end

#queueObject (readonly)

Returns the value of attribute queue.



10
11
12
# File 'lib/semantic_logger/appender/async.rb', line 10

def queue
  @queue
end

Instance Method Details

#active?Boolean

Returns true if the worker thread is active

Returns:

  • (Boolean)


81
82
83
# File 'lib/semantic_logger/appender/async.rb', line 81

def active?
  @thread&.alive?
end

#capped?Boolean

Returns [true|false] if the queue has a capped size.

Returns:

  • (Boolean)


67
68
69
# File 'lib/semantic_logger/appender/async.rb', line 67

def capped?
  @capped
end

#closeObject

Close all appenders and flush any outstanding messages.



97
98
99
100
# File 'lib/semantic_logger/appender/async.rb', line 97

def close
  # TODO: Prevent new close requests once this appender has been closed.
  submit_request(:close)
end

#flushObject

Flush all queued log entries disk, database, etc.

All queued log messages are written and then each appender is flushed in turn.


92
93
94
# File 'lib/semantic_logger/appender/async.rb', line 92

def flush
  submit_request(:flush)
end

#log(log) ⇒ Object

Add log message for processing.



86
87
88
# File 'lib/semantic_logger/appender/async.rb', line 86

def log(log)
  queue << log
end

#reopenObject

Re-open appender after a fork



54
55
56
57
58
59
60
61
62
63
64
# File 'lib/semantic_logger/appender/async.rb', line 54

def reopen
  # Workaround CRuby crash on fork by recreating queue on reopen
  #   https://github.com/reidmorrison/semantic_logger/issues/103
  @queue&.close
  create_queue

  appender.reopen if appender.respond_to?(:reopen)

  @thread&.kill if @thread&.alive?
  @thread = Thread.new { process }
end

#threadObject

Returns [Thread] the worker thread.

Starts the worker thread if not running.



74
75
76
77
78
# File 'lib/semantic_logger/appender/async.rb', line 74

def thread
  return @thread if @thread&.alive?

  @thread = Thread.new { process }
end