Class: Datadog::Workers::AsyncTraceWriter
Overview
Writes traces to transport asynchronously, using a thread & buffer.
Constant Summary
collapse
- DEFAULT_BUFFER_MAX_SIZE =
1000
- FORK_POLICY_ASYNC =
:async
- FORK_POLICY_SYNC =
:sync
Constants included
from Polling
Polling::SHUTDOWN_TIMEOUT
Instance Attribute Summary collapse
Attributes included from Queue
#buffer
Attributes inherited from TraceWriter
#transport
#task
Instance Method Summary
collapse
Methods included from Polling
#enabled=, #enabled?, included
Methods included from Queue
included
Methods inherited from TraceWriter
#flush_completed, #flush_traces, #inject_hostname!, #process_traces, #write_traces
Constructor Details
Returns a new instance of AsyncTraceWriter.
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/ddtrace/workers/trace_writer.rb', line 107
def initialize(options = {})
super
self.enabled = options.fetch(:enabled, true)
@async = true
self.fork_policy = options.fetch(:fork_policy, FORK_POLICY_ASYNC)
self.loop_base_interval = options[:interval] if options.key?(:interval)
self.loop_back_off_ratio = options[:back_off_ratio] if options.key?(:back_off_ratio)
self.loop_back_off_max = options[:back_off_max] if options.key?(:back_off_max)
@buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)
self.buffer = TraceBuffer.new(@buffer_size)
end
|
Instance Attribute Details
#async=(value) ⇒ Object
104
105
106
|
# File 'lib/ddtrace/workers/trace_writer.rb', line 104
def async=(value)
@async = value
end
|
Instance Method Details
#after_fork ⇒ Object
179
180
181
182
183
184
185
186
187
188
|
# File 'lib/ddtrace/workers/trace_writer.rb', line 179
def after_fork
self.buffer = TraceBuffer.new(@buffer_size)
@async = false if @writer_fork_policy == FORK_POLICY_SYNC
end
|
#async? ⇒ Boolean
155
156
157
|
# File 'lib/ddtrace/workers/trace_writer.rb', line 155
def async?
@async == true
end
|
#dequeue ⇒ Object
145
146
147
148
149
|
# File 'lib/ddtrace/workers/trace_writer.rb', line 145
def dequeue
[buffer.pop]
end
|
#enqueue(trace) ⇒ Object
141
142
143
|
# File 'lib/ddtrace/workers/trace_writer.rb', line 141
def enqueue(trace)
buffer.push(trace)
end
|
#fork_policy=(policy) ⇒ Object
NOTE: #perform is wrapped by other modules:
Polling --> Async --> IntervalLoop --> AsyncTraceWriter --> TraceWriter
130
131
132
133
134
|
# File 'lib/ddtrace/workers/trace_writer.rb', line 130
def perform(traces)
super(traces).tap do |responses|
loop_back_off! if responses.find(&:server_error?)
end
end
|
#stop(*args) ⇒ Object
136
137
138
139
|
# File 'lib/ddtrace/workers/trace_writer.rb', line 136
def stop(*args)
buffer.close if running?
super
end
|
#work_pending? ⇒ Boolean
151
152
153
|
# File 'lib/ddtrace/workers/trace_writer.rb', line 151
def work_pending?
!buffer.empty?
end
|
#write(trace) ⇒ Object
190
191
192
193
194
195
196
197
198
199
|
# File 'lib/ddtrace/workers/trace_writer.rb', line 190
def write(trace)
perform
async? ? enqueue(trace) : write_traces([trace])
end
|