Class: Datadog::Writer
- Inherits:
-
Object
- Object
- Datadog::Writer
- Defined in:
- lib/ddtrace/writer.rb
Overview
Traces and services writer that periodically sends data to the trace-agent
Constant Summary collapse
- HOSTNAME =
'localhost'.freeze
- PORT =
'8126'.freeze
Instance Attribute Summary collapse
-
#transport ⇒ Object
readonly
Returns the value of attribute transport.
Instance Method Summary collapse
-
#initialize(options = {}) ⇒ Writer
constructor
A new instance of Writer.
-
#send_services(services, transport) ⇒ Object
flush services to the trace-agent, handles services only.
-
#send_spans(traces, transport) ⇒ Object
flush spans to the trace-agent, handles spans only.
-
#start ⇒ Object
spawns one asynchronous worker that handles both spans and services (each with its own interval); both use the same transport, which is thread-safe.
-
#stats ⇒ Object
stats returns a dictionary of stats about the writer.
-
#stop ⇒ Object
stops the worker that handles spans and services.
-
#write(trace, services) ⇒ Object
enqueue the trace and the given services for submission to the trace-agent.
Constructor Details
#initialize(options = {}) ⇒ Writer
Returns a new instance of Writer.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/ddtrace/writer.rb', line 13

# Sets up the writer state; no worker is created here (+@worker+ stays nil
# until +start+ runs).
#
# @param options [Hash] optional settings, each read with +fetch+:
#   +:buffer_size+ (default 100), +:spans_interval+ (default 1),
#   +:services_interval+ (default 120) and +:transport+
#   (default: a new Datadog::HTTPTransport on HOSTNAME/PORT).
def initialize(options = {})
  # writer and transport parameters
  @buff_size = options.fetch(:buffer_size, 100)
  @span_interval = options.fetch(:spans_interval, 1)
  @service_interval = options.fetch(:services_interval, 120)

  # transport and buffers
  # block form of fetch: the default HTTP transport is only built when the
  # caller did not supply one
  @transport = options.fetch(:transport) { Datadog::HTTPTransport.new(HOSTNAME, PORT) }
  @services = {}

  # handles the thread creation after an eventual fork
  @mutex_after_fork = Mutex.new
  @pid = nil

  @traces_flushed = 0
  @services_flushed = 0

  # one worker for both services and traces, each have their own queues
  @worker = nil
end
Instance Attribute Details
#transport ⇒ Object (readonly)
Returns the value of attribute transport.
8 9 10 |
# File 'lib/ddtrace/writer.rb', line 8

# Read-only accessor.
#
# @return [Object] the value currently held in +@transport+
def transport
  @transport
end
Instance Method Details
#send_services(services, transport) ⇒ Object
flush services to the trace-agent, handles services only
73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/ddtrace/writer.rb', line 73

# Flushes the services through the given transport.
#
# @param services [#empty?] services payload; nothing is sent when it is empty
# @param transport [#send, #server_error?] transport used for the flush
# @return [Boolean] true when nothing had to be sent or the transport did not
#   report a server error; false when the services were put back on the worker
def send_services(services, transport)
  return true if services.empty?

  status = transport.send(:services, services)

  # success or client error: count the flush and move on
  unless transport.server_error?(status)
    @services_flushed += 1
    return true
  end

  # server error: hand the services back to the worker for a later attempt
  @worker.enqueue_service(services)
  false
end
#send_spans(traces, transport) ⇒ Object
flush spans to the trace-agent, handles spans only
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/ddtrace/writer.rb', line 56

# Flushes the traces through the given transport.
#
# @param traces [Array] traces to send; nothing is sent when it is empty
# @param transport [#send, #server_error?] transport used for the flush
# @return [Boolean] true when nothing had to be sent or the transport did not
#   report a server error; false when traces were put back on the worker
def send_spans(traces, transport)
  return true if traces.empty?

  code = transport.send(:traces, traces)

  if transport.server_error?(code)
    # requeue on server error, skip on success or client error.
    # Exclusive range: at most @buff_size traces are put back (the inclusive
    # 0..@buff_size range re-enqueued one trace too many).
    traces[0...@buff_size].each { |trace| @worker.enqueue_trace(trace) }
    return false
  end

  @traces_flushed += traces.length
  true
end
#start ⇒ Object
spawns one asynchronous worker that handles both spans and services (each with its own interval); both use the same transport, which is thread-safe
36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/ddtrace/writer.rb', line 36

# Creates the AsyncTransport worker and starts it. The worker receives both
# intervals, the transport, the buffer size and two callbacks that delegate to
# +send_spans+ and +send_services+.
#
# @return [Object] whatever the worker's +start+ returns
def start
  @trace_handler = lambda { |items, transport| send_spans(items, transport) }
  @service_handler = lambda { |items, transport| send_services(items, transport) }

  @worker = Datadog::Workers::AsyncTransport.new(
    @span_interval,
    @service_interval,
    @transport,
    @buff_size,
    @trace_handler,
    @service_handler
  )
  @worker.start
end
#stats ⇒ Object
stats returns a dictionary of stats about the writer.
110 111 112 113 114 115 116 |
# File 'lib/ddtrace/writer.rb', line 110

# Collects the writer counters together with the transport's own stats.
#
# @return [Hash] keys +:traces_flushed+, +:services_flushed+ and +:transport+
def stats
  transport_stats = @transport.stats

  {
    traces_flushed: @traces_flushed,
    services_flushed: @services_flushed,
    transport: transport_stats
  }
end
#stop ⇒ Object
stops the worker that handles spans and services.
50 51 52 53 |
# File 'lib/ddtrace/writer.rb', line 50

# Stops the current worker, if any, and forgets it.
#
# @return [nil]
def stop
  # @worker is nil before start and after a previous stop; calling stop on it
  # unguarded would raise NoMethodError
  @worker.stop if @worker
  @worker = nil
end
#write(trace, services) ⇒ Object
enqueue the trace and the given services for submission to the trace-agent
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/ddtrace/writer.rb', line 87

# Enqueues a trace and its accompanying services on the worker, starting the
# worker first when the current process has not started one yet.
#
# @param trace [Object] trace handed to the worker's +enqueue_trace+
# @param services [Object] services handed to the worker's +enqueue_service+
# @return [Object] whatever the worker's +enqueue_service+ returns
def write(trace, services)
  # In multiprocess environments, the main process initializes the +Writer+ instance and if
  # the process forks (i.e. a web server like Unicorn or Puma with multiple workers) the new
  # processes will share the same +Writer+ until the first write (COW). Because of that,
  # each process owns a different copy of the +@buffer+ after each write and so the
  # +AsyncTransport+ will not send data to the trace agent.
  #
  # This check ensures that if a process doesn't own the current +Writer+, async workers
  # will be initialized again (but only once for each process).
  pid = Process.pid
  @mutex_after_fork.synchronize do
    if pid != @pid
      # we should start threads because the worker doesn't own this
      start
      # remember the pid only after start succeeded; if start raises, the next
      # write tries again instead of failing forever on a nil worker
      @pid = pid
    end
  end

  @worker.enqueue_trace(trace)
  @worker.enqueue_service(services)
end