Class: Datadog::Workers::AsyncTraceWriter

Inherits:
TraceWriter show all
Includes:
Polling, Queue
Defined in:
lib/ddtrace/workers/trace_writer.rb

Overview

Writes traces to transport asynchronously, using a thread & buffer.

Constant Summary collapse

DEFAULT_BUFFER_MAX_SIZE =
1000
FORK_POLICY_ASYNC =
:async
FORK_POLICY_SYNC =
:sync

Constants included from Polling

Polling::SHUTDOWN_TIMEOUT

Instance Attribute Summary collapse

Attributes included from Queue

#buffer

Attributes inherited from TraceWriter

#transport

Attributes inherited from Datadog::Worker

#task

Instance Method Summary collapse

Methods included from Polling

#enabled=, #enabled?, included

Methods included from Queue

included

Methods inherited from TraceWriter

#flush_completed, #flush_traces, #inject_hostname!, #process_traces, #write_traces

Constructor Details

#initialize(options = {}) ⇒ AsyncTraceWriter

Returns a new instance of AsyncTraceWriter.



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/ddtrace/workers/trace_writer.rb', line 107

def initialize(options = {})
  # Workers::TraceWriter settings
  super

  # Workers::Polling settings
  self.enabled = options.fetch(:enabled, true)

  # Workers::Async::Thread settings
  @async = true
  self.fork_policy = options.fetch(:fork_policy, FORK_POLICY_ASYNC)

  # Workers::IntervalLoop settings
  self.loop_base_interval = options[:interval] if options.key?(:interval)
  self.loop_back_off_ratio = options[:back_off_ratio] if options.key?(:back_off_ratio)
  self.loop_back_off_max = options[:back_off_max] if options.key?(:back_off_max)

  # Workers::Queue settings
  @buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)
  self.buffer = TraceBuffer.new(@buffer_size)
end

Instance Attribute Details

#async=(value) ⇒ Object (writeonly)

Sets the attribute async

Parameters:

  • value

    the value to set the attribute async to.



104
105
106
# File 'lib/ddtrace/workers/trace_writer.rb', line 104

def async=(value)
  @async = value
end

Instance Method Details

#after_forkObject



179
180
181
182
183
184
185
186
187
188
# File 'lib/ddtrace/workers/trace_writer.rb', line 179

def after_fork
  # In multiprocess environments, forks will share the same buffer until its written to.
  # A.K.A. copy-on-write. We don't want forks to write traces generated from another process.
  # Instead, we reset it after the fork. (Make sure any enqueue operations happen after this.)
  self.buffer = TraceBuffer.new(@buffer_size)

  # Switch to synchronous mode if configured to do so.
  # In some cases synchronous writing is preferred because the fork will be short lived.
  @async = false if @writer_fork_policy == FORK_POLICY_SYNC
end

#async?Boolean

Returns:

  • (Boolean)


155
156
157
# File 'lib/ddtrace/workers/trace_writer.rb', line 155

def async?
  @async == true
end

#dequeueObject



145
146
147
148
149
# File 'lib/ddtrace/workers/trace_writer.rb', line 145

def dequeue
  # Wrap results in Array because they are
  # splatted as args against TraceWriter#perform.
  [buffer.pop]
end

#enqueue(trace) ⇒ Object



141
142
143
# File 'lib/ddtrace/workers/trace_writer.rb', line 141

def enqueue(trace)
  buffer.push(trace)
end

#fork_policy=(policy) ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/ddtrace/workers/trace_writer.rb', line 159

def fork_policy=(policy)
  # Translate to Workers::Async::Thread policy
  thread_fork_policy = case policy
                       when Workers::Async::Thread::FORK_POLICY_STOP
                         policy
                       when FORK_POLICY_SYNC
                         # Stop the async thread because the writer
                         # will bypass and run synchronously.
                         Workers::Async::Thread::FORK_POLICY_STOP
                       else
                         Workers::Async::Thread::FORK_POLICY_RESTART
                       end

  # Update thread fork policy
  super(thread_fork_policy)

  # Update local policy
  @writer_fork_policy = policy
end

#perform(traces) ⇒ Object

NOTE: #perform is wrapped by other modules:

Polling --> Async --> IntervalLoop --> AsyncTraceWriter --> TraceWriter


130
131
132
133
134
# File 'lib/ddtrace/workers/trace_writer.rb', line 130

def perform(traces)
  super(traces).tap do |responses|
    loop_back_off! if responses.find(&:server_error?)
  end
end

#stop(*args) ⇒ Object



136
137
138
139
# File 'lib/ddtrace/workers/trace_writer.rb', line 136

def stop(*args)
  buffer.close if running?
  super
end

#work_pending?Boolean

Returns:

  • (Boolean)


151
152
153
# File 'lib/ddtrace/workers/trace_writer.rb', line 151

def work_pending?
  !buffer.empty?
end

#write(trace) ⇒ Object



190
191
192
193
194
195
196
197
198
199
# File 'lib/ddtrace/workers/trace_writer.rb', line 190

def write(trace)
  # Start worker thread. If the process has forked, it will trigger #after_fork to
  # reconfigure the worker accordingly.
  # NOTE: It's important we do this before queuing or it will drop the current trace,
  #       because #after_fork resets the buffer.
  perform

  # Queue the trace if running asynchronously, otherwise short-circuit and write it directly.
  async? ? enqueue(trace) : write_traces([trace])
end