Class: Datadog::CI::AsyncWriter
- Inherits:
- Object
- Inheritance hierarchy: Object → Datadog::CI::AsyncWriter
- Includes:
- Core::Workers::Polling, Core::Workers::Queue
- Defined in:
- lib/datadog/ci/async_writer.rb
Constant Summary collapse
- DEFAULT_BUFFER_MAX_SIZE =
10_000
- DEFAULT_SHUTDOWN_TIMEOUT =
60
- DEFAULT_INTERVAL =
3
Instance Attribute Summary collapse
-
#transport ⇒ Object
readonly
Returns the value of attribute transport.
Instance Method Summary collapse
- #after_fork ⇒ Object
- #async? ⇒ Boolean
- #buffer_klass ⇒ Object
- #dequeue ⇒ Object
- #enqueue(event) ⇒ Object
-
#initialize(transport:, options: {}) ⇒ AsyncWriter
constructor
A new instance of AsyncWriter.
- #perform(*events) ⇒ Object
- #stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object
- #work_pending? ⇒ Boolean
- #write(event) ⇒ Object
Constructor Details
#initialize(transport:, options: {}) ⇒ AsyncWriter
Returns a new instance of AsyncWriter.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/datadog/ci/async_writer.rb', line 25

# Stores the transport and configures the worker from +options+.
#
# @param transport [Object] kept in @transport and exposed through #transport
# @param options [Hash] optional settings; recognised keys:
#   :enabled          - assigned to +enabled+ (defaults to true)
#   :interval         - assigned to +loop_base_interval+ (falls back to DEFAULT_INTERVAL)
#   :back_off_ratio   - assigned to +loop_back_off_ratio+ only when the key is present
#   :back_off_max     - assigned to +loop_back_off_max+ only when the key is present
#   :buffer_size      - capacity handed to the buffer class (defaults to DEFAULT_BUFFER_MAX_SIZE)
#   :shutdown_timeout - default timeout for #stop (defaults to DEFAULT_SHUTDOWN_TIMEOUT)
def initialize(transport:, options: {})
  @transport = transport

  # Workers::Polling settings
  self.enabled = options.fetch(:enabled, true)

  # Workers::Async::Thread settings
  self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_RESTART

  # Workers::IntervalLoop settings
  self.loop_base_interval = options[:interval] || DEFAULT_INTERVAL
  # The back-off settings are only touched when explicitly supplied, so the
  # values already in place are left alone otherwise.
  self.loop_back_off_ratio = options[:back_off_ratio] if options.key?(:back_off_ratio)
  self.loop_back_off_max = options[:back_off_max] if options.key?(:back_off_max)

  @buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)

  self.buffer = buffer_klass.new(@buffer_size)

  @shutdown_timeout = options.fetch(:shutdown_timeout, DEFAULT_SHUTDOWN_TIMEOUT)

  # Flipped to true by #stop; #write ignores events once it is set.
  @stopped = false
end
Instance Attribute Details
#transport ⇒ Object (readonly)
Returns the value of attribute transport.
18 19 20 |
# File 'lib/datadog/ci/async_writer.rb', line 18

# Read-only accessor.
#
# @return [Object] the value of the @transport instance variable
def transport
  @transport
end
Instance Method Details
#after_fork ⇒ Object
96 97 98 99 100 101 |
# File 'lib/datadog/ci/async_writer.rb', line 96

# Replaces the buffer with a fresh one of the same configured size.
def after_fork
  # In multiprocess environments, forks will share the same buffer until it's written to.
  # A.K.A. copy-on-write. We don't want forks to write events generated from another process.
  # Instead, we reset it after the fork. (Make sure any enqueue operations happen after this.)
  self.buffer = buffer_klass.new(@buffer_size)
end
#async? ⇒ Boolean
92 93 94 |
# File 'lib/datadog/ci/async_writer.rb', line 92

# @return [Boolean] always true for this writer
def async?
  true
end
#buffer_klass ⇒ Object
103 104 105 106 107 108 109 |
# File 'lib/datadog/ci/async_writer.rb', line 103

# Picks the buffer implementation based on the Ruby engine constant.
#
# @return [Class] Core::Buffer::CRuby when the engine is "ruby",
#   Core::Buffer::ThreadSafe for any other engine
def buffer_klass
  if Core::Environment::Ext::RUBY_ENGINE == "ruby"
    Core::Buffer::CRuby
  else
    Core::Buffer::ThreadSafe
  end
end
#dequeue ⇒ Object
84 85 86 |
# File 'lib/datadog/ci/async_writer.rb', line 84

# Takes pending work out of the buffer.
#
# @return [Object] whatever +buffer.pop+ returns
def dequeue
  buffer.pop
end
#enqueue(event) ⇒ Object
80 81 82 |
# File 'lib/datadog/ci/async_writer.rb', line 80

# Adds an event to the buffer.
#
# @param event [Object] the item pushed onto the buffer
# @return [Object] whatever +buffer.push+ returns
def enqueue(event)
  buffer.push(event)
end
#perform(*events) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/datadog/ci/async_writer.rb', line 58

# Sends the given events through the transport.
#
# When any response reports a server error, or when sending raises, the
# failure is logged as a warning and +loop_back_off!+ is invoked.
#
# @param events [Array<Object>] events handed to +transport.send_events+
# @return [nil] when sending completes without raising; on the rescue path
#   the result of +loop_back_off!+ is returned instead
def perform(*events)
  responses = transport.send_events(events)

  if responses.any?(&:server_error?)
    loop_back_off!
    Datadog.logger.warn { "Encountered server error while sending events: #{responses}" }
  end

  nil
rescue => e
  # Best effort: a failed send must not propagate out of this method.
  Datadog.logger.warn { "Error while sending events: #{e}" }
  loop_back_off!
end
#stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object
72 73 74 75 76 77 78 |
# File 'lib/datadog/ci/async_writer.rb', line 72

# Marks the writer as stopped, closes the buffer if the worker is running,
# then hands over to the inherited +stop+ with the same arguments.
#
# @param force_stop [Boolean] forwarded unchanged to +super+
# @param timeout [Object] forwarded unchanged to +super+; defaults to the
#   shutdown timeout configured in the constructor
# @return [Object] whatever the inherited +stop+ returns
def stop(force_stop = false, timeout = @shutdown_timeout)
  # From here on #write drops incoming events.
  @stopped = true

  buffer.close if running?

  super
end
#work_pending? ⇒ Boolean
88 89 90 |
# File 'lib/datadog/ci/async_writer.rb', line 88

# @return [Boolean] true while the buffer still holds at least one item
def work_pending?
  !buffer.empty?
end
#write(event) ⇒ Object
48 49 50 51 52 53 54 55 56 |
# File 'lib/datadog/ci/async_writer.rb', line 48

# Queues an event for sending; does nothing once the writer has been stopped.
#
# @param event [Object] the event to enqueue
# @return [Object, nil] nil when stopped, otherwise the result of #enqueue
def write(event)
  return if @stopped

  # Start worker thread. If the process has forked, it will trigger #after_fork to
  # reconfigure the worker accordingly.
  perform

  enqueue(event)
end